Read text messages from JMS queue and send to web server
- listens to the JMS queue “test” receiving
String
s (1),
- converts incoming data to
akka.util.ByteString
(2),
- puts the received text into an
HttpRequest
(3),
- sends the created request via Akka Http (4),
- prints the
HttpResponse
to standard out (5).
- Scala
-
sourceimport akka.Done
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
val jmsSource: Source[String, JmsConsumerControl] = // (1)
JmsConsumer.textSource(
JmsConsumerSettings(system, connectionFactory).withBufferSize(10).withQueue("test")
)
val (runningSource, finished): (JmsConsumerControl, Future[Done]) =
jmsSource //: String
.map(ByteString(_)) //: ByteString (2)
.map { bs =>
HttpRequest(uri = Uri("http://localhost:8080/hello"), //: HttpRequest (3)
entity = HttpEntity(bs))
}
.mapAsyncUnordered(4)(Http(system).singleRequest(_)) //: HttpResponse (4)
.toMat(Sink.foreach(println))(Keep.both) // (5)
.run()
- Java
-
source
import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import static akka.actor.typed.javadsl.Adapter.*;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.japi.Pair;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import playground.ActiveMqBroker;
import playground.WebServer;
import scala.concurrent.ExecutionContext;
import javax.jms.ConnectionFactory;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
final Http http = Http.get(toClassic(system));
Source<String, JmsConsumerControl> jmsSource = // (1)
JmsConsumer.textSource(
JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));
int parallelism = 4;
Pair<JmsConsumerControl, CompletionStage<Done>> pair =
jmsSource // : String
.map(ByteString::fromString) // : ByteString (2)
.map(
bs ->
HttpRequest.create("http://localhost:8080/hello")
.withEntity(bs)) // : HttpRequest (3)
.mapAsyncUnordered(parallelism, http::singleRequest) // : HttpResponse (4)
.toMat(Sink.foreach(System.out::println), Keep.both()) // (5)
.run(system);
Found an error in this documentation? The source code for this page can be found
here.
Please feel free to edit and contribute a pull request.