Introducing Akka Cloud to Edge Continuum. Build once for the Cloud. Seamlessly deploy to the Edge. Learn More
 

News & Articles

Full archive

May 19

2017

Akka Typed: Lifecycle and Watch

Note: Code examples in this blog post are out of date, see the Akka documentation for latest information on this topic.

In previous post we learned about supervision and how to restart a behavior in case of failures. Related to that is the actor’s lifecycle and how to watch an actor to be notified when it’s terminated.

Classic untyped actors have lifecycle hooks; preStart, preRestart, postRestart and postStop. Some of those call each other and your head is probably starting to spin immediately when you try to recall when each one is invoked. In Akka Typed these are represented as messages and simplified to only two messages; PreRestart and PostStop.

preStart and postRestart are replaced by placing such startup code in the constructor of the behavior, or more typically in a deferred behavior. Actor.deferred is like a factory for a behavior. Creation of the behavior instance is deferred until the actor is started, as opposed to Actor.immutable that creates the behavior instance immediately before the actor is running.

Obviously, PreRestart is received when the behavior is restarted and PostStop when it’s stopped, but it’s worth noting that PostStop is not signalled when the behavior is restarted.

The typical usage of PreRestart and PostStop is to close resources that the actor has been using. For example, the FlakyWorker from previous post can be expanded to write the jobs to a file, which must be closed when the actor is stopped or restarted.

object FlakyWorker2 {

  sealed trait Command
  final case class Job(payload: String) extends Command

  val workerBehavior: Behavior[Command] = Actor.deferred { ctx =>
    ctx.system.log.info("Worker {} is STARTED", ctx.self)
    val out = new PrintWriter(new FileWriter(
      s"target/out-${ctx.self.path.name}.txt", true))
    active(count = 1, out)
  }

  private def active(count: Int, out: PrintWriter): Behavior[Command] =
    Actor.immutable[Command] { (ctx, msg) =>
      msg match {
        case Job(payload) =>
          if (ThreadLocalRandom.current().nextInt(5) == 0)
            throw new RuntimeException("Bad luck")

          ctx.system.log.info("Worker {} got job {}, count {}", ctx.self, payload, count)
          out.println(s"Worker ${ctx.self} got job $payload, count $count")
          active(count + 1, out)
      }
    } onSignal {
      case (ctx, PreRestart) =>
        ctx.system.log.info("Worker {} is RESTARTED, count {}", ctx.self, count)
        out.close()
        Actor.same
      case (ctx, PostStop) =>
        ctx.system.log.info("Worker {} is STOPPED, count {}", ctx.self, count)
        out.close()
        Actor.same
    }
}

(Same example in Java)

Note how the file is opened in deferred when the actor is started and closed when receiving PreRestart and PostStop. I said that those are messages. Well, they are not conforming to the message type of the behavior so they can’t be handled in the ordinary onMessage function. Instead we call them signals and they are handled in a separate onSignal function. onSignal is a partial function, as opposed to the onMessage that is an ordinary total function, because you probably only want to handle a few of the defined signals. The total set of signal types are defined by Akka and more signal types may be added in future versions. You don’t want match errors because of unhandled signals.

Let’s expand the FlakyWorker example even further. We would like to have several workers and each one responsible for a “partition” of jobs to spread the load and in this specific case write jobs for different partitions to separate files. For this we introduce a parent manager actor that spawns workers depending on the given partition of an incoming job, and delegates jobs to the right worker.

The manager is supervising the workers and restarting them if they fail, but with some limits. After two failures within 1 second the worker is stopped instead of restarted.

restartWithLimit(maxNrOfRetries = 2, 1.second)

The fact that a worker has been stopped is something that the manager should know about. That is not solved by supervision. Instead it needs to watch the workers. You already know that concept from untyped actors. It is exactly the same with typed actors. The full manager looks like this:

object WorkerManager {

  sealed trait Command
  final case class Job(partition: Int, payload: String) extends Command

  private val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, 1.second)
  private val worker: Behavior[FlakyWorker2.Command] =
    Actor.supervise(FlakyWorker2.workerBehavior).onFailure[RuntimeException](strategy)

  val workerManagerBehavior: Behavior[Command] =
    active(Map.empty)

  private def spawnWorker(partition: Int, ctx: ActorContext[Command]): ActorRef[FlakyWorker2.Command] = {
    val w = ctx.spawn(worker, s"worker-$partition")
    ctx.watch(w)
    w
  }

  private def active(workers: Map[Int, ActorRef[FlakyWorker2.Command]]): Behavior[Command] = {
    Actor.immutable[Command] { (ctx, msg) =>
      msg match {
        case job @ Job(partition, payload) =>
          val (w, newWorkers) = workers.get(partition) match {
            case Some(w) =>
              (w, workers)
            case None =>
              val w = spawnWorker(partition, ctx)
              (w, workers.updated(partition, w))
          }
          w ! FlakyWorker2.Job(payload)
          active(newWorkers)
      }
    } onSignal {
      case (ctx, Terminated(ref)) =>
        ctx.system.log.info("Worker {} is TERMINATED", ref)
        val newWorkers = workers.filterNot { case (_, w) => w == ref }
        active(newWorkers)
    }
  }

}

(Same example in Java)

Note how the worker is watched when it’s spawned and that the Terminated message is handled in onSignal. In this example it’s watching a child actor, but watch can be used on any ActorRef and not only children.

The above code is using filterNot because the partition number is not known and that would not be very efficient if there were many entries in the Map. We could maintain a bidirectional map (using two maps) but it would be better if the terminated signal could carry the partition number. There is a new feature in Akka typed that comes in handy for that. You can define an application specific message instead of Terminated. Such message can carry additional information such as the partition number in the above example. It is used like this:

  private final case class WorkerStopped(partition: Int) extends Command

  private def spawnWorker(partition: Int, ctx: ActorContext[Command]): ActorRef[FlakyWorker2.Command] = {
    val w = ctx.spawn(worker, s"worker-$partition")
    ctx.watchWith(w, WorkerStopped(partition))
    w
  }
  
  private def active(workers: Map[Int, ActorRef[FlakyWorker2.Command]]): Behavior[Command] = {
    Actor.immutable[Command] { (ctx, msg) =>
      msg match {
        case job @ Job(partition, payload) =>
          val (w, newWorkers) = workers.get(partition) match {
            case Some(w) =>
              (w, workers)
            case None =>
              val w = spawnWorker(partition, ctx)
              (w, workers.updated(partition, w))
          }
          w ! FlakyWorker2.Job(payload)
          active(newWorkers)
          
        case WorkerStopped(partition) =>
          ctx.system.log.info("Worker {} is TERMINATED", workers(partition))
          active(workers - partition)
      }
    }
  }

(Same example in Java)

A good exercise to fully understand the actor lifecycle concept is to run blog.typed.scaladsl.FlakyWorker2App in patriknw/akka-typed-blog and inspect the log output and the files it creates.

The full source code of these examples, including corresponding Java examples, are available in patriknw/akka-typed-blog.

Edit 2017-05-24: In Akka 2.5.2 Actor.restarter was renamed to Actor.supervise. This post and the example repository has been updated.