"; */ ?>

11
Oct 11

AKKA Scheduler: Sending Message to Actor’s Self on Start

Akka has a little scheduler written using actors. This can be convenient if you want to schedule some periodic task for maintenance or similar. It allows you to register a message that you want to be sent to a specific actor at a periodic interval.

How Does AKKA Schedule Things?


Behind the scenes, the AKKA scheduler relies on “ScheduledExecutorService” from the “java.util.concurrent” package. Hence, when the AKKA Scheduler needs to schedule “a message sent to an actor, given a certain initial delay and interval”, it just wraps the task of sending a message in a “java.lang.Runnable” and uses a “ScheduledExecutorService” to schedule it:

service.scheduleAtFixedRate( createSendRunnable( receiver, message, true ), 
                             initialDelay, 
                             delay, 
                             timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
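
To make the wrapping idea concrete without pulling in AKKA at all, here is a minimal sketch that uses only “java.util.concurrent”. It is not AKKA's actual code: “FixedRateSketch”, “countDeliveries” and the counter-based “createSendRunnable” are hypothetical stand-ins, and the "send" is just a counter increment instead of a real message delivery.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FixedRateSketch {

  // Hypothetical stand-in for a "send message" Runnable: instead of
  // delivering a message to an actor, it bumps a counter.
  def createSendRunnable(delivered: AtomicInteger): Runnable =
    new Runnable {
      def run(): Unit = { delivered.incrementAndGet(); () }
    }

  // Schedules the runnable at a fixed rate, lets it run for `runForMillis`,
  // cancels it, and returns how many times it fired.
  def countDeliveries(initialDelayMs: Long, periodMs: Long, runForMillis: Long): Int = {
    val service   = Executors.newSingleThreadScheduledExecutor()
    val delivered = new AtomicInteger(0)
    try {
      val task = service.scheduleAtFixedRate(
        createSendRunnable(delivered), initialDelayMs, periodMs, TimeUnit.MILLISECONDS)
      Thread.sleep(runForMillis)
      task.cancel(true)
      delivered.get()
    } finally {
      service.shutdownNow()
    }
  }
}
```

Calling “FixedRateSketch.countDeliveries(0, 50, 300)” should report several deliveries. The exact number depends on thread timing, so it is best treated as "at least a couple" rather than a fixed value.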

“Heartbeat” Actor


Let’s look at an example of scheduling a message that is sent to an Actor’s “self” as soon as the Actor starts. Why? Because it is a cool use case : )

A “Heartbeat” is an ideal example of such a use case => “When a ‘Heartbeat Actor’ starts, it should start sending heartbeats at a given interval (e.g. every 2 seconds)”

Creating a Message

First we need to create a message that will be scheduled to be sent every so often. We’ll call it a “SendHeartbeat” message:

sealed trait HeartbeatMessage
case object SendHeartbeat extends HeartbeatMessage

Heartbeat of the Hollywood

Since sending a heartbeat needs to be scheduled as the Actor starts, the scheduling logic should be placed in the AKKA “preStart()” hook, which is called right before the Actor is started:

override def preStart() {
 
  logger.debug( "scheduling a heartbeat to go out every " + interval + " seconds" )
 
  // scheduling the task (with the 'self') should be the last statement in preStart()
  scheduledTask = Scheduler.schedule( self, SendHeartbeat, 0, interval, TimeUnit.SECONDS )
}

Another thing to note: any other start-up logic that is not related to scheduling should go before the call to the scheduler, otherwise the task will not be scheduled.

The heartbeat should also be stoppable. We could call “Scheduler.shutdown()” in the Actor’s “postStop()”, but first, this would stop all the other tasks that may have been scheduled by others, and second, it results in some very dark AKKA magic behavior.

Instead, the heartbeat task itself should be cancelled, which is a lot cleaner than calling on dark magic for no good reason:

override def postStop() {
  scheduledTask.cancel( true )
}
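
The difference between cancelling one task and shutting down the whole scheduler can be seen with a plain “ScheduledExecutorService”. The sketch below is an illustration under assumptions, not AKKA code: “CancelOneTaskSketch” and “ticksAfterCancel” are hypothetical names, and two counters stand in for the heartbeat and for a task scheduled by somebody else on the same executor.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object CancelOneTaskSketch {

  // A task whose only job is to count how often it ran.
  private def counting(counter: AtomicInteger): Runnable =
    new Runnable { def run(): Unit = { counter.incrementAndGet(); () } }

  // Returns (heartbeat ticks after the cancel, other task's ticks after the cancel).
  def ticksAfterCancel(periodMs: Long): (Int, Int) = {
    val service   = Executors.newSingleThreadScheduledExecutor() // shared by both tasks
    val heartbeat = new AtomicInteger(0)
    val other     = new AtomicInteger(0)
    try {
      val heartbeatTask =
        service.scheduleAtFixedRate(counting(heartbeat), 0L, periodMs, TimeUnit.MILLISECONDS)
      service.scheduleAtFixedRate(counting(other), 0L, periodMs, TimeUnit.MILLISECONDS)

      Thread.sleep(periodMs * 4)
      heartbeatTask.cancel(true) // stop only the heartbeat
      Thread.sleep(periodMs)     // let any in-flight run finish

      val heartbeatBefore = heartbeat.get()
      val otherBefore     = other.get()
      Thread.sleep(periodMs * 4)

      (heartbeat.get() - heartbeatBefore, other.get() - otherBefore)
    } finally {
      service.shutdownNow() // this, by contrast, stops everything on the executor
    }
  }
}
```

After the cancel, the heartbeat counter should stay put while the other task keeps ticking, which is exactly why cancelling the single task is the friendlier choice.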

With the above two in mind, here is the Hollywood Heartbeat himself:

// imports assume the AKKA 1.x package layout and SLF4J for logging
import java.util.concurrent.{ ScheduledFuture, TimeUnit }

import akka.actor.{ Actor, Scheduler }
import org.slf4j.LoggerFactory

class Heartbeat ( val interval: Long ) extends Actor {

  private val logger = LoggerFactory.getLogger( this.getClass )
  // handle to the scheduled task, kept so it can be cancelled in postStop()
  private var scheduledTask: ScheduledFuture[AnyRef] = null

  override def preStart() {

    logger.debug( "scheduling a heartbeat to go out every " + interval + " seconds" )

    // scheduling the task (with the 'self') should be the last statement in preStart()
    scheduledTask = Scheduler.schedule( self, SendHeartbeat, 0, interval, TimeUnit.SECONDS )
  }

  def receive = {

    case SendHeartbeat =>

      logger.debug( "Sending a hearbeat" )
      // sending a heartbeat here.. socket.write( heartbeatBytes )

    case unknown =>

      throw new RuntimeException( "ERROR: Received unknown message [" + unknown + "], can't handle it" )
  }

  override def postStop() {
    // cancel only this actor's task; the guard covers the case where preStart() never ran
    if ( scheduledTask != null ) scheduledTask.cancel( true )
  }
}

Making Sure the Heart is Beating

We’ll use ScalaTest to run the beast. This is more of a runner than a real test, since it does not really test for anything, but it proves the point:

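// imports assume AKKA 1.x and a ScalaTest version that still ships WordSpec
import akka.actor.Actor.actorOf
import org.scalatest.WordSpec

class HeartbeatTest extends WordSpec  {

  val heartBeatInterval = 2

  "it" should {

    "send heartbeats every " + heartBeatInterval + " seconds" in {
       val heartbeat = actorOf ( new Heartbeat( heartBeatInterval ) ).start()
       // 5 seconds in total: enough for heartbeats at 0, 2 and 4 seconds
       Thread.sleep( heartBeatInterval * 1000 + 3000 )
       heartbeat.stop()
     }
  }
}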

And.. We are “In the Money”:

17:02:54,118 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
 
Sending a hearbeat
Sending a hearbeat
Sending a hearbeat
 
Process finished with exit code 0
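
Since the runner above only sleeps and never asserts anything, here is one possible way to turn "the heart is beating" into a real check. It is a sketch under assumptions: a plain scheduled executor stands in for the Heartbeat actor, each "beat" counts down a “CountDownLatch”, and “HeartbeatLatchSketch” and “beatsArrive” are hypothetical names, not part of AKKA or ScalaTest.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object HeartbeatLatchSketch {

  // Returns true if `expectedBeats` beats arrive within `timeoutMs`,
  // with the first beat fired immediately and the rest every `intervalMs`.
  def beatsArrive(expectedBeats: Int, intervalMs: Long, timeoutMs: Long): Boolean = {
    val service = Executors.newSingleThreadScheduledExecutor()
    val latch   = new CountDownLatch(expectedBeats)
    try {
      service.scheduleAtFixedRate(
        new Runnable { def run(): Unit = latch.countDown() },
        0L, intervalMs, TimeUnit.MILLISECONDS)
      // await returns false if the timeout elapses before the count reaches zero
      latch.await(timeoutMs, TimeUnit.MILLISECONDS)
    } finally {
      service.shutdownNow()
    }
  }
}
```

In a real test the latch would be counted down from the actor's “SendHeartbeat” handler, and the test body would assert on the result of “await” instead of calling “Thread.sleep”.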