Read text messages from JMS queue and send to web socket

  • listens to the JMS queue “test” receiving Strings (1),
  • configures a web socket flow to localhost (2),
  • converts incoming data to a ws.TextMessage (Java: akka.http.javadsl.model.ws.TextMessage) (3),
  • passes the message via the web socket flow (4),
  • converts the (potentially chunked) web socket reply to a String (5),
  • prefixes the String (6),
  • ends the stream by writing the values to standard out (7).
Scala
source
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

// (1) Source of the text bodies of messages arriving on the JMS queue "test".
val jmsSource: Source[String, JmsConsumerControl] = {
  val consumerSettings =
    JmsConsumerSettings(system, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")

  JmsConsumer.textSource(consumerSettings)
}

// (2) Client-side web socket flow towards the local ping endpoint; it materializes
// the future upgrade response.
val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = {
  val pingRequest = WebSocketRequest("ws://localhost:8080/webSocket/ping")
  Http(system.toClassic).webSocketClientFlow(pingRequest)
}

// Runs the stream and keeps all three materialized values: the JMS consumer control,
// the web socket upgrade response and the completion signal of the printing sink.
val ((runningSource, wsUpgradeResponse), streamCompletion): ((JmsConsumerControl, Future[WebSocketUpgradeResponse]), Future[Done]) =
  jmsSource
    // (3) String => ws.TextMessage
    .map(text => ws.TextMessage(text))
    // (4) send through the web socket; replies arrive as ws.Message
    .viaMat(webSocketFlow)(Keep.both)
    // (5) ws.Message => String, one reply at a time
    .mapAsync(1)(reply => wsMessageToString(reply))
    // (6) prefix each reply
    .map(text => "client received: " + text)
    // (7) print every element to standard out
    .toMat(Sink.foreach[String](line => println(line)))(Keep.both)
    .run()

/**
 * Convert a potentially chunked WebSocket Message to a string.
 *
 *  - A strict text message yields its text immediately.
 *  - A streamed text message is collected chunk by chunk and concatenated.
 *  - A binary message yields its `toString`, as before, but its data stream is
 *    drained first: an unread message stream can stall the web socket connection.
 *
 * Relies on the implicit materializer and execution context of the enclosing scope.
 *
 * @return a function from a web socket message to the future of its string form
 */
def wsMessageToString: ws.Message => Future[String] = {
  case strict: ws.TextMessage.Strict =>
    Future.successful(strict.text)

  case streamed: ws.TextMessage.Streamed =>
    streamed.textStream
      .runWith(Sink.seq)
      .map(chunks => chunks.mkString)

  case binary: ws.BinaryMessage =>
    // Consume the payload so the connection is not back-pressured by unread data.
    binary.dataStream
      .runWith(Sink.ignore)
      .map(_ => binary.toString)
}
Java
source
import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import static akka.actor.typed.javadsl.Adapter.*; import akka.http.javadsl.Http; import akka.http.javadsl.model.StatusCodes; import akka.http.javadsl.model.ws.Message; import akka.http.javadsl.model.ws.TextMessage; import akka.http.javadsl.model.ws.WebSocketRequest; import akka.http.javadsl.model.ws.WebSocketUpgradeResponse; import akka.japi.Pair; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import playground.ActiveMqBroker; import playground.WebServer; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; final Http http = Http.get(toClassic(system)); Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory) .withBufferSize(10) .withQueue("test")); Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow = // (2) http.webSocketClientFlow(WebSocketRequest.create("ws://localhost:8080/webSocket/ping")); int parallelism = 4; Pair<Pair<JmsConsumerControl, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>> pair = jmsSource // : String .map( s -> { Message msg = TextMessage.create(s); return msg; }) // : Message (3) .viaMat(webSocketFlow, Keep.both()) // : Message (4) .mapAsync(parallelism, this::wsMessageToString) // : String (5) .map(s -> "client received: " + s) // : String (6) 
.toMat(Sink.foreach(System.out::println), Keep.both()) // (7) .run(system); /** Convert potentially chunked WebSocket Message to a string. */ private CompletionStage<String> wsMessageToString(Message msg) { if (msg.isText()) { TextMessage tMsg = msg.asTextMessage(); if (tMsg.isStrict()) { return CompletableFuture.completedFuture(tMsg.getStrictText()); } else { CompletionStage<List<String>> strings = tMsg.getStreamedText().runWith(Sink.seq(), system); return strings.thenApply(list -> String.join("", list)); } } else { return CompletableFuture.completedFuture(msg.toString()); } }
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.