Intro
Akka Streams is a powerful and flexible toolkit for building highly concurrent, distributed and fault-tolerant event-driven applications. It provides a high-level API geared towards efficient processing. This article is directed to developers that have attended the Akka Streams Course from the Akkademy or finished the Quickstart Guide. It is designed to be a follow-up read where you can learn the concepts and the best practices for production-ready Streaming applications. I hope you enjoy this guide and that it will get you up to speed quickly.
Tips and tricks
Operators
In Akka Streams, operators are the functional components that perform specific data processing tasks within your streaming pipeline. They enable you to transform, filter, aggregate, or manipulate the data as it flows through an Akka stream. Operators are combined with Sources, Flows and Sinks to create a comprehensive data processing pipeline.
- Source: An operator emitting data elements to exactly one output.
- Flow: An operator transforming data elements with exactly one input and output.
- Sink: An operator receiving data elements from exactly one input.
For a complete overview of the built-in processing operators, you can look at the operator index (This page provides you with examples and is a resource I use a lot.)
There are a lot of operators that have valuable functions and some operators that you are going to need more often. I want to explain some and show you when you should use them.
- Logging with
Log
- Transforming elements with
Map
- Transforming elements with
MapConcat
- Emitting multiple elements from one element with
FlatMapConcat
- Stream enrichment with
ZipWith
Logging with log
The log
operator enables the logging of elements passing through the stream, making it easier to monitor and debug data flow.
List<Integer> elements = Arrays.asList(1, 2, 3);
Source<Integer, NotUsed> source = Source.from(elements);
source
.map(e -> {
if (e == 3) {
throw new RuntimeException();
} else {
return e;
}
})
.log("ClassName.MethodName")
.withAttributes(
Attributes.createLogLevels(
Attributes.logLevelOff(),
Attributes.logLevelInfo(),
Attributes.logLevelError()))
.runForeach(System.out::println, system);
/* Prints:
1
2
[ERROR] [example] Upstream failed.
java.lang.RuntimeException ...
*/
val source = Source(List(1, 2, 3))
source
.map(e => if (e == 3) throw new RuntimeException() else e)
.log("ClassName.MethodName")
.addAttributes(
Attributes.logLevels(
onElement = Attributes.LogLevels.Off,
onFinish = Attributes.LogLevels.Info,
onFailure = Attributes.LogLevels.Error))
.runForeach(println)
/* Prints:
1
2
[ERROR] [example] Upstream failed.
java.lang.RuntimeException ...
*/
The log operator is applied to a Source stream containing integers. It logs elements with the prefix "ClassName.MethodName" as they pass through the stream. The stream is then processed using runForeach, which prints each element to the console. The default log levels do not log each element. There is only one info message after the stream is completed or in case of failures. The addAttributes configuration is only needed if you want to implement a different behavior.
The log operator is particularly useful during the development and debugging phase of an Akka Streams application. It allows developers to track the flow of data through the stream, helping to identify unexpected behavior, which in turn, facilitates quick resolution of issues in the streaming pipeline.
Transforming elements with map
The map
operator is a transformation operator in the reactive programming world. It's used to transform the items emitted into other elements. The map operator takes each item emitted by the source and applies a function to it. It always transforms one element into one element.
Source.range(1,4)
.map(i -> Arrays.asList(i * 2))
.runForeach(System.out::println, system);
/* Prints:
2
4
6
8
*/
val source = Source(1 to 4)
def double(i: Int): Int = i * 2
source.map(i => double(i).toString).runForeach(println)
/* Prints:
2
4
6
8
*/
Transforming elements with mapConcat
The mapConcat
operator is a transformation operator as well. It’s very similar to the map operator. Except for the fact that the mapConcat operator transforms each element into zero or more elements.
Source.range(1,3)
.mapConcat(num -> Arrays.asList(num, num * 2))
.runForeach(System.out::println, system);
/* Prints:
1
2
2
4
3
6
*/
Source(1 to 3).
.mapConcat(num => List(num, num * 2))
.runForeach(println)
/* Prints:
1
2
2
4
3
6
*/
In this example, the source emits the numbers 1, 2, and 3. The mapConcat
operator creates a new List
with the value and the value times two. That would result in a List(List(1,2), (List(2,4), List(3,6))
. Then those Sub-Lists get flattened and concatenated. The resulting stream emits the numbers 1, 2, 2, 4, 3, and 6.
Understanding the mapConcat
operator is a stepping stone toward understanding the flatMapConcat
operator. Instead of mapping elements to a List
of elements, it transforms elements into Sources
of elements. The Sources
get flattened in the same way the List
got flattened in the previous example.
Emitting multiple elements from one element with flatMapConcat
The flatMapConcat
operator in Akka Streams transforms and flattens elements from the input stream into new nested streams, and concatenates them sequentially into a single output stream.
Source.range(1,3)
.flatMapConcat(num -> Source.from(Arrays.asList(num, num * 2)))
.runForeach(System.out::println, system);
/* Prints:
1
2
2
4
3
6
*/
Source(1 to 3)
.flatMapConcat(num => Source(List(num, num * 2)))
.runForeach(println)
/* Prints:
1
2
2
4
3
6
*/
The flatMapConcat
operator is applied to a Source
stream containing integers. It maps each input element to a new Source
containing the original element and its double. These nested streams are then concatenated into a single output stream. Using runForeach
, the resulting stream is processed and each element is printed to the console.
The flatMapConcat
operator is particularly useful if you need to perform a transformation on elements in a stream that results in multiple output elements per input element. In addition, you want to maintain a strict sequential order of the output elements. For example, this could be useful when expanding a stream of events into multiple related sub-events while preserving their original order for processing or analysis.
Stream enrichment with zipWith
The zipWith
operator combines elements from multiple sources, applying a user-defined combine function to each pair of elements from the input streams and passing the returned value downstream.
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("system");
List sourceTemperature = Arrays.asList(31.1, 31.4, 31.5);
List sourceHumidity = Arrays.asList(44, 48, 51);
Source<double, notused=""> temperatureSource = Source.from(sourceTemperature);
Source<integer, notused=""> humiditySource = Source.from(sourceHumidity);
temperatureSource
.zipWith(humiditySource, (temperature, humidity) -> new WeatherData(temperature, humidity.toString()))
.runForeach(System.out::println, system);
}
private static class WeatherData {
private final Double temperature;
private final String humidity;
private WeatherData(Double temperature, String humidity) {
this.temperature = temperature;
this.humidity = humidity;
}
@Override
public String toString() {
return "WeatherData{temperature=" + temperature + ", humidity='" + humidity + '}';
}
}
/* Prints:
WeatherData(31.1,44)
WeatherData(31.4,48)
WeatherData(31.5,51)
*/
</integer,></double,>
case class WeatherData(temperature: Double, fruit: String)
val sourceTemperature = Source(List(31.1, 31.4, 31.5))
val sourceHumidity = Source(List(44, 48, 51))
sourceTemperature
.zipWith(sourceHumidity) { (temperature, humidity) => WeatherData(temperature, humidity.toString) }
.runForeach(println)
/* Prints:
WeatherData(31.1,44)
WeatherData(31.4,48)
WeatherData(31.5,51)
*/
The zipWith
operator is used to combine two Source
streams, sourceTemperature
and sourceHumidity
. The operator applies a function that creates a WeatherData
case class instance for each pair of temperature and humidity values. The resulting stream contains WeatherData instances, which are then printed to the console using runForeach.
The zipWith
operator is particularly useful if you need to merge two streams of correlated data. You can merge the elements into one data object and continue further processing.
Actor integration
Akka Streams and Akka Actors are two powerful tools within the Akka toolkit, each with its unique strengths. While Akka Streams excel in handling back-pressure and building complex data processing pipelines, Akka Actors shine in managing state and handling concurrent, distributed, and fault-tolerant computing.
In this part, we will explore how to integrate actors into Akka Streams, combining their capabilities to create robust and flexible applications. Integrating actors into Akka Streams can be achieved through built-in operators that facilitate communication between streams and actors. The three primary operators for this purpose are ActorFlow.ask
, Source.queue
and Sink.actorRefWithBackpressure
.
ActorFlow.ask Operator: The ask operator allows you to send messages from a stream to an actor, and expect a response
from the actor. The operator takes a function that maps elements in the stream to messages for the actor, and an implicit Timeout parameter to specify the maximum wait time for a response. The ask operator returns a new stream containing the actor's responses.
Here's a simple example:
public static void main(String[] args) {
ActorSystem<Void> system = ActorSystem.create(PrinterMain.create(), "system");
}
record Print(String msg, ActorRef<Printed> replyTo) {}
record Printed(String msg, ActorRef<Print> from) {}
static class Printer {
static Behavior<Print> create() {
return Behaviors.setup(
context ->
Behaviors.receiveMessage(print -> {
System.out.println(print.msg);
print.replyTo.tell(new Printed(print.msg(), context.getSelf()));
return Behaviors.same();
})
);
}
}
static class PrinterMain {
static Behavior<Void> create() {
return Behaviors.setup(context -> {
ActorRef<Print> printerRef = context.spawn(Printer.create(), "printer");
Flow<String, Printed, NotUsed> askFlow = ActorFlow.ask(
printerRef,
Duration.ofSeconds(1),
Print::new);
var numbers = IntStream.rangeClosed(1, 50).boxed();
CompletionStage<Printed> done = Source.fromJavaStream(() -> numbers)
.map(Object::toString)
.via(askFlow)
.runWith(Sink.last(), context.getSystem());
done.thenRun(context.getSystem()::terminate);
return Behaviors.same();
});
}
}
/* Prints:
1
2
...
49
50
*/
object Printer {
final case class Print(msg: String, replyTo: ActorRef[Printed])
final case class Printed(msg: String, from: ActorRef[Print])
def apply(): Behavior[Print] = Behaviors.receive { (context, print) =>
println(print.msg)
print.replyTo ! Printed(print.msg, context.self)
Behaviors.same
}
}
object PrinterMain {
def apply(): Behavior[Void] =
Behaviors.setup { context =>
val greeterRef = context.spawn(Printer(), "printer")
implicit val timeout: Timeout = 1.second
val askFlow: Flow[String, Printed, NotUsed] = ActorFlow.ask(greeterRef)(Print.apply)
implicit val system = context.system
val done = Source(1 to 50)
.map(_.toString)
.via(askFlow)
.map(_.msg)
.runWith(Sink.seq)
implicit val ex = context.executionContext
done.onComplete(done => system.terminate())
Behaviors.same
}
}
object Main extends App {
val system = ActorSystem(PrinterMain(), "system")
}
/* Prints:
1
2
...
49
50
*/
In this example, we create an actor Printer
Actor that receives a Print
message, logs the data, and returns a Printed
message. The ask operator is used to send messages to this actor and receive responses. The resulting stream contains the processed elements, which are then printed to the console.
Source.queue Operator: This queue operator is useful when you want to start a stream and send elements at a later point in time. You can offer elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise, they will be buffered until the request for demand is received.
public static void main(String[] args) {
ActorSystem system = ActorSystem.create(Behaviors.empty(), "system");
int bufferSize = 20;
int elementsToProcess = 5;
SourceQueueWithComplete queue = Source.queue(bufferSize, OverflowStrategy.backpressure())
.throttle(elementsToProcess, Duration.ofSeconds(3))
.map(x -> x * x)
.toMat(Sink.foreach(x -> System.out.println("completed " + x)), Keep.left())
.run(system);
Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10).boxed())
.mapAsync(1, x -> queue.offer(x)
.thenApply(result -> {
if (result instanceof QueueOfferResult.Enqueued$) {
System.out.println("enqueued " + x);
} else if (result instanceof QueueOfferResult.Dropped$) {
System.out.println("dropped " + x);
} else if (result instanceof QueueOfferResult.Failure) {
System.out.println("Offer failed " + ((QueueOfferResult.Failure) result).cause().getMessage());
} else if (result instanceof QueueOfferResult.QueueClosed$) {
System.out.println("Source Queue closed");
}
return NotUsed.getInstance();
}))
.runWith(Sink.ignore(), system);
}
/* Prints:
enqueued 1
enqueued 2
enqueued 3
enqueued 4
enqueued 5
enqueued 6
enqueued 7
enqueued 8
enqueued 9
enqueued 10
completed 1
completed 4
completed 9
completed 16
completed 25
completed 36
completed 49
completed 64
completed 81
completed 100
*/
object Main extends App {
implicit val system = ActorSystem(Behaviors.empty, "system")
implicit val executionContext = system.executionContext
val bufferSize = 20
val elementsToProcess = 5
val queue = Source
.queue[Int](bufferSize)
.throttle(elementsToProcess, 3.second)
.map(x => x * x)
.toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left)
.run()
val source = Source(1 to 10)
source.map(x => {
queue.offer(x) match {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
case QueueOfferResult.Dropped => println(s"dropped $x")
case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed => println("Source Queue closed")
}
})
.runWith(Sink.ignore)
}
/* Prints:
enqueued 1
enqueued 2
enqueued 3
enqueued 4
enqueued 5
enqueued 6
enqueued 7
enqueued 8
enqueued 9
enqueued 10
completed 1
completed 4
completed 9
completed 16
completed 25
completed 36
completed 49
completed 64
completed 81
completed 100
*/
A Source queue of Integers is created with a buffer size of 20. The throttle operator is used to control the rate of the stream. It allows the processing of 5 elements every 3 seconds.
Each element is offered to the queue and the result of the offer is handled accordingly:
- Enqueued: The element was successfully added to the queue
- Dropped: The element could not be added because the queue was full
- Failure: An error occurred when trying to add the element
- QueueClosed: The queue was closed when trying to add the element
ActorSink.actorRefWithBackpressure Operator: This actorRefWithBackpressure
sends elements from a Source to a Sink, which is connected to a given ActorRef
using backpressure. This pushes elements downstream if the actor signals a demand. Therefore, there won't be a message overflow.
enum Ack {
INSTANCE;
}
interface Protocol {}
record Init(ActorRef ack) implements Protocol {}
record Message(ActorRef ackTo, String msg) implements Protocol {}
record Complete() implements Protocol {}
record Fail(Throwable ex) implements Protocol {}
final ActorRef actorRef = // spawned actor
final Complete completeMessage = new Complete();
final Sink<string, notused=""> sink =
ActorSink.actorRefWithBackpressure(
actorRef,
(responseActorRef, element) -> new Message(responseActorRef, element),
(responseActorRef) -> new Init(responseActorRef),
Ack.INSTANCE,
completeMessage,
(exception) -> new Fail(exception));
Source.single("msg1").runWith(sink, system);
</string,>
trait Ack
object Ack extends Ack
trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol
val actor: ActorRef[Protocol] = targetActor()
val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
ref = actor,
messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
ackMessage = Ack,
onCompleteMessage = Complete,
onFailureMessage = (exception) => Fail(exception))
Source.single("msg1").runWith(sink)
The Protocol
trait defines the messages which are used to communicate between the stream and the actor.
- Init: A message sent when the stream starts, containing a reference to the actor that will receive the acknowledgment messages.
- Message: A message containing the actual data element from the stream and a reference to the actor that will receive the acknowledgment messages.
- Complete: A message sent when the stream is successfully completed.
- Fail: A message sent when the stream encounters a failure, containing the exception.
Then one creates an actor by calling a targetActor()
function (which is not defined in the code snippet provided).
Define a Sink
named sink using the ActorSink.actorRefWithBackpressure
method.
ref
The actor reference to which the stream will send messages.messageAdapter
A function that converts the stream element into a Message with the appropriate acknowledgment actor reference.onInitMessage
A function that creates theInit
message containing the acknowledgment actor reference.ackMessage
The acknowledgment message that the actor sends back to the stream after processing an element.onCompleteMessage
The message to send when the stream is successfully completed.onFailureMessage
A function that converts a stream failure (exception) into a Fail message.
Finally, we create a Source
with a single element "msg1" and connect it to the sink. This source will send the "msg1" string to the actor through the sink.
When the stream is executed, it will send an Init
message to the actor, followed by a Message containing the "msg1" element. The actor will process the message and send back an Ack
message to the stream, indicating that it is ready to receive the next message. In this case, since there is only one element in the source, the stream will complete and send a Complete
message to the actor. If an error occurs during the stream processing, the Fail
message with the exception will be sent to the actor.
A great read on Actor Integration with Akka Streams is also this article from RockTheJvm.
Akka Alpakka - Simple Integration to External Services
Akka Alpakka is a powerful library that extends the capabilities of Akka Streams, enabling seamless integration with various external systems and technologies. Such as databases, message brokers, cloud storage providers, and more. By providing a diverse set of connectors, Alpakka simplifies the process of exchanging data between Akka Streams applications and other platforms.
Side note: Akka Alpakka embodies the notion of adaptability and compatibility, as alpacas are known for their ability to thrive in diverse environments.
The following three connectors are very popular. I will shortly describe them and name the benefits of using Akka Alpakka afterward.
- Apache Kafka Connector: This connector enables seamless integration with the widely-used Apache Kafka distributed streaming platform, simplifying the process of producing and consuming messages in Akka Streams applications.
- Apache Cassandra Connector: The Cassandra Connector empowers Akka Streams applications to interact with Apache Cassandra, a highly scalable and distributed NoSQL database. It enables efficient and reliable data storage and retrieval through CassandraSessions.
- AWS S3 Connector: The S3 Connector streamlines the storing and downloading of files from and to AWS. Additionally, it includes utility functions for bucket management such as creating, listing, and deleting them.
Why Akka Alpakka is Invaluable to Developers: Akka Alpakka's pre-built connectors save developers countless hours of effort by eliminating the need to create custom solutions for data exchange between Akka Streams applications and external systems. With Alpakka, developers can focus on their core business logic, knowing that the library's connectors are designed with robustness, performance, and scalability in mind. Furthermore, the connectors are built to adhere to the Reactive Streams specification, ensuring seamless integration with Akka Streams' back-pressure mechanism, and providing an end-to-end reactive pipeline.
TestKit for easy testing
The Akka Streams TestKit provides a powerful and flexible way to test your Akka Stream applications. With its suite of built-in tools and utilities, the TestKit makes it easy to verify the correctness and performance of your stream components. It comes with two main components that are TestSource and TestSink which provide sources and sinks that materialize to probes that allow fluent API.
Source<Integer, NotUsed> sourceUnderTest = Source.fromJavaStream(() -> IntStream.rangeClosed(1, 4))
.filter(num -> num % 2 == 0)
.map(num -> num * 2);
sourceUnderTest.runWith(TestSink.probe(testKit.system()), testKit.system())
.request(2)
.expectNext(4, 8)
.expectComplete();
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
sourceUnderTest.runWith(TestSink[Int]()).request(2).expectNext(4, 8).expectComplete()
This code snippet tests a Source that takes a sequence of integers from 1 to 4, filters out odd numbers, and multiplies the even numbers by 2.
Sink<Integer, NotUsed> sinkUnderTest = Sink.cancelled();
TestSource.<Integer>probe(testKit.system())
.toMat(sinkUnderTest, Keep.left())
.run(testKit.system())
.expectCancellation();
val sinkUnderTest = Sink.cancelled
TestSource[Int]().toMat(sinkUnderTest)(Keep.left).run().expectCancellation()
This code snippet tests the behavior of a Sink
that cancels the stream immediately upon materialization.
As you can see, each component can be tested separately. One is able to break a stream into multiple flows and unit test each of them individually. This is incredibly useful for ensuring that your application is running fine and simplifies the creation of complex tests.
Because of the reusability, you can create your stream components once and reuse them any number of times. And because of the composability, one can create arbitrary complex data pipelines by concatenating individual components. A Stream component is independent, if it runs in one place it will run everywhere.
Conclusion
In this blog, I've introduced you to the tools that will allow you to quickly become productive. From its set of built-in operators to integrating with external services using Akka Alpakka, combining your Streams with Akka Actors and testing them with the TestKit. As you continue to delve into the world of Akka Streams, you will discover that its ability to manage back pressure, provide fault tolerance, and enable distributed processing makes it an invaluable asset for building scalable and efficient data-driven applications. By following the tips, tricks, and best practices outlined in this article, you will be well on your way to mastering Akka Streams and leveraging its full potential to develop high-performance, resilient applications. Now it's your turn to start experimenting with Akka Streams today and watch your productivity soar!