Akka Streams is one of the foundational components at the core of Akka, and the most notable new Streams feature in the 23.05 release is the introduction of a new operator, `mapAsyncPartitioned`. This operator improves on the performance of the existing `mapAsync` operator in situations where `mapAsync` “over-parallelizes”; as a consequence, it can extract more useful parallelism from a stream. This in turn can lead to systems that accomplish more overall while reducing resource requirements. Developers have long chosen Akka for applications which need to accomplish more with less; this blog dives into one way this release can help applications accomplish even more.
Akka users often use `mapAsync` in streams which need to interact with the “world outside the stream”: Colin Breck has written about the concurrency-control benefits of `mapAsync` in situations where events from an incoming stream are delivered to actors for stateful processing. Another use case for such a stream interaction is updating an external system or database in response to state changes, a key part of implementing Command/Query Responsibility Segregation. The ability to perform these interactions asynchronously and in parallel while preserving flow control is a distinguishing aspect of Akka Streams in comparison to frameworks like Flink or Kafka Streams.
An example
Using Akka 2.6, one might write the following stream (in Java) to consume messages from Kafka and deliver them to actors which are distributed across a cluster using Akka Cluster Sharding:
```java
RestartSource.onFailuresWithBackoff(
    restartSettings,
    () ->
        Consumer.committableSource(consumerSettings, subscription)
            .mapAsync(
                askParallelism,
                kafkaMessage -> {
                  var entityId = kafkaMessage.record().key();
                  var content = kafkaMessage.record().value();
                  var offset = kafkaMessage.committableOffset();
                  return sharding.entityRefFor(entityTypeKey, entityId)
                      .ask(
                          replyTo ->
                              new MyActor.MessageFromKafka(content, replyTo),
                          Duration.ofMillis(500))
                      .thenApply(done -> offset);
                })
            .via(Committer.flow(committerSettings)))
```
The `askParallelism` parameter allows us to limit the total number of asks which are in flight, though there are situations where (in order to preserve the overall order of the stream) the actual number of active asks may be reduced to as few as one.
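To see how that order preservation interacts with parallelism, here is a minimal sketch (the element values and delays are invented for illustration, and `system` is assumed to be an `ActorSystem` already in scope): all four futures run concurrently, but nothing is emitted downstream until the slow first element completes, because `mapAsync` emits results in upstream order.

```java
import java.util.concurrent.CompletableFuture;
import akka.stream.javadsl.Source;

// A slow first element delays emission of the faster ones behind it:
// mapAsync starts all four futures at once, but emits in order, so
// elements 2-4 wait for element 1 before reaching downstream.
Source.range(1, 4)
    .mapAsync(
        4,
        i ->
            CompletableFuture.supplyAsync(
                () -> {
                  try {
                    Thread.sleep(i == 1 ? 1_000 : 10); // element 1 is slow
                  } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                  }
                  return i;
                }))
    .runForeach(System.out::println, system); // prints 1, 2, 3, 4 after ~1s
```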
However, `askParallelism` is not a magic “go faster” dial, as can be seen in a benchmark application (the code snippet above is taken from that benchmark and lightly adapted for clarity):
| `askParallelism` | Average throughput (messages/second) while completely processing 1 million messages |
|---|---|
| 100 | 755 |
| 200 | 1168 |
| 400 | 1834 |
| 800 | 2540 |
| 1000 | 2457 |
| 1200 | 2596 |
| 1400 | 2515 |
| 1600 | 2494 |
| 1700 | 2632 |
| 1800 | 2610 |
| 1900 | 2681 |
| 2000 | 2058 |
| 2600 | 820 |
| 3200 | 218 |
The reason for this is that it’s possible for stream elements A and B to contend with one another in their processing, so that one being in flight at the same time as the other delays the other’s completion. In this particular example, the major contention occurs because the messages in the respective asks go to the same sharded entity and queue up in that actor’s mailbox; similar dynamics can be observed when, for example, ACID transactions conflict and need to be retried against a SQL database. Because `mapAsync` starts processing an element as soon as it is received, increasing `askParallelism` in this benchmark increases the chance that multiple elements directed at the same entity are in flight and contending with each other. With a sufficient number of contending elements, an ask will time out, which fails the stage, and the `RestartSource` will back off exponentially to prevent overload.

In any system where efficient resource usage is a priority, there is a hard ceiling on realizable parallelism, but the throughput-maximizing level of parallelism more likely depends on keeping contention among the in-flight elements below some limit. In most “real-world” streams, that level varies over time as the stream is processed, which complicates setting a constant value for parallelism in a `mapAsync` stage.
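The contention dynamic can be sketched without any Akka machinery at all. In this toy model (all names are hypothetical), work items for the same key serialize on a per-key lock, standing in for a sharded entity’s mailbox; past a point, starting more work at once only adds waiters:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ContentionSketch {
  // one lock per key, standing in for one mailbox per sharded entity
  static final Map<String, Object> locks = new ConcurrentHashMap<>();
  static final ExecutorService pool = Executors.newFixedThreadPool(32);

  static CompletableFuture<String> process(String key) {
    return CompletableFuture.supplyAsync(
        () -> {
          Object lock = locks.computeIfAbsent(key, k -> new Object());
          synchronized (lock) { // items for the same key queue up here
            try {
              Thread.sleep(10); // simulated per-item work
            } catch (InterruptedException e) {
              throw new RuntimeException(e);
            }
          }
          return key;
        },
        pool);
  }

  public static void main(String[] args) {
    // 100 items but only 2 distinct keys: no matter how many futures we
    // start at once, at most 2 make progress at any moment, and a deadline
    // on the waiting items (like an ask timeout) would start to fail.
    List<CompletableFuture<String>> inFlight =
        IntStream.range(0, 100)
            .mapToObj(i -> process("entity-" + (i % 2)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
    pool.shutdown();
  }
}
```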
Introducing `mapAsyncPartitioned`
If we are using Akka 2.8 (part of the Akka 23.05 release), we can rewrite our code snippet to:
```java
// assuming MyActor.Command is the sharded entity's message protocol
Function<CommittableMessage<String, byte[]>, EntityRef<MyActor.Command>>
    entityRefExtractor =
        kafkaMessage -> {
          var entityId = kafkaMessage.record().key();
          return sharding.entityRefFor(entityTypeKey, entityId);
        };

RestartSource.onFailuresWithBackoff(
    restartSettings,
    () ->
        Consumer.committableSource(consumerSettings, subscription)
            .mapAsyncPartitioned(
                askParallelism,     // overall maximum number of asks
                                    // in-flight (across all entities)
                2,                  // maximum asks to a particular entity
                entityRefExtractor, // assigns incoming messages to entities
                (kafkaMessage, entityRef) -> {
                  var content = kafkaMessage.record().value();
                  var offset = kafkaMessage.committableOffset();
                  return entityRef
                      .ask(
                          replyTo -> new MyActor.MessageFromKafka(content, replyTo),
                          Duration.ofMillis(500))
                      .thenApply(done -> offset);
                })
            .via(Committer.flow(committerSettings)))
```
What does this change mean? Well, we can see that we’ve extracted `sharding.entityRefFor` and its dependency into a lambda expression and are passing that to `mapAsyncPartitioned` as `entityRefExtractor`. The `mapAsyncPartitioned` stage will then pass the extracted `entityRef` to the asking function alongside the `kafkaMessage` that we passed. Between `entityRefExtractor` and our new asking function, we have the same functionality as in our previous asking function.
But what about the 2? Here is the big difference between `mapAsync` and `mapAsyncPartitioned`: we might not do all of the work of the previous asking function at once. The part that we now do in `entityRefExtractor` is performed as soon as the stage receives an element from upstream. The extracted `entityRef` is then used to assign the stream elements to queues, with one queue for each distinct `entityRef`. Only when an element is one of the 2 elements at the head of its `entityRef`’s queue is the new asking function executed, which actually performs the ask.
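A rough mental model of that per-partition bookkeeping follows. This is a sketch of the semantics only, with invented names; the real operator also manages completion, emission order, and backpressure:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the queueing mapAsyncPartitioned performs,
// not Akka's actual implementation.
class PartitionQueues<T, P> {
  private final int perPartitionLimit; // the "2" in the snippet above
  private final Map<P, Queue<T>> queues = new HashMap<>();
  private final Map<P, Integer> inFlight = new HashMap<>();

  PartitionQueues(int perPartitionLimit) {
    this.perPartitionLimit = perPartitionLimit;
  }

  // called as soon as an element arrives from upstream,
  // after the partitioner has been applied
  void enqueue(P partition, T element) {
    queues.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(element);
  }

  // the asking function runs only for elements this returns
  T nextRunnable(P partition) {
    if (inFlight.getOrDefault(partition, 0) >= perPartitionLimit) {
      return null; // partition is saturated; element waits in its queue
    }
    Queue<T> queue = queues.get(partition);
    if (queue == null || queue.isEmpty()) return null;
    inFlight.merge(partition, 1, Integer::sum);
    return queue.poll();
  }

  // called when an element's future completes, freeing a slot
  void complete(P partition) {
    inFlight.merge(partition, -1, Integer::sum);
  }
}
```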
We can now describe the parallelism in this stage as “at most `askParallelism` asks will be in flight, but no more than 2 asks will be in flight for any one `entityRef`”. So if `askParallelism` were 1000, but 1000 consecutive elements in the stream were for the same `entityRef`, the effective parallelism limit would be 2: the stream reactively adapts its parallelism limit to the elements within it at a particular moment.
Changing the benchmark only to use `mapAsyncPartitioned` (with the `mapAsync` results from above included for comparison):
| `askParallelism` | Average throughput, `mapAsync` (messages/second) | Average throughput, `mapAsyncPartitioned` (messages/second) |
|---|---|---|
| 100 | 755 | 750 |
| 200 | 1168 | 1192 |
| 400 | 1834 | 1748 |
| 800 | 2540 | 2528 |
| 1000 | 2457 | 2605 |
| 1200 | 2596 | 2628 |
| 1400 | 2515 | 2573 |
| 1600 | 2494 | 2809 |
| 1700 | 2632 | 2575 |
| 1800 | 2610 | 2696 |
| 1900 | 2681 | 2560 |
| 2000 | 2058 | 2640 |
| 2600 | 820 | 2406 |
| 3200 | 218 | 2527 |
At the lower levels of parallelism, we can see the effect of the extra overhead in `mapAsyncPartitioned`: it does have to track a buffer for each partition. The shape of the curve is noticeably different, however: beyond a maximum parallelism of 800, throughput stays in a fairly narrow, predictable band. Since the benchmark only has 1000 entities, the realized parallelism never actually exceeds 2000 (1000 entities times the per-entity limit of 2): `askParallelism` values greater than 2000 only cause the stage to take in more elements, giving it more ability to extract parallelism when the elements in the stream aren’t particularly diverse.
Using `mapAsyncPartitioned`
`mapAsyncPartitioned` is designed to be something of a drop-in replacement for `mapAsync`. You’ll need a partitioning function: without one, there’s unlikely to be any benefit. For cluster sharding, the entity ID (or the `EntityRef` if using Akka Typed) is likely to be a reasonable choice, though the shard ID is also viable. In an Akka Projection, the persistence ID of each event or state update is likewise probably a reasonable default choice.
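When the element already carries its key, the partitioning function can be a one-liner. A sketch, reusing the Kafka message type from the snippets above:

```java
// partition by the Kafka record key (the entity ID); any value with a
// stable equals/hashCode should serve as a partition key
Function<CommittableMessage<String, byte[]>, String> partitioner =
    kafkaMessage -> kafkaMessage.record().key();
```

Whether you partition by the ID string or by the `EntityRef` itself (as in the earlier snippet) is mostly a matter of convenience; what matters is that elements which contend with each other map to the same key.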
If you have a workable partitioning function, choosing parallelism limits is somewhat a matter of experimentation. For an ask, a per-partition limit of 1 or 2 (which is similar to the default parallelism for the ask operator in Akka Streams) is a reasonable default choice.
If your stream’s (or better, your system’s) optimal throughput is reached with a low parallelism on `mapAsync`, then it’s unlikely that `mapAsyncPartitioned` will be worth its extra overhead. Of course, if the reason the optimal throughput requires low parallelism is bunching of stream elements that contend, then that is exactly the sort of situation `mapAsyncPartitioned` is intended to help.
`mapAsyncPartitioned` is new in the Akka 23.05 release, as part of `akka-stream` version 2.8.0 and later.