Read text messages from JMS queue and send to web server

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • puts the received text into an HttpRequest (3),
  • sends the created request via Akka Http (4),
  • prints the HttpResponse to standard out (5).
Scala
sourceimport akka.Done
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

val jmsSource: Source[String, JmsConsumerControl] =                                 // (1)
JmsConsumer.textSource(
  JmsConsumerSettings(system, connectionFactory).withBufferSize(10).withQueue("test")
)

val (runningSource, finished): (JmsConsumerControl, Future[Done]) =
  jmsSource                                                   //: String
    .map(ByteString(_))                                       //: ByteString   (2)
    .map { bs =>
    HttpRequest(uri = Uri("http://localhost:8080/hello"),   //: HttpRequest  (3)
      entity = HttpEntity(bs))
  }
    .mapAsyncUnordered(4)(Http(system).singleRequest(_))            //: HttpResponse (4)
    .toMat(Sink.foreach(println))(Keep.both)                  //               (5)
    .run()
Java
source
import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import static akka.actor.typed.javadsl.Adapter.*; import akka.http.javadsl.Http; import akka.http.javadsl.model.HttpRequest; import akka.japi.Pair; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import playground.ActiveMqBroker; import playground.WebServer; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.util.Arrays; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; final Http http = Http.get(toClassic(system)); Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); int parallelism = 4; Pair<JmsConsumerControl, CompletionStage<Done>> pair = jmsSource // : String .map(ByteString::fromString) // : ByteString (2) .map( bs -> HttpRequest.create("http://localhost:8080/hello") .withEntity(bs)) // : HttpRequest (3) .mapAsyncUnordered(parallelism, http::singleRequest) // : HttpResponse (4) .toMat(Sink.foreach(System.out::println), Keep.both()) // (5) .run(system);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.