Read text messages from JMS queue and append to file
- listens to the JMS queue “test”, receiving `String`s (1),
- converts incoming data to `akka.util.ByteString` (3),
- and appends the data to the file `target/out.txt` (2).
- Scala
-
source
import java.nio.file.Paths
import akka.stream.IOResult
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
// (1) Source of text messages taken from the JMS queue "test", with a consumer
// buffer size of 10. `system` and `connectionFactory` are not defined in this
// snippet and must already be in scope.
val jmsSource: Source[String, JmsConsumerControl] =
  JmsConsumer.textSource(
    JmsConsumerSettings(system, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")
  )

// (2) Sink that writes every ByteString it receives to the file target/out.txt.
val fileSink: Sink[ByteString, Future[IOResult]] =
  FileIO.toPath(Paths.get("target/out.txt"))

// Wire source to sink and run the stream, keeping both materialized values:
// the JMS consumer control and the future carrying the file sink's IOResult.
val (runningSource, finished): (JmsConsumerControl, Future[IOResult]) =
  jmsSource                           // element type: String
    .map(text => ByteString(text))    // element type: ByteString (3)
    .toMat(fileSink)(Keep.both)
    .run()
- Java
-
source
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.IOResult;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import scala.concurrent.ExecutionContext;
import javax.jms.ConnectionFactory;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
// (1) Source of text messages taken from the JMS queue "test".
// `system` and `connectionFactory` are not defined in this snippet and must
// already be in scope.
Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withQueue("test"));

// (2) Sink that writes every ByteString it receives to the file target/out.txt.
Sink<ByteString, CompletionStage<IOResult>> fileSink =
    FileIO.toPath(Paths.get("target/out.txt"));

// Wire source to sink and run the stream, keeping both materialized values:
// the JMS consumer control and the completion stage carrying the sink's IOResult.
Pair<JmsConsumerControl, CompletionStage<IOResult>> pair =
    jmsSource                                        // element type: String
        .map(text -> ByteString.fromString(text))    // element type: ByteString (3)
        .toMat(fileSink, Keep.both())
        .run(system);
Found an error in this documentation? The source code for this page can be found
here.
Please feel free to edit and contribute a pull request.