News & Articles

Full archive

October 13

2017

Akka Typed: New Persistence API

How to use Cluster Sharding with Akka Typed was shown in previous post. Sharding is typically used with persistent actors so in this post we will introduce the new APIs for typed persistent actors.

These features are still using the existing implementations from the untyped modules and they require that you run with the untyped ActorSystem as described in Akka Typed: Coexistence.

Akka Persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor’s internal state are persisted but never its current state directly (except for optional snapshots). Such stateful actors are recovered by replaying stored changes to these actors from which they can rebuild internal state.

This design of capturing all changes as domain events, which are immutable facts of things that have happened, is known as event sourcing.

Stub

This is what a typed PersistentActor (PersistentBehavior) looks like before filling in the implementation details:

import akka.typed.persistence.scaladsl.PersistentActor
import akka.typed.Behavior

object BlogPost1 {

  def behavior: Behavior[BlogCommand] =
    PersistentActor.immutable[BlogCommand, BlogEvent, BlogState](
      persistenceId = "abc",
      initialState = BlogState.empty,
      actions = PersistentActor.Actions { (ctx, cmd, state)  ??? },
      applyEvent = (evt, state)  ???)

}

There are 3 type parameters:

  • Command - the super class/interface of the commands
  • Event - the super class/interface of the events
  • State - the class of the state

and 4 parameters:

persistenceId is the unique identifier for the persistent actor.

initialState defines the State when the entity is first created.

actions defines command handlers and optional functions for other signals, e.g. Termination messages if watch is used.

applyEvent is the event handler that updates the current state when an event has been persisted.

Command Handlers

The commands for this example:

sealed trait BlogCommand extends Serializable

final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand

final case class AddPostDone(postId: String)

final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand

final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand

final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand

final case object PassivatePost extends BlogCommand

The function that process incoming commands is defined by the mandatory commandHandler parameter of the Actions.

  private val actions: Actions[BlogCommand, BlogEvent, BlogState] =
    Actions { (ctx, cmd, state) 
      cmd match {
        case AddPost(content, replyTo) 
          val evt = PostAdded(content.postId, content)
          Persist(evt).andThen { state2 
            // After persist is done additional side effects can be performed
            replyTo ! AddPostDone(content.postId)
          }
        case ChangeBody(newBody, replyTo) 
          val evt = BodyChanged(state.postId, newBody)
          Persist(evt).andThen { _ 
            replyTo ! Done
          }
        case Publish(replyTo) 
          Persist(Published(state.postId)).andThen { _ 
            replyTo ! Done
          }
        case GetPost(replyTo) 
          replyTo ! state.content.get
          PersistNothing()
        case PassivatePost =>
          Stop()
      }
    }

The command handler is a function with 3 parameters for the ActorContext, Command, and current State.

A command handler returns an Effect directive that defines what event or events, if any, to persist.

  • Persist will persist one single event
  • PersistAll will persist several events atomically, i.e. all events are stored or none of them are stored if there is an error
  • PersistNothing no events are to be persisted, for example a read-only command
  • Unhandled the command is unhandled (not supported) in current state

External side effects can be performed after successful persist with the andThen function. In the above example a reply is sent to the replyTo. Note that the new state after applying the event is passed as parameter to the andThen function.

Event Handlers

The events for this example:

sealed trait BlogEvent extends Serializable

final case class PostAdded(
  postId: String,
  content: PostContent) extends BlogEvent

final case class BodyChanged(
  postId: String,
  newBody: String) extends BlogEvent

final case class Published(postId: String) extends BlogEvent

When an event has been persisted successfully the current state is updated by applying the event to the current state with the applyEvent function. The event handler returns the new state, which must be immutable so you return a new instance of the state. The same event handler is also used when the entity is started up to recover its state from the stored events

  private def applyEvent(event: BlogEvent, state: BlogState): BlogState =
    event match {
      case PostAdded(postId, content) 
        state.withContent(content)

      case BodyChanged(_, newBody) 
        state.content match {
          case Some(c)  state.copy(content = Some(c.copy(body = newBody)))
          case None     state
        }

      case Published(_) 
        state.copy(published = true)
    }

State

The state for this example without further ado:


object BlogState {
  val empty = BlogState(None, published = false)
}

final case class BlogState(
  content: Option[PostContent],
  published: Boolean) {

  def withContent(newContent: PostContent): BlogState =
    copy(content = Some(newContent))

  def isEmpty: Boolean = content.isEmpty

  def postId: String = content match {
    case Some(c) => c.postId
    case None    => throw new IllegalStateException("postId unknown before post is created")
  }
}

final case class PostContent(postId: String, title: String, body: String)

Changing Behavior

