Saga patterns in Akka (part 3) - exactly-once delivery with deduplication
In part 2, we explored how read models support CQRS in Akka by separating write and read concerns, enabling more scalable and efficient data access. But distributed systems come with other fundamental challenges beyond just organizing data access—especially when it comes to handling duplicate messages. Let’s dive into this challenge and explore how Akka helps address it.
A good metaphor is worth a thousand words—better yet if it's funny. This famous tweet by Mathias Verraes is one of my favorites:

"There are only two hard problems in distributed systems: 2. Exactly-once delivery 1. Guaranteed order of messages 2. Exactly-once delivery"

It's hard to find a better phrase to describe the fundamental challenges in the world of distributed systems.
The good news is that Akka solves one of those challenges—namely, the order of messages. As long as the source of your consumer is an Akka component like an Event Sourced Entity or a Key Value Entity, the order of events or state changes is preserved (similar to Apache Kafka partitions).
The second challenge—exactly-once delivery—cannot, in most cases, be generalized and solved by a one-size-fits-all implementation. The solution is usually a mix of business requirements and possible technical tricks in a given context. In practice, what we aim for is "effectively-once delivery"—a pragmatic approach that ensures the end result is as if the operation happened only once, even if messages are retried or duplicated.
Where should I apply this solution? Anywhere there is a chance of duplication that could lead to an incorrect state—for example, an HTTP endpoint, an events/messages subscription, etc. Each situation is different, and each time, you can use a different strategy.
Let’s continue with the third part of this blog series and examine how we can achieve effectively-once delivery in our Akka demo application.
Idempotent updates
The easiest way to achieve exactly-once delivery semantics is to have idempotent updates. We already did this when we worked on ReservationEntity (acting as a read model):
public class ReservationEntity extends KeyValueEntity<Reservation> {

  public Effect<Done> create(String showId) {
    String reservationId = commandContext().entityId();
    return effects().updateState(new Reservation(reservationId, showId)).thenReply(done());
  }

  public Effect<Done> delete() {
    return effects().deleteEntity().thenReply(done());
  }
}
Running the same request more than once will not change anything. The outcome will be exactly the same. There's nothing to do here, so you can move on to the next case.
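To see it from the caller's side, here is a minimal sketch (the client calls mirror the ComponentClient usage shown later in this post; reservationId and showId are placeholder values):

// A sketch, not from the demo code: redelivering the same create request
// is harmless because the resulting state is identical.
var first = componentClient.forKeyValueEntity(reservationId)
    .method(ReservationEntity::create)
    .invokeAsync(showId);

var second = componentClient.forKeyValueEntity(reservationId)
    .method(ReservationEntity::create)
    .invokeAsync(showId); // same state, same done() reply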
The alternative read model approach, based on a View component (mentioned in part 2 - read models), can be made idempotent with a simple change from a List collection to a Set:
public record ShowByReservation(String showId, Set<String> reservationIds) {
public ShowByReservation(String showId) {
this(showId, new HashSet<>());
}
public ShowByReservation add(String reservationId) {
reservationIds.add(reservationId);
return this;
}
public ShowByReservation remove(String reservationId) {
reservationIds.remove(reservationId);
return this;
}
}
After that, updates are idempotent. Since the release of SDK 3.2.2, this fix is not mandatory because Akka Views have a built-in deduplication mechanism.
Show Entity deduplication
We need to protect ShowEntity endpoints against duplicated requests. The thing is, for some duplicated requests, you may want to return an error (e.g., creating the same Show), but for others, like reservation cancellation or confirmation, you may prefer to accept the request and make it idempotent. This way, the Consumer will continue to work in case of redeliveries or restarts.
To take it a step further, let's assume you need to distinguish the situation where the record CancelSeatReservation(String reservationId) is a duplicate from the one where it's an invalid command with a wrong reservationId. For this, you need an additional finishedReservations collection in the Show domain class, which is the state of the ShowEntity. It keeps not only all historical reservation IDs but also each reservation's status (confirmed or cancelled).
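The entries of that collection could be shaped like this (a sketch of mine; the demo's actual types may differ), with the status making it possible to tell a duplicated cancellation apart from a duplicated confirmation:

// A sketch, assuming a simple status enum; the demo code may name these differently.
enum ReservationStatus { CONFIRMED, CANCELLED }

record FinishedReservation(String reservationId, ReservationStatus status) {

  boolean isCancelled() {
    return status == ReservationStatus.CANCELLED;
  }

  boolean isConfirmed() {
    return status == ReservationStatus.CONFIRMED;
  }
}

With that in place, handleCancellation can tell the two cases apart: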
private Or<ShowCommandError, ShowEvent> handleCancellation(CancelSeatReservation cancelSeatReservation) {
  String reservationId = cancelSeatReservation.reservationId();
  return pendingReservations.get(reservationId).fold(
    /* no reservation */
    () -> {
      if (finishedReservations.get(reservationId).exists(FinishedReservation::isCancelled)) {
        return left(DUPLICATED_COMMAND);
      } else {
        return left(RESERVATION_NOT_FOUND);
      }
    },
    /* matching reservation */
    seatNumber -> ... // processed as before
  );
}
The handleConfirmation method requires very similar modifications. I'm skipping this part, but you can examine the code yourself here.
Reusing an existing business field for deduplication requires extra caution. You need to block a reservation of different seats with the same reservationId. Thus, we also need to check for duplicates in handleReservation by scanning two collections: pendingReservations and finishedReservations.
private boolean isDuplicate(String reservationId) {
return pendingReservations.containsKey(reservationId) ||
finishedReservations.get(reservationId).isDefined();
}
private Or<ShowCommandError, ShowEvent> handleReservation(ReserveSeat reserveSeat) {
int seatNumber = reserveSeat.seatNumber();
if (isDuplicate(reserveSeat.reservationId())) {
return left(DUPLICATED_COMMAND);
} else {
//continue with the reservation
}
}
From the domain perspective, duplication is like any other ShowCommandError, while from the ShowEntity layer, this could be translated into a positive response.
public class ShowEntity extends EventSourcedEntity<Show, ShowEvent> {

  private Effect<Done> errorEffect(ShowCommandError error, ShowCommand showCommand) {
    if (error == DUPLICATED_COMMAND) {
      return effects().reply(done());
    } else {
      return effects().error(error.name());
    }
  }

  public Effect<Done> cancelReservation(CancelSeatReservation cancelSeatReservation) {
    if (currentState() == null) {
      return effects().error("show does not exist");
    } else {
      return switch (currentState().process(cancelSeatReservation)) {
        case Or.Left(var error) -> errorEffect(error, cancelSeatReservation);
        case Or.Right(var event) -> persistEffect(event);
      };
    }
  }
}
Wallet Entity deduplication
In the previous approach, we reused existing data (reservationId) for deduplication. Even for this simple example, it's easy to forget about some scenarios and introduce a bug. Extending the commands/events model with a dedicated field, just for the purpose of deduplication, is (in most cases) a better option. This way, we can apply a more generic deduplication mechanism and be more resistant to domain changes.
For example, any wallet command that requires deduplication could implement the RequiresDeduplicationCommand interface, which adds a commandId field.
public sealed interface WalletCommand {
sealed interface RequiresDeduplicationCommand extends WalletCommand {
String commandId();
}
record CreateWallet(String walletId, BigDecimal initialBalance) implements WalletCommand {
}
record ChargeWallet(BigDecimal amount, String reservationId,
String commandId) implements RequiresDeduplicationCommand {
}
record DepositFunds(BigDecimal amount, String commandId) implements RequiresDeduplicationCommand {
}
}
Based on this field, you can deduplicate the command before it hits the domain logic:
public Or<WalletCommandError, WalletEvent> process(WalletCommand command) {
if (isDuplicate(command)) {
return left(WalletCommandError.DUPLICATED_COMMAND);
} else {
return switch (command) {
case CreateWallet create -> handleCreate(create);
case ChargeWallet charge -> ifExists(() -> handleCharge(charge));
case DepositFunds deposit -> ifExists(() -> handleDeposit(deposit));
};
}
}
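The check itself can stay trivial. A minimal sketch, assuming the Wallet state records already-processed IDs in a commandIds collection (the field name is my assumption):

// Sketch: commands without a commandId (like CreateWallet) are never
// treated as duplicates; the rest are looked up in the recorded IDs.
private boolean isDuplicate(WalletCommand command) {
  return command instanceof RequiresDeduplicationCommand c
      && commandIds.contains(c.commandId());
}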
Proper deduplication requires persisting the state update together with the deduplication data in one transaction. For an Event Sourced entity like WalletEntity, this means that you need to store this information in the event payload. The recovered state (from events) must contain all used commandIds. This brings us to a very specific problem: for long-lived entities with many potential commands, you can't deduplicate all of them without running into time and/or space constraints. Based on the expected load, you can calculate how many command IDs you can keep in memory (per entity). Next, you might use more sophisticated logic and data structures than just a List to enforce space (e.g., the last 1000 IDs) and/or time (e.g., the last 2 hours) boundaries.
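As an illustration, here is a minimal sketch of a size-bounded variant (my own, not the demo's implementation): a FIFO window that remembers only the last 1000 command IDs.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Sketch: O(1) lookup via the Set, FIFO eviction via the Deque.
final class CommandIdWindow {

  private static final int MAX_IDS = 1000;

  private final Set<String> ids = new HashSet<>();
  private final Deque<String> order = new ArrayDeque<>();

  boolean contains(String commandId) {
    return ids.contains(commandId);
  }

  void add(String commandId) {
    if (ids.add(commandId)) {
      order.addLast(commandId);
      if (order.size() > MAX_IDS) {
        ids.remove(order.removeFirst()); // drop the oldest ID
      }
    }
  }
}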
Although a separate field for deduplication is a much cleaner solution, it has a drawback: What should its value be? The most common choice is some sort of UUID implementation. In most cases, this is fine, but if you want to fine-tune the solution, you could use a hash function that produces a shorter hash (less space). The most important aspect is that this value must be the same for the same payload. For HTTP requests, you can’t control this, but for our Saga stages, we are in charge of the payloads.
A UUID generated from a combination of the event subject and sequence number will be unique and consistent. This way, we get the same commandId each time:
@Consume.FromEventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class ChargeForReservation extends Consumer {

  private final ComponentClient componentClient; // injected by the Akka SDK

  public ChargeForReservation(ComponentClient componentClient) {
    this.componentClient = componentClient;
  }

  public Effect charge(SeatReserved seatReserved) {
    String expenseId = seatReserved.reservationId();
    String entityId = messageContext().eventSubject().get();
    Long sequenceNum = messageContext().metadata().asCloudEvent().sequence().get();
    // deterministic: the same event always produces the same commandId
    String commandId = UUID.nameUUIDFromBytes((entityId + sequenceNum).getBytes()).toString();
    var chargeWallet = new ChargeWallet(seatReserved.price(), expenseId, commandId);
    var walletId = seatReserved.walletId();
    return effects().asyncDone(componentClient.forEventSourcedEntity(walletId)
      .method(WalletEntity::charge)
      .invokeAsync(chargeWallet)
      .thenApply(Response::toDone));
  }
}
More about the deterministic hashing technique (and others) can be found in the official documentation.
Summary
I hope this part clarifies how to achieve exactly-once delivery semantics in Akka. Technically speaking, it's not possible in a distributed world, but with at-least-once delivery and a proper deduplication strategy, we can achieve effectively-once delivery. The main takeaway is that there is no silver bullet; each situation is different. However, having a rich toolbox helps a lot because you can tailor the solution to a given context.
In the next part of this series, we will focus on making our Saga process bulletproof and tackle one of the biggest challenges of the Saga pattern: error handling and compensation. Check out the part3 tag from the source code.