Read text messages from JMS queue and append to file

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (3),
  • and appends the data to the file target/out (2).
Scala
sourceimport java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

val jmsSource: Source[String, JmsConsumerControl] =        // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(system, connectionFactory).withBufferSize(10).withQueue("test")
  )

val fileSink: Sink[ByteString, Future[IOResult]] = // (2)
  FileIO.toPath(Paths.get("target/out.txt"))

val (runningSource, finished): (JmsConsumerControl, Future[IOResult]) =
                                                   // stream element type
  jmsSource                                        //: String
    .map(ByteString(_))                            //: ByteString    (3)
    .toMat(fileSink)(Keep.both)
    .run()
Java
source
import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; import akka.stream.IOResult; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.nio.file.Paths; import java.util.Arrays; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("target/out.txt")); // (2) Pair<JmsConsumerControl, CompletionStage<IOResult>> pair = jmsSource // : String .map(ByteString::fromString) // : ByteString (3) .toMat(fileSink, Keep.both()) .run(system);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.