Read text messages from JMS queue and send to web socket
- listens to the JMS queue “test”, receiving `String`s (1),
- configures a web socket flow to localhost (2),
- converts incoming data to a `ws.TextMessage` (Scala) or `akka.http.javadsl.model.ws.TextMessage` (Java) (3),
- passes the message via the web socket flow (4),
- converts the (potentially chunked) web socket reply to a `String` (5),
- prefixes the `String` (6),
- ends the stream by writing the values to standard out (7).
- Scala
-
source
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future
// (1) Source of the text messages read from the JMS queue "test";
// it materializes a JmsConsumerControl for the running consumer.
val jmsSource: Source[String, JmsConsumerControl] =
  JmsConsumer.textSource(
    JmsConsumerSettings(system, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")
  )
// (2) Client-side WebSocket flow to the local "ping" endpoint. Messages sent into the
// flow go to the server and its replies come out; the materialized value is the
// future WebSocket upgrade response.
val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] =
  Http(system.toClassic).webSocketClientFlow(
    WebSocketRequest("ws://localhost:8080/webSocket/ping")
  )
// Wire the JMS source through the WebSocket flow and run the stream.
// Materialized values, kept with Keep.both at each junction:
//   runningSource     - control handle of the JMS consumer
//   wsUpgradeResponse - future WebSocket upgrade response
//   streamCompletion  - future completed by the printing sink
val ((runningSource, wsUpgradeResponse), streamCompletion): (
    (JmsConsumerControl, Future[WebSocketUpgradeResponse]),
    Future[Done]
) =
  // the trailing comments give the stream element type after each stage
  jmsSource                                          //: String
    .map(text => ws.TextMessage(text))               //: ws.TextMessage (3)
    .viaMat(webSocketFlow)(Keep.both)                //: ws.Message     (4)
    .mapAsync(1)(wsMessageToString)                  //: String         (5)
    .map(reply => "client received: " + reply)       //: String         (6)
    .toMat(Sink.foreach[String](println))(Keep.both) //                 (7)
    .run()
/**
 * Turns a WebSocket message into a `String`.
 *
 *  - A strict text message yields its text directly.
 *  - A streamed (chunked) text message has all of its chunks collected and concatenated.
 *  - Any other message is rendered with `toString`.
 *
 * @return a function from a WebSocket message to the future string content
 */
def wsMessageToString: ws.Message => Future[String] = {
  case strict: ws.TextMessage.Strict =>
    Future.successful(strict.text)
  case streamed: ws.TextMessage.Streamed =>
    // gather every chunk of the text stream, then join them into one string
    streamed.textStream.runWith(Sink.seq).map(_.mkString)
  case other =>
    Future.successful(other.toString)
}
- Java
-
source
import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import static akka.actor.typed.javadsl.Adapter.*;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.http.javadsl.model.ws.WebSocketUpgradeResponse;
import akka.japi.Pair;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import playground.ActiveMqBroker;
import playground.WebServer;
import scala.concurrent.ExecutionContext;
import javax.jms.ConnectionFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
// Akka HTTP extension, looked up with the result of toClassic(system).
final Http http = Http.get(toClassic(system));

// (1) Source of the text messages read from the JMS queue "test".
Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, connectionFactory).withBufferSize(10).withQueue("test"));

// (2) Client-side WebSocket flow to the local "ping" endpoint; it materializes the
// completion stage of the WebSocket upgrade response.
Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow =
    http.webSocketClientFlow(
        WebSocketRequest.create("ws://localhost:8080/webSocket/ping"));
// Number of WebSocket replies that may be converted to strings concurrently.
int parallelism = 4;

// Wire the JMS source through the WebSocket flow and run the stream. The resulting
// pair holds ((JMS consumer control, WebSocket upgrade response), sink completion).
// The trailing comments give the stream element type after each stage.
Pair<Pair<JmsConsumerControl, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>>
    pair =
        jmsSource // : String
            // the explicit type argument widens TextMessage to Message for the flow below
            .<Message>map(TextMessage::create) // : Message (3)
            .viaMat(webSocketFlow, Keep.both()) // : Message (4)
            .mapAsync(parallelism, this::wsMessageToString) // : String (5)
            .map(s -> "client received: " + s) // : String (6)
            .toMat(Sink.foreach(System.out::println), Keep.both()) // (7)
            .run(system);
/**
 * Turns a WebSocket message into a string.
 *
 * <p>A strict text message yields its text directly; a streamed (chunked) text message has all
 * of its chunks collected and concatenated; any other message is rendered with
 * {@code toString()}.
 *
 * @param msg the WebSocket message to convert
 * @return a completion stage holding the message content as a string
 */
private CompletionStage<String> wsMessageToString(Message msg) {
  // Not a text message: fall back to its string representation.
  if (!msg.isText()) {
    return CompletableFuture.completedFuture(msg.toString());
  }

  TextMessage text = msg.asTextMessage();
  if (text.isStrict()) {
    return CompletableFuture.completedFuture(text.getStrictText());
  }

  // Streamed text: gather every chunk, then join them into one string.
  return text.getStreamedText()
      .runWith(Sink.<String>seq(), system)
      .thenApply(chunks -> String.join("", chunks));
}
Found an error in this documentation? The source code for this page can be found
here.
Please feel free to edit and contribute a pull request.