Read text messages from JMS queue and create one file per message
- listens to the JMS queue “test” receiving
String
s (1),
- converts incoming data to
akka.util.ByteString
(2),
- combines the incoming data with a counter (3),
- creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (4).
- Scala
-
sourceimport java.nio.file.Paths
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.duration.DurationInt
val jmsSource: Source[String, JmsConsumerControl] = // (1)
JmsConsumer.textSource(
JmsConsumerSettings(system, connectionFactory).withBufferSize(10).withQueue("test")
)
// stream element type
val runningSource = jmsSource //: String
.map(ByteString(_)) //: ByteString (2)
.zipWithIndex //: (ByteString, Long) (3)
.mapAsyncUnordered(parallelism = 5) { case (byteStr, number) =>
Source // (4)
.single(byteStr)
.runWith(FileIO.toPath(Paths.get(s"target/out-$number.txt")))
} //: IoResult
.toMat(Sink.ignore)(Keep.left)
.run()
- Java
-
source
import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.KillSwitch;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import playground.ActiveMqBroker;
import scala.concurrent.ExecutionContext;
import javax.jms.ConnectionFactory;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Source<String, JmsConsumerControl> jmsConsumer = // (1)
JmsConsumer.textSource(
JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));
int parallelism = 5;
Pair<JmsConsumerControl, CompletionStage<Done>> pair =
jmsConsumer // : String
.map(ByteString::fromString) // : ByteString (2)
.zipWithIndex() // : Pair<ByteString, Long> (3)
.mapAsyncUnordered(
parallelism,
(in) -> {
ByteString byteString = in.first();
Long number = in.second();
return Source // (4)
.single(byteString)
.runWith(
FileIO.toPath(Paths.get("target/out-" + number + ".txt")), system);
}) // : IoResult
.toMat(Sink.ignore(), Keep.both())
.run(system);
Found an error in this documentation? The source code for this page can be found
here.
Please feel free to edit and contribute a pull request.