Note: Code examples in this blog post are out of date, see the Akka documentation for latest information on this topic.
This blog post is showcasing how nice it is to work with scheduled messages in Akka Typed. The ActorSystem
still has a Scheduler
and there is a schedule
method in the ActorContext
to send a message after a delay. However, none of those are particularly good when an actor needs to schedule messages to itself, which is rather common. There are problems like:
- It can be difficult to strictly cancel a scheduled message because it might already be enqueued in the mailbox.
- It’s easy to accidentally double the rate of periodic messages when the actor is restarted.
The FSM
in classic akka-actor
has nice timers and there is a similar API in Akka Typed.
The following example demonstrates how to use such timers. The Buncher
actor buffers a burst of incoming messages and delivers them as a batch after a timeout or when the number of batched messages exceeds a maximum size.
object Buncher {
trait Msg
final case class Batch(messages: Vector[Msg])
private case object TimerKey
private case object Timeout extends Msg
def behavior(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Msg] =
Actor.withTimers(timers => idle(timers, target, after, maxSize))
private def idle(timers: TimerScheduler[Msg], target: ActorRef[Batch],
after: FiniteDuration, maxSize: Int): Behavior[Msg] = {
Actor.immutable[Msg] { (ctx, msg) =>
timers.startSingleTimer(TimerKey, Timeout, after)
active(Vector(msg), timers, target, after, maxSize)
}
}
private def active(buffer: Vector[Msg], timers: TimerScheduler[Msg],
target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Msg] = {
Actor.immutable[Msg] { (ctx, msg) =>
msg match {
case Timeout =>
target ! Batch(buffer)
idle(timers, target, after, maxSize)
case msg =>
val newBuffer = buffer :+ msg
if (newBuffer.size == maxSize) {
timers.cancel(TimerKey)
target ! Batch(newBuffer)
idle(timers, target, after, maxSize)
} else
active(newBuffer, timers, target, after, maxSize)
}
}
}
}
There are a few things worth noting here:
- To get access to the timers you start with
Actor.withTimers
that will pass aTimerScheduler
instance to the function. This can be used with any type ofBehavior
, such asimmutable
ormutable
. - Each timer has a key and if a new timer with same key is started the previous is cancelled and it’s guaranteed that a message from the previous timer is not received, even though it might already be enqueued in the mailbox when the new timer is started.
- Both periodic and single message timers are supported.
- The
TimerScheduler
is mutable in itself, because it performs and manages the side effects of registering the scheduled tasks. - The
TimerScheduler
is bound to the lifecycle of the actor that owns it and it’s cancelled automatically when the actor is stopped. Actor.withTimers
can also be used insideActor.supervise
and it will automatically cancel the started timers correctly when the actor is restarted, so that the new incarnation will not receive scheduled messages from previous incarnation.
This is the last post in the Introducing Akka Typed series. I hope you see the potential of Akka Typed. Next step will be to integrate Akka Typed with other parts of Akka, such as Cluster, Persistence, and Streams.
The full source code of these examples, including corresponding Java examples, are available in patriknw/akka-typed-blog.
This post is part of the "Introducing Akka Typed" series. Explore other posts in this series:
- Akka Typed: Hello World in the new API
- Akka Typed: Coexistence
- Akka Typed: Mutable vs. Immutable
- Akka Typed: Protocols
- Akka Typed: Supervision
- Akka Typed: Lifecycle and Watch
- → Akka Typed: Timers
- Akka Typed: New Cluster API
- Akka Typed: New Cluster Tools API
- Akka Typed: New Persistence API
- Announcing the course Programming Reactive Systems!
- Typed Supervision: why the changes?
- Akka 2.5.22 Brings Akka Typed To Production Ready
- Akka 2.6 roadmap
- Tour of Akka Typed