The easy button for durable state queries
Durable State persistence was introduced over a year ago as a complement to Event Sourced persistence. It’s useful when only the latest state is of interest. The full state is stored and it overwrites previously stored state. In contrast, Event Sourcing stores all events (changes) leading up to the current state and recovers the state by replaying the events.
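To make the contrast concrete, here is a minimal in-memory sketch in plain Scala. It does not use the Akka APIs; the `State` and `Event` types and both store classes are hypothetical names invented for this illustration.

```scala
object DurableStateVsEventSourcing {
  // Hypothetical, simplified blog post state used only for this illustration.
  final case class State(title: String, body: String, published: Boolean)

  // Hypothetical events for the Event Sourcing variant.
  sealed trait Event
  final case class PostAdded(title: String, body: String) extends Event
  final case class BodyChanged(newBody: String) extends Event
  case object Published extends Event

  // Durable State: each write replaces the stored value, so only the latest state survives.
  final class DurableStateStore {
    private var rows = Map.empty[String, State]
    def upsert(id: String, state: State): Unit = rows = rows.updated(id, state)
    def get(id: String): Option[State] = rows.get(id)
  }

  // Event Sourcing: every change is appended, and the state is rebuilt by replaying the events.
  final class EventJournal {
    private var events = Map.empty[String, Vector[Event]]

    def append(id: String, event: Event): Unit =
      events = events.updated(id, events.getOrElse(id, Vector.empty) :+ event)

    def recover(id: String): Option[State] =
      events.getOrElse(id, Vector.empty).foldLeft(Option.empty[State]) {
        case (_, PostAdded(title, body)) => Some(State(title, body, published = false))
        case (Some(s), BodyChanged(b))   => Some(s.copy(body = b))
        case (Some(s), Published)        => Some(s.copy(published = true))
        case (None, _)                   => None // ignore changes to a post that was never added
      }
  }
}
```

Both variants end up with the same current state. The difference is that the journal keeps the full history, while the durable state store keeps one row per entity.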
One limitation is that you can interact with an individual durable state entity only if you know its identifier; you can’t run queries that involve more than one entity. For such queries you need to use Akka Projections and store a separate query representation for each query field. The projection processes the durable state changes asynchronously by reading them from the database. This is not always optimal: it costs processing time and additional storage, and the asynchronous, eventually consistent updates add complexity.
With the new release of Akka Persistence R2DBC we provide a mechanism for storing a query representation of the durable state in the same transaction as the Durable State change. It is updated from the write side without an asynchronous projection. Some advantages of this approach are:
- exactly-once processing and atomic update with the Durable State change
- no eventual consistency delay from asynchronous Projection processing
- no need for Projection offset storage
As an example, consider a blog post that is stored with Durable State, where you also want to find blog posts by their title, which is not the same as the entity identifier. You would define an additional indexed column for the title in the database table where the durable state is stored. To populate this column when the durable state entity is updated, you would define an AdditionalColumn, which is essentially a function from the state to the value that will be stored in the column.
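import akka.persistence.r2dbc.state.scaladsl.AdditionalColumn

class BlogPostTitleColumn extends AdditionalColumn[BlogPost.State, String] {
  override val columnName: String = "title"

  // upsert.value is the state being stored; its title becomes the column value
  override def bind(upsert: AdditionalColumn.Upsert[BlogPost.State]): AdditionalColumn.Binding[String] =
    AdditionalColumn.BindValue(upsert.value.content.title)
}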
The full example can be found in the reference documentation.
Then you can query the table using the title column with ordinary SQL to find blog posts with matching titles.
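The idea can be modelled without a database. The following is only an in-memory sketch in plain Scala, not the Akka Persistence R2DBC API: `Row`, `DurableStateTable`, `titleColumn` and the simplified `State` are hypothetical names, and the map stands in for the durable state table.

```scala
object AdditionalColumnSketch {
  // Hypothetical, simplified state; the real BlogPost.State is in the reference documentation.
  final case class Content(title: String, body: String)
  final case class State(content: Content, published: Boolean)

  // One row of the modelled durable state table, including the extra "title" column.
  final case class Row(persistenceId: String, revision: Long, state: State, title: String)

  // Stand-in for an AdditionalColumn: a function from the state to the column value.
  val titleColumn: State => String = _.content.title

  final class DurableStateTable {
    private var rows = Map.empty[String, Row]

    // The state and the derived column are written together, so they cannot drift apart.
    def upsert(persistenceId: String, state: State): Unit = {
      val revision = rows.get(persistenceId).fold(1L)(_.revision + 1)
      rows = rows.updated(persistenceId, Row(persistenceId, revision, state, titleColumn(state)))
    }

    // Models a query such as: SELECT persistence_id FROM <table> WHERE title = ?
    def findIdsByTitle(title: String): Set[String] =
      rows.values.filter(_.title == title).map(_.persistenceId).toSet
  }
}
```

Because the column value is derived in the same write as the state, a lookup by title never sees a stale title, which is the property the same-transaction update gives you in the real database.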
For query representations that need more than additional indexed columns, you can use a ChangeHandler, which is invoked for each update of the durable state. It can, for example, store a query representation or aggregated values in a separate table, still in the same transaction as the durable state update.
An example of a change handler that keeps track of the number of published blog posts:
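import scala.concurrent.{ ExecutionContext, Future }

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.Persistence
import akka.persistence.query.{ DeletedDurableState, DurableStateChange, UpdatedDurableState }
import akka.persistence.r2dbc.session.scaladsl.R2dbcSession
import akka.persistence.r2dbc.state.scaladsl.ChangeHandler

class BlogPostCounts(system: ActorSystem[_]) extends ChangeHandler[BlogPost.State] {

  // Insert a count of 1 for a new slice, otherwise add 1 to the stored count.
  // (excluded.cnt is the proposed value 1, so the existing row must be referenced instead.)
  private val incrementSql =
    "INSERT INTO post_count (slice, cnt) VALUES ($1, 1) " +
    "ON CONFLICT (slice) DO UPDATE SET cnt = post_count.cnt + 1"

  private implicit val ec: ExecutionContext = system.executionContext

  override def process(session: R2dbcSession, change: DurableStateChange[BlogPost.State]): Future[Done] = {
    change match {
      case upd: UpdatedDurableState[BlogPost.State] =>
        if (upd.value.published) {
          val slice = Persistence(system).sliceForPersistenceId(upd.persistenceId)
          val stmt = session
            .createStatement(incrementSql)
            .bind(0, slice)
          // runs in the same transaction as the durable state update
          session.updateOne(stmt).map(_ => Done)
        } else {
          Future.successful(Done)
        }

      case _: DeletedDurableState[BlogPost.State] =>
        // not used
        Future.successful(Done)
    }
  }
}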
The full example can be found in the reference documentation.
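To see what the per-slice counter table ends up holding, here is a small in-memory model in plain Scala. It is a sketch under assumptions, not the real implementation: `PostCountTable` is a hypothetical name, and `sliceFor` with 1024 slices is assumed to approximate what `sliceForPersistenceId` does.

```scala
object PostCountSketch {
  // Assumption: 1024 slices and this hashing scheme approximate Akka's slice calculation.
  val NumberOfSlices = 1024

  def sliceFor(persistenceId: String): Int =
    math.abs(persistenceId.hashCode % NumberOfSlices)

  final class PostCountTable {
    private var counts = Map.empty[Int, Long]

    // Models the upsert: store 1 for a new slice, otherwise add 1 to the stored count.
    def increment(slice: Int): Unit =
      counts = counts.updated(slice, counts.getOrElse(slice, 0L) + 1)

    // Models a query such as: SELECT SUM(cnt) FROM post_count
    def total: Long = counts.values.sum
  }
}
```

Each call to `increment` models one execution of the upsert statement. The overall count is the sum over all slice rows, so reading it is a single aggregate query over a small table.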
With these new features, Durable State persistence becomes easy to use for CRUD (create, read, update, delete) style persistence. You keep the nice characteristics of entities fully managed by Akka Cluster Sharding and gain the full power of a rich query model.