Read text messages from JMS queue and create one file per message

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • combines the incoming data with a counter (3),
  • creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (4).
Scala
source
import java.nio.file.Paths

import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.DurationInt

val jmsSource: Source[String, JmsConsumerControl] = {                                 // (1)
  // Consume text messages from the "test" queue, with the consumer buffer size set to 10.
  val consumerSettings = JmsConsumerSettings(system, connectionFactory)
    .withQueue("test")
    .withBufferSize(10)
  JmsConsumer.textSource(consumerSettings)
}
                                                          // stream element type
val runningSource = jmsSource                             //: String
  .map(text => ByteString(text))                          //: ByteString         (2)
  .zipWithIndex                                           //: (ByteString, Long) (3)
  .mapAsyncUnordered(parallelism = 5) { case (payload, index) =>
    // (4) each message gets its own one-element stream, written to a file named after its index
    val target = Paths.get(s"target/out-$index.txt")
    Source.single(payload).runWith(FileIO.toPath(target))
  }                                                       //: IOResult
  .toMat(Sink.ignore)(Keep.left)                          // keep the JmsConsumerControl
  .run()
Java
source
import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; import akka.stream.KillSwitch; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import playground.ActiveMqBroker; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.nio.file.Paths; import java.util.Arrays; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; Source<String, JmsConsumerControl> jmsConsumer = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); int parallelism = 5; Pair<JmsConsumerControl, CompletionStage<Done>> pair = jmsConsumer // : String .map(ByteString::fromString) // : ByteString (2) .zipWithIndex() // : Pair<ByteString, Long> (3) .mapAsyncUnordered( parallelism, (in) -> { ByteString byteString = in.first(); Long number = in.second(); return Source // (4) .single(byteString) .runWith( FileIO.toPath(Paths.get("target/out-" + number + ".txt")), system); }) // : IoResult .toMat(Sink.ignore(), Keep.both()) .run(system);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.