Saga patterns in Akka (part 4) - error handling, DLQ


In part 3, we discussed strategies for achieving exactly-once delivery in distributed systems. Reliable message processing is crucial, but handling failures gracefully is just as important.

Error handling is one of the trickiest parts of working with long-lived business processes. Even our basic example with just two steps is a good playground for summarizing what can go wrong.

The current implementation is already fairly robust without any extra modifications. Thanks to the at-least-once delivery guarantee of the Akka Consumer component, the process will recover after any restart or crash.

The main green path contains three steps: seat reservation, charging, and confirmation (1).

saga-patterns-in-akka-part-4-1

In case of insufficient funds, we need to apply a compensating action and cancel the reservation (2).

saga-patterns-in-akka-part-4-2

Because the event stream is ordered, we don’t have to worry about the reservation cancellation being consumed before the actual charge rejection. While ordered processing is a significant advantage, it also presents a major challenge in choreography-based Sagas.

Poison message

All SeatReserved events for a given Show are processed one by one. Now, imagine that for some reason, charging a single wallet (or a small subset of wallets) keeps failing with an unexpected exception. This could be due to a bug in the code or a problem with an external service (for example, if we use different wallets or payment providers). Although this error occurs relatively rarely, the whole process is blocked and can’t move forward. This is the poison message (event) problem, which leads to head-of-line blocking. Because we consume an ordered stream of events, we can’t just skip an event when it fails: doing so could lead to an inconsistent system state, e.g., a paid reservation that was never confirmed.

We can minimize this problem with stream partitioning, similar to partitions in Apache Kafka. In case of an error, only one or some partitions are affected, and the rest can continue processing. In fact, Akka will do something like that for us under the hood.
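To make the effect of partitioning concrete, here is a minimal plain-Java sketch. It is not how Akka implements this internally: the `Event` record, the hash-based `partitionOf` routing, and the `isPoison` predicate are all assumptions made for illustration. The sketch only shows that a poison event blocks the partition it lands in, while the other partitions keep going.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PartitionedStreamSketch {

  // Hypothetical event: only the key (e.g. an entity id) matters for routing.
  record Event(String key, int seqNr) {}

  // Route by key, so all events with the same key keep their relative order.
  static int partitionOf(String key, int partitions) {
    return Math.floorMod(key.hashCode(), partitions);
  }

  // Split one ordered stream into `partitions` ordered sub-streams.
  static List<List<Event>> partition(List<Event> stream, int partitions) {
    List<List<Event>> result = new ArrayList<>();
    for (int i = 0; i < partitions; i++) {
      result.add(new ArrayList<>());
    }
    for (Event e : stream) {
      result.get(partitionOf(e.key(), partitions)).add(e);
    }
    return result;
  }

  // Process a partition in order and stop at the first poison event.
  static List<Event> processUntilBlocked(List<Event> partition, Predicate<Event> isPoison) {
    List<Event> processed = new ArrayList<>();
    for (Event e : partition) {
      if (isPoison.test(e)) {
        break; // head-of-line blocking: nothing behind this event is processed
      }
      processed.add(e);
    }
    return processed;
  }

  public static void main(String[] args) {
    List<Event> stream = List.of(
        new Event("a", 1), new Event("b", 1), new Event("a", 2),
        new Event("b", 2), new Event("b", 3));
    Predicate<Event> isPoison = e -> e.equals(new Event("a", 2));

    // Compare a single ordered stream with the same stream split in two.
    for (int partitions : new int[] {1, 2}) {
      List<List<Event>> split = partition(stream, partitions);
      for (int i = 0; i < split.size(); i++) {
        int processed = processUntilBlocked(split.get(i), isPoison).size();
        System.out.println(partitions + " partition(s), #" + i + ": processed "
            + processed + " of " + split.get(i).size());
      }
    }
  }
}
```

Note that the partition containing the poison event is still stuck, which is why partitioning alone only limits the damage.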

Dead letter queue

Partitioning won’t solve the problem completely, though, because the affected partition is still blocked. Alternatively, we can leave the safe space of ordered event processing and address this issue with a dead-letter queue (DLQ). After a few retries, if the error is still present, we divert the poison event for later processing and release the main processing stream. Akka does not yet have a built-in dead-letter queue, but you can easily simulate one with existing components.

saga-patterns-in-akka-part-4-error-handling-dlq-3

Our DLQ is based on another Event Sourced Entity, WalletFailureEntity. The implementation is simple: we just need to record the fact that a failure happened. The entity state, WalletFailureState, could be empty, but we can also use it to collect some statistics, like a failure counter. The event(s) should contain all the information necessary to recover after the failure. In our case, the WalletChargeFailureOccurred event carries the ChargeWallet command (the source of the problem) together with an error message (the command processing result) for debugging.

@ComponentId("wallet-failure")
public class WalletFailureEntity extends EventSourcedEntity<WalletFailureState, WalletFailureEvent> {

  public record WalletFailureState(int numberOfFailures) {
    public WalletFailureState inc() {
      return new WalletFailureState(numberOfFailures + 1);
    }
  }

  public record RegisterChargeFailure(ChargeWallet source, String msg) {
  }

  sealed interface WalletFailureEvent {
  }

  public record WalletChargeFailureOccurred(ChargeWallet source, String msg) implements WalletFailureEvent {
  }

  @Override
  public WalletFailureState emptyState() {
    return new WalletFailureState(0);
  }

  @Override
  public WalletFailureState applyEvent(WalletFailureEvent walletFailureEvent) {
    return currentState().inc();
  }

  // Command handler: persist the failure as an event and acknowledge with Done.
  public Effect<Done> registerFailure(RegisterChargeFailure failure) {
    return effects()
      .persist(new WalletChargeFailureOccurred(failure.source(), failure.msg()))
      .thenReply(__ -> done());
  }
}

Let’s focus on the ChargeForReservation Consumer. We don’t want to register a failure after the first attempt, because it could be just a transient, recoverable glitch. It’s better to have a retry policy that retries the call a few times before it finally gives up and registers a failure. For that, we can use Akka Future Patterns or any similar solution.

@ComponentId("charge-for-reservation")
@Consume.FromEventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class ChargeForReservation extends Consumer {

  public Effect charge(SeatReserved seatReserved) {
    ...

    return effects().asyncDone(
      Patterns.retry(() -> chargeWallet(walletId, chargeWallet),
          attempts,
          retryDelay,
          materializer.system())
        .exceptionallyComposeAsync(throwable ->
          registerFailure(throwable, walletId, chargeWallet)
        )
    );

  }
}
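The same retry-then-divert idea can be sketched with the JDK alone, which may help when reasoning about it outside of Akka. This is a simplified stand-in, not the behavior of `Patterns.retry`: the `retry` and `chargeOrDeadLetter` helpers, the `deadLetters` list (playing the role of the failure entity), and the string outcomes are all hypothetical names introduced for illustration. It assumes Java 12 or later for `exceptionallyCompose`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetryThenDeadLetterSketch {

  // Call `call` up to `attempts` times, waiting `delayMillis` between attempts.
  // The returned stage fails only if every attempt failed.
  static <T> CompletionStage<T> retry(
      Supplier<CompletionStage<T>> call, int attempts, long delayMillis) {
    return call.get().exceptionallyCompose(error -> {
      if (attempts <= 1) {
        return CompletableFuture.<T>failedStage(error); // out of attempts, give up
      }
      Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
      return CompletableFuture
          .supplyAsync(() -> retry(call, attempts - 1, delayMillis), delayed)
          .thenCompose(stage -> stage); // flatten the nested stage
    });
  }

  // Retry the charge; if it still fails, record it in the dead-letter list and
  // complete normally, so the main event stream is not blocked.
  static CompletionStage<String> chargeOrDeadLetter(
      Supplier<CompletionStage<String>> charge,
      int attempts,
      long delayMillis,
      List<String> deadLetters) {
    return retry(charge, attempts, delayMillis)
        .exceptionallyCompose(error -> {
          // Dependent stages may wrap the original failure in a CompletionException.
          Throwable cause = (error instanceof CompletionException && error.getCause() != null)
              ? error.getCause()
              : error;
          deadLetters.add(String.valueOf(cause.getMessage()));
          return CompletableFuture.completedFuture("dead-lettered");
        });
  }

  public static void main(String[] args) {
    List<String> deadLetters = new CopyOnWriteArrayList<>();
    AtomicInteger calls = new AtomicInteger();
    Supplier<CompletionStage<String>> alwaysFailing = () -> {
      calls.incrementAndGet();
      return CompletableFuture.failedStage(new IllegalStateException("wallet unavailable"));
    };

    String outcome = chargeOrDeadLetter(alwaysFailing, 3, 10, deadLetters)
        .toCompletableFuture()
        .join();
    System.out.println(outcome + " after " + calls.get() + " calls, dead letters: " + deadLetters);
  }
}
```

The important design choice is that the fallback completes successfully: from the consumer's point of view the event has been handled, and the failure lives on only in the dead-letter store.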

Processing DLQ events in HandleWalletFailures works like any other Consumer. In our case, we want to cancel the reservation.

@ComponentId("handle-wallet-failures")
@Consume.FromEventSourcedEntity(WalletFailureEntity.class)
public class HandleWalletFailures extends Consumer {

  public Effect handle(WalletChargeFailureOccurred walletChargeFailureOccurred) {
    logger.info("handling failure: {}", walletChargeFailureOccurred);

    String reservationId = walletChargeFailureOccurred.source().expenseId();

    return effects().asyncDone(getShowIdBy(reservationId).thenCompose(showId ->
      cancelReservation(reservationId, showId)
    ));
  }
}

And now the hard part. After applying the DLQ, we can no longer be sure about the order of processing. Besides the basic scenarios mentioned above (1, 2), we need to be ready for more convoluted flows, such as a reservation confirmation (the wallet was charged) arriving after the cancellation (3), or a duplicated cancellation (4).

saga-patterns-in-akka-part-4-error-handling-dlq-4

saga-patterns-in-akka-part-4-error-handling-dlq-5

This could happen in case of wallet timeouts: we don’t know what the result of the ChargeWallet command was, or whether the command was processed at all. After the reservation is cancelled, we can still receive the missing event saying that the wallet was actually charged.

A refund action from the (3) flow is triggered by an additional CancelledReservationConfirmed show event.

@ComponentId("refund-for-reservation")
@Consume.FromEventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class RefundForReservation extends Consumer {

  public Effect refund(CancelledReservationConfirmed cancelledReservationConfirmed) {
    …

    return effects().asyncDone(
      getReservation(cancelledReservationConfirmed.reservationId()).thenCompose(reservation ->
        refund(reservation.walletId(), reservation.price(), commandId)
      )
    );
  }
}

Implementing such flows requires additional changes in the event model, in the Show aggregate logic, and a more detailed Reservation entity (acting as a read model). All changes can be reviewed in this diff.
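To give a feeling for what order-independent aggregate logic might look like, here is a small decision-function sketch. It is not the code from the diff: apart from the CancelledReservationConfirmed event, which the article mentions, the `Status` values, the other event names, and the choice to ignore a cancellation that arrives after a confirmed payment are assumptions made for illustration.

```java
import java.util.List;

final class ReservationDecisionSketch {

  // Hypothetical per-reservation status tracked by the aggregate.
  enum Status { RESERVED, PAID, CANCELLED, REFUND_PENDING }

  // Hypothetical event names, except CANCELLED_RESERVATION_CONFIRMED,
  // which mirrors the CancelledReservationConfirmed event from the article.
  enum ShowEvent { SEAT_RESERVATION_PAID, SEAT_RESERVATION_CANCELLED, CANCELLED_RESERVATION_CONFIRMED }

  // The wallet reported a successful charge for this reservation.
  static List<ShowEvent> confirmPayment(Status status) {
    return switch (status) {
      case RESERVED -> List.of(ShowEvent.SEAT_RESERVATION_PAID);
      // Flow (3): the charge result arrived after the cancellation, so a refund is needed.
      case CANCELLED -> List.of(ShowEvent.CANCELLED_RESERVATION_CONFIRMED);
      // Duplicate confirmation: nothing new to record.
      case PAID, REFUND_PENDING -> List.of();
    };
  }

  // A cancellation was requested, either by the regular flow or by the DLQ handler.
  static List<ShowEvent> cancel(Status status) {
    return switch (status) {
      case RESERVED -> List.of(ShowEvent.SEAT_RESERVATION_CANCELLED);
      // Flow (4): duplicated cancellation is a no-op, which makes the command idempotent.
      case CANCELLED, REFUND_PENDING -> List.of();
      // Assumption of this sketch: a cancellation after a confirmed payment is ignored.
      case PAID -> List.of();
    };
  }

  // Event handler: evolve the status from a persisted event.
  static Status apply(Status status, ShowEvent event) {
    return switch (event) {
      case SEAT_RESERVATION_PAID -> Status.PAID;
      case SEAT_RESERVATION_CANCELLED -> Status.CANCELLED;
      case CANCELLED_RESERVATION_CONFIRMED -> Status.REFUND_PENDING;
    };
  }

  public static void main(String[] args) {
    // Replay flow (3): cancel first, then the late payment confirmation arrives.
    Status status = Status.RESERVED;
    for (ShowEvent e : cancel(status)) {
      status = apply(status, e);
    }
    List<ShowEvent> late = confirmPayment(status);
    for (ShowEvent e : late) {
      status = apply(status, e);
    }
    System.out.println("events for late confirmation: " + late + ", final status: " + status);
  }
}
```

Every command is answered for every status, so a late or repeated message either produces a well-defined event or is explicitly ignored, instead of failing and becoming a new poison message.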

Summary

Poison messages are one of the most complex errors to handle when working with choreography-based Sagas, especially when the implementation relies on the fact that the event stream is ordered. The proposed dead-letter queue solution based on an Event Sourced Entity helps us to solve the technical challenges, but that is not the only way to address this problem.

In the Akka ecosystem, there is a dedicated Workflow component for implementing orchestration-based Sagas, where the flow is more request-driven, and hence a single poison event can't stop the entire processing. However, there are other challenges that may surprise you, which will be the topic of the fifth part of the series. In the meantime, check out the part4 tag from the source code and play with the shouldConfirmCancelledReservationAndRefund integration test.

Read part 5 - orchestration with workflows