After processing a message an ordinary, non-persistent, typed actor returns the Behavior that is used for next message. As you can see in the above examples that is not supported by typed persistent actors. Instead, the state is returned by applyEvent. The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery. If it would have been supported it would mean that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. That would be very prone to mistakes.

For simple actors you can use the same set of command handlers independent of what state the entity is in, as shown in above example.

For more complex actors it’s useful to be able to change the behavior in the sense that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities. The Actions, the command handler, can be selected based on current state by using the Actions.byState factory method. It is a function from current State to Actions, which is called for each incoming command to select which Actions to use to process the command.

This is how to define different behavior for different State:

  private val actions: Actions[BlogCommand, BlogEvent, BlogState] = Actions.byState {
    case state if state.isEmpty   initial
    case state if !state.isEmpty  postAdded
  }

  private val initial: Actions[BlogCommand, BlogEvent, BlogState] =
    Actions { (ctx, cmd, state) 
      cmd match {
        case AddPost(content, replyTo) 
          val evt = PostAdded(content.postId, content)
          Persist(evt).andThen { state2 
            // After persist is done additional side effects can be performed
            replyTo ! AddPostDone(content.postId)
          }
        case PassivatePost =>
          Stop()
        case other 
          Unhandled()
      }
    }

  private val postAdded: Actions[BlogCommand, BlogEvent, BlogState] = {
    Actions { (ctx, cmd, state) 
      cmd match {
        case ChangeBody(newBody, replyTo) 
          val evt = BodyChanged(state.postId, newBody)
          Persist(evt).andThen { _ 
            replyTo ! Done
          }
        case Publish(replyTo) 
          Persist(Published(state.postId)).andThen { _ 
            println(s"Blog post ${state.postId} was published")
            replyTo ! Done
          }
        case GetPost(replyTo) 
          replyTo ! state.content.get
          PersistNothing()
        case _: AddPost 
          Unhandled()
        case PassivatePost =>
          Stop()
      }
    }
  }

The event handler is always the same independent of state. The main reason for not making the event handler part of the Actions is that all events must be handled and that is typically independent of what the current state is. The event handler can of course still decide what to do based on the state if that is needed.

Serialization

The same serialization mechanism as for untyped actors is also used in Akka Typed, also for persistent actors. When picking serialization solution for the events you should also consider that it must be possible read old events when the application has evolved. Strategies for that can be found in the Akka documentation.

The example code that comes with this blog post includes Protobuf serialization for the commands, events and state of the BlogPost example, see BlogSerializer.

Running

PersistentActor.immutable that was described in the beginning returns a PersistentBehavior that is a subclass of Behavior and that means that you can start the actor with the ordinary spawn method of the ActorContext.

You also have to configure an Akka Persistence journal. The full example code includes such configuration for akka-persistence-cassandra.

Previous post describes how to use Cluster Sharding. There is one thing to be aware of. When used with Cluster Sharding the persistenceId is not known until the actor is started and typically based on the entityId, which is the actor name. Therefore, with sharding PersistentActor.persistentEntity must be used instead of PersistentActor.immutable. It takes a function to create the persistenceId`.

import akka.typed.cluster.sharding.EntityTypeKey

object BlogPost {

  val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")
  
  def shardingBehavior: Behavior[BlogCommand] =
    PersistentActor.persistentEntity[BlogCommand, BlogEvent, BlogState](
      persistenceIdFromActorName = name => ShardingTypeName.name + "-" + name,
      initialState = BlogState.empty,
      actions = PersistentActor.Actions { (ctx, cmd, state)  ??? },
      applyEvent = (evt, state)  ???)

}

This also highlights a limitation with the current implementation. The PersistentBehavior can’t be wrapped in other behaviors, such as Actor.deferred. The reason for that is that we are running the PersistentBehavior with an untyped PersistentActor and that doesn’t allow such wrapping. We intend to re-implement this with a pure typed actor and then wrapping will be possible. The untyped journals, e.g. akka-persistence-cassandra, will still be possible to use as is. When this limitation is removed the above PersistentActor.persistentEntity will not be needed, because the persistenceId can be created from the actor name inside deferred.

Feedback wanted

The full source code of these examples, are available in patriknw/akka-typed-blog.

By this we are ending this series of blog posts about Akka Typed APIs for cluster and persistence. We expect a few more iterations to improve the APIs, so your feedback would be very valuable. We are currently working on other things but we will get back to Akka Typed soon and then we will focus on things like:

  • missing Java API, e.g. for Persistence
  • testing, testkit
  • documentation
  • general improvements of quality
  • and all those small things

Help is of course welcome.

This post is part of the "Akka Typed Cluster and Persistence" series. Explore other posts in this series:

  1. Akka Typed: New Cluster API
  2. Akka Typed: New Cluster Tools API
  3. → Akka Typed: New Persistence API