Introducing Akka Cloud to Edge Continuum. Build once for the Cloud. Seamlessly deploy to the Edge. Learn More
 

News & Articles

Full archive

May 26

2017

Akka Typed: Timers

Note: Code examples in this blog post are out of date, see the Akka documentation for latest information on this topic.

This blog post is showcasing how nice it is to work with scheduled messages in Akka Typed. The ActorSystem still has a Scheduler and there is a schedule method in the ActorContext to send a message after a delay. However, none of those are particularly good when an actor needs to schedule messages to itself, which is rather common. There are problems like:

  • It can be difficult to strictly cancel a scheduled message because it might already be enqueued in the mailbox.
  • It’s easy to accidentally double the rate of periodic messages when the actor is restarted.

The FSM in classic akka-actor has nice timers and there is a similar API in Akka Typed.

The following example demonstrates how to use such timers. The Buncher actor buffers a burst of incoming messages and delivers them as a batch after a timeout or when the number of batched messages exceeds a maximum size.

object Buncher {
  trait Msg
  final case class Batch(messages: Vector[Msg])

  private case object TimerKey
  private case object Timeout extends Msg

  def behavior(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Msg] =
    Actor.withTimers(timers => idle(timers, target, after, maxSize))

  private def idle(timers: TimerScheduler[Msg], target: ActorRef[Batch],
    after: FiniteDuration, maxSize: Int): Behavior[Msg] = {
    Actor.immutable[Msg] { (ctx, msg) =>
      timers.startSingleTimer(TimerKey, Timeout, after)
      active(Vector(msg), timers, target, after, maxSize)
    }
  }

  private def active(buffer: Vector[Msg], timers: TimerScheduler[Msg],
    target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Msg] = {
    Actor.immutable[Msg] { (ctx, msg) =>
      msg match {
        case Timeout =>
          target ! Batch(buffer)
          idle(timers, target, after, maxSize)
        case msg =>
          val newBuffer = buffer :+ msg
          if (newBuffer.size == maxSize) {
            timers.cancel(TimerKey)
            target ! Batch(newBuffer)
            idle(timers, target, after, maxSize)
          } else
            active(newBuffer, timers, target, after, maxSize)
      }
    }
  }
}

(Same example in Java)

There are a few things worth noting here:

  • To get access to the timers you start with Actor.withTimers that will pass a TimerScheduler instance to the function. This can be used with any type of Behavior, such as immutable or mutable.
  • Each timer has a key and if a new timer with same key is started the previous is cancelled and it’s guaranteed that a message from the previous timer is not received, even though it might already be enqueued in the mailbox when the new timer is started.
  • Both periodic and single message timers are supported.
  • The TimerScheduler is mutable in itself, because it performs and manages the side effects of registering the scheduled tasks.
  • The TimerScheduler is bound to the lifecycle of the actor that owns it and it’s cancelled automatically when the actor is stopped.
  • Actor.withTimers can also be used inside Actor.supervise and it will automatically cancel the started timers correctly when the actor is restarted, so that the new incarnation will not receive scheduled messages from previous incarnation.

This is the last post in the Introducing Akka Typed series. I hope you see the potential of Akka Typed. Next step will be to integrate Akka Typed with other parts of Akka, such as Cluster, Persistence, and Streams.

The full source code of these examples, including corresponding Java examples, are available in patriknw/akka-typed-blog.