Saga patterns in Akka 3 (part 2) - read models
In Part 1 of this series, "Saga patterns in Akka - choreography," I showed how easy it is to build event-driven Saga choreography with Akka components. In this part, we continue our journey with the Saga pattern in Akka - read models.
Previously, our implementation has been left with a hardcoded showId
value. If you are used to CRUD-style applications with ORM frameworks like Hibernate, Spring Data, or similar, you might be surprised by the fact that you can only access Akka Entity by entity ID. It doesn’t matter if it's an Event Sourced Entity or a Key Value Entity (which emulates a CRUD-style approach).
Bridging the gap with CQRS
What may look like a limitation at first glance is actually a very conscious decision to promote a CQRS pattern in Akka from the very beginning. From a domain code perspective, it’s very healthy to have a separate model for your writes and a separate model for reads, while for Akka, CQRS allows for a very flexible scaling of our application under the hood. Different strategies can be used to scale the write and the read side of the system. Similar to the previous blog post, CQRS may sound like a lot of extra work, but with Akka components, it’s just a matter of connecting the dots.
Just to recap, we ended with this implementation:
public Effect confirmReservation(WalletCharged walletCharged) {
String reservationId = walletCharged.reservationId();
String showId = "show1";
return effects().asyncDone(confirmReservation(showId, reservationId));
}
public Effect cancelReservation(WalletChargeRejected walletChargeRejected) {
String reservationId = walletChargeRejected.reservationId();
String showId = "show1";
return effects().asyncDone(cancelReservation(showId, reservationId));
}
One of the solutions could be to extend the Wallet
events model with an additional showId
field, similar to reservationId
. This is tempting but from a system architecture perspective, it’s an implicit cyclic dependency between two separate domains. The Show
domain knows about Wallet
domain and vice versa. Changing one will affect the other. Cyclic dependencies should be avoided whenever possible. If we want to build a more generic Wallet
domain, then even a reservationId
is a cyclic dependency. Charging the wallet for something other than reservation, like groceries, would not have a reservationId
. A better solution would be to rename reservationId
to an expenseId
and map this value in the action implementation.
public Effect<String> confirmReservation(WalletCharged walletCharged) {
String reservationId = walletCharged.expenseId();
String showId = "show1";
return effects().forward(confirmReservation(showId, reservationId));
}
Now, the Wallet
domain is not aware of any reservation system and could be used for other purposes. If you are interested in learning more, please follow the Context Mapping topic from DDD.
Back to the main task: Since we only have an expenseId
(a.k.a reservationId
), a mapping from reservationId
to showId
is required to implement our Saga correctly. You’re working with Event Sourced Entities, so it’s very natural to build a read model based on the events stream. The Show
and ShowByReservation
are your write and read models (respectively) from the CQRS perspective.
View as a read model
A View component is the most natural way to achieve full CQRS in the Akka ecosystem. Each View
is optimized for a specific query (or queries). We can retrieve our data with SQL-like query syntax. It’s worth mentioning that a View can be updated using events emitted from an Event Sourced Entity or changes updates from a Key Value Entity. When it comes to the implementation, it’s just a matter of adding some annotations, event handlers, and exposing the query via an HTTP endpoint:
@ComponentId("show-by-reservation-view")
public class ShowByReservationView extends View {
@Query("SELECT * FROM show_by_reservation WHERE :reservationId = ANY(reservationIds)")
public QueryEffect<ShowByReservation> getShow(String reservationId) {
return queryResult();
}
@Consume.FromEventSourcedEntity(value = ShowEntity.class)
public static class ShowByReservationUpdater extends TableUpdater<ShowByReservation> {
public Effect<ShowByReservation> onEvent(ShowEvent showEvent) {
return switch (showEvent) {
case ShowCreated created ->
effects().updateRow(new ShowByReservation(created.showId(), new ArrayList<>()));
case SeatReserved reserved ->
effects().updateRow(rowState().add(reserved.reservationId()));
case SeatReservationPaid paid ->
effects().updateRow(rowState().remove(paid.reservationId()));
case SeatReservationCancelled cancelled ->
effects().updateRow(rowState().remove(cancelled.reservationId()));
};
}
}
}
The getShow
returns queryResult()
which is only a placeholder to associate a Query
annotation with View method. The actual implementation is done by the Akka engine. Let's have a detailed look at the ShowByReservation
implementation, which basically keeps the reservationIds
list up to date.
public record ShowByReservation(String showId, List reservationIds) {
public ShowByReservation(String showId, String reservationId) {
this(showId, new ArrayList<>());
reservationIds.add(reservationId);
}
public ShowByReservation add(String reservationId) {
if (!reservationIds.contains(reservationId)) {
reservationIds.add(reservationId);
}
return this;
}
public ShowByReservation remove(String reservationId) {
reservationIds.remove(reservationId);
return this;
}
}
Instead of simply mapping one showId
to onereservationId
, we have one showId
with many reservationIds
. That’s because there can be only one entry for a single entity ID in a view model. A collection like Java List
will materialize itself as an SQL-like array, which is why we use the ANY
operator in the query definition.
It’s a very specific view model, which may be difficult to understand at first glance. Keep in mind that there can only be one view entry for a single entity ID. However, if you're not comfortable with this solution or are concerned about query performance with larger collections (e.g., a show with 1000+ seats), I suggest a different approach.
Key Value Entity as a read model
The real power of Akka is the flexibility of how you can combine existing components. The View
component is a default candidate for a read model, but not the only one. A Key Value Entity could also be used as a read model. The domain part is much simpler:
record Reservation(String reservationId, String showId) {
}
After wrapping it with the KeyValueEntity
component:
@ComponentId("reservation")
public class ReservationEntity extends KeyValueEntity {
public Effect create(String showId) {
String reservationId = commandContext().entityId();
return effects().updateState(new Reservation(reservationId, showId)).thenReply(done());
}
public Effect get() {
if (currentState() == null) {
return effects().error("reservation not found");
} else {
return effects().reply(currentState());
}
}
public Effect delete() {
return effects().deleteEntity().thenReply(done());
}
}
The only missing part is the transformation of the Show
events into Reservation
:
@ComponentId("show-events-to-reservation-consumer")
@Consume.FromEventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class ShowEventsToReservationConsumer extends Consumer {
public Effect onEvent(SeatReserved reserved) {
return effects().asyncDone(createReservation(reserved.reservationId(), reserved.showId()));
}
public Effect onEvent(SeatReservationPaid paid) {
return effects().asyncDone(deleteReservation(paid.reservationId()));
}
public Effect onEvent(SeatReservationCancelled cancelled) {
return effects().asyncDone(deleteReservation(cancelled.reservationId()));
}
}
Although it looks like more code, the solution is more explicit about what we want to achieve.
To finish our confirmation (or cancellation) Saga, we can use both solutions in the same way:
@Consume.FromEventSourcedEntity(value = WalletEntity.class, ignoreUnknown = true)
public class CompleteReservation extends Consumer {
public Effect confirmReservation(WalletCharged walletCharged) {
String reservationId = walletCharged.expenseId();
return effects().asyncDone(
getShowIdBy(reservationId)
.thenCompose(showId -> confirmReservation(showId, reservationId))
);
}
//Key Value Entity as a read model
private CompletionStage getShowIdBy(String reservationId) {
return componentClient.forKeyValueEntity(reservationId)
.method(ReservationEntity::get)
.invokeAsync()
.thenApply(Reservation::showId);
}
//View as a read model
private CompletionStage getShowIdBy2(String reservationId) {
return componentClient.forView()
.method(ShowByReservationView::getShow)
.invokeAsync(reservationId)
.thenApply(ShowByReservation::showId);
}
}
Summary
Today we introduced two more Akka components: the View
and the KeyValueEntity
. I hope you liked the flexibility of connecting different puzzle pieces together to deliver a complete asynchronous CQRS implementation.
The third post in this series will cover deduplication. All Akka Consumers (or TableUpdaters), in addition to preserving the order of events (per aggregate), guarantee at-least-once delivery semantics. This means that we should be ready for event/message duplicates in our Saga choreography. Check out part2 tag from the source code.
Posts by this author