Saga patterns in Akka 3 (part 2) - read models

5 minute read

In Part 1 of this series, "Saga patterns in Akka - choreography," I showed how easy it is to build event-driven Saga choreography with Akka components. In this part, we continue our journey with the Saga pattern in Akka - read models.

Previously, our implementation has been left with a hardcoded showId value. If you are used to CRUD-style applications with ORM frameworks like Hibernate, Spring Data, or similar, you might be surprised by the fact that you can only access Akka Entity by entity ID. It doesn’t matter if it's an Event Sourced Entity or a Key Value Entity (which emulates a CRUD-style approach).

Bridging the gap with CQRS

What may look like a limitation at first glance is actually a very conscious decision to promote a CQRS pattern in Akka from the very beginning. From a domain code perspective, it’s very healthy to have a separate model for your writes and a separate model for reads, while for Akka, CQRS allows for a very flexible scaling of our application under the hood. Different strategies can be used to scale the write and the read side of the system. Similar to the previous blog post, CQRS may sound like a lot of extra work, but with Akka components, it’s just a matter of connecting the dots.

Just to recap, we ended with this implementation:

public Effect confirmReservation(WalletCharged walletCharged) {

    String reservationId = walletCharged.reservationId();
    String showId = "show1";

    return effects().asyncDone(confirmReservation(showId, reservationId));
  }

  public Effect cancelReservation(WalletChargeRejected walletChargeRejected) {

    String reservationId = walletChargeRejected.reservationId();
    String showId = "show1";

    return effects().asyncDone(cancelReservation(showId, reservationId));
  }

One of the solutions could be to extend the Wallet events model with an additional showId field, similar to reservationId. This is tempting but from a system architecture perspective, it’s an implicit cyclic dependency between two separate domains. The Show domain knows about Wallet domain and vice versa. Changing one will affect the other. Cyclic dependencies should be avoided whenever possible. If we want to build a more generic Wallet domain, then even a reservationId is a cyclic dependency. Charging the wallet for something other than reservation, like groceries, would not have a reservationId. A better solution would be to rename reservationId to an expenseId and map this value in the action implementation. 

public Effect<String> confirmReservation(WalletCharged walletCharged) {

    String reservationId = walletCharged.expenseId();
    String showId = "show1";

    return effects().forward(confirmReservation(showId, reservationId));
  }

Now, the Wallet domain is not aware of any reservation system and could be used for other purposes. If you are interested in learning more, please follow the Context Mapping topic from DDD.

Back to the main task: Since we only have an expenseId (a.k.a reservationId), a mapping from reservationId to showId is required to implement our Saga correctly. You’re working with Event Sourced Entities, so it’s very natural to build a read model based on the events stream. The Show and ShowByReservation are your write and read models (respectively) from the CQRS perspective.

unnamed

View as a read model

A View component is the most natural way to achieve full CQRS in the Akka ecosystem. Each View is optimized for a specific query (or queries). We can retrieve our data with SQL-like query syntax. It’s worth mentioning that a View can be updated using events emitted from an Event Sourced Entity or changes updates from a Key Value Entity. When it comes to the implementation, it’s just a matter of adding some annotations, event handlers, and exposing the query via an HTTP endpoint:

@ComponentId("show-by-reservation-view")
public class ShowByReservationView extends View {

  @Query("SELECT * FROM show_by_reservation WHERE :reservationId = ANY(reservationIds)")
  public QueryEffect<ShowByReservation> getShow(String reservationId) {
    return queryResult();
  }

  @Consume.FromEventSourcedEntity(value = ShowEntity.class)
  public static class ShowByReservationUpdater extends TableUpdater<ShowByReservation> {

    public Effect<ShowByReservation> onEvent(ShowEvent showEvent) {
      return switch (showEvent) {
        case ShowCreated created -> 
          effects().updateRow(new ShowByReservation(created.showId(), new ArrayList<>()));
        case SeatReserved reserved -> 
          effects().updateRow(rowState().add(reserved.reservationId()));
        case SeatReservationPaid paid -> 
          effects().updateRow(rowState().remove(paid.reservationId()));
        case SeatReservationCancelled cancelled -> 
          effects().updateRow(rowState().remove(cancelled.reservationId()));
      };
    }
  }
}

The getShow returns queryResult() which is only a placeholder to associate a Query annotation with View method. The actual implementation is done by the Akka engine. Let's have a detailed look at the ShowByReservation implementation, which basically keeps the reservationIds list up to date.

public record ShowByReservation(String showId, List reservationIds) {

  public ShowByReservation(String showId, String reservationId) {
    this(showId, new ArrayList<>());
    reservationIds.add(reservationId);
  }

  public ShowByReservation add(String reservationId) {
    if (!reservationIds.contains(reservationId)) {
      reservationIds.add(reservationId);
    }
    return this;
  }

  public ShowByReservation remove(String reservationId) {
    reservationIds.remove(reservationId);
    return this;
  }
}

Instead of simply mapping one showId to onereservationId, we have one showId with many reservationIds. That’s because there can be only one entry for a single entity ID in a view model. A collection like Java List will materialize itself as an SQL-like array, which is why we use the ANY operator in the query definition.

It’s a very specific view model, which may be difficult to understand at first glance. Keep in mind that there can only be one view entry for a single entity ID. However, if you're not comfortable with this solution or are concerned about query performance with larger collections (e.g., a show with 1000+ seats), I suggest a different approach.

Key Value Entity as a read model

The real power of Akka is the flexibility of how you can combine existing components. The View component is a default candidate for a read model, but not the only one. A Key Value Entity could also be used as a read model. The domain part is much simpler:

record Reservation(String reservationId, String showId) {
}

After wrapping it with the KeyValueEntity component:

@ComponentId("reservation")
public class ReservationEntity extends KeyValueEntity {

  public Effect create(String showId) {
    String reservationId = commandContext().entityId();
    return effects().updateState(new Reservation(reservationId, showId)).thenReply(done());
  }

  public Effect get() {
    if (currentState() == null) {
      return effects().error("reservation not found");
    } else {
      return effects().reply(currentState());
    }
  }

  public Effect delete() {
    return effects().deleteEntity().thenReply(done());
  }
}

The only missing part is the transformation of  the Show events into Reservation:

@ComponentId("show-events-to-reservation-consumer")
@Consume.FromEventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class ShowEventsToReservationConsumer extends Consumer {

  public Effect onEvent(SeatReserved reserved) {
    return effects().asyncDone(createReservation(reserved.reservationId(), reserved.showId()));
  }

  public Effect onEvent(SeatReservationPaid paid) {
    return effects().asyncDone(deleteReservation(paid.reservationId()));
  }

  public Effect onEvent(SeatReservationCancelled cancelled) {
    return effects().asyncDone(deleteReservation(cancelled.reservationId()));
  }
}

Although it looks like more code, the solution is more explicit about what we want to achieve.

To finish our confirmation (or cancellation) Saga, we can use both solutions in the same way:

@Consume.FromEventSourcedEntity(value = WalletEntity.class, ignoreUnknown = true)
public class CompleteReservation extends Consumer {

  public Effect confirmReservation(WalletCharged walletCharged) {

    String reservationId = walletCharged.expenseId();

    return effects().asyncDone(
      getShowIdBy(reservationId)
        .thenCompose(showId -> confirmReservation(showId, reservationId))
    );
  }

  //Key Value Entity as a read model
  private CompletionStage getShowIdBy(String reservationId) {
    return componentClient.forKeyValueEntity(reservationId)
      .method(ReservationEntity::get)
      .invokeAsync()
      .thenApply(Reservation::showId);
  }

  //View as a read model
  private CompletionStage getShowIdBy2(String reservationId) {
    return componentClient.forView()
      .method(ShowByReservationView::getShow)
      .invokeAsync(reservationId)
      .thenApply(ShowByReservation::showId);
  }
}

Summary

Today we introduced two more Akka components: the View and the KeyValueEntity. I hope you liked the flexibility of connecting different puzzle pieces together to deliver a complete asynchronous CQRS implementation.

The third post in this series will cover deduplication. All Akka Consumers (or TableUpdaters), in addition to preserving the order of events (per aggregate), guarantee at-least-once delivery semantics. This means that we should be ready for event/message duplicates in our Saga choreography. Check out part2 tag from the source code.

Stay Responsive
to Change.