Scheduler

Sometimes you need to make things happen in the future, and where do you go look then? Look no further than ActorSystem! Its scheduler method returns an instance of akka.actor.Scheduler. This instance is unique per ActorSystem and is also used internally for scheduling things to happen at specific points in time.

You can schedule the sending of messages to actors and the execution of tasks (functions or Runnable). Each scheduling call returns a Cancellable; call cancel on it to cancel the scheduled operation.

Warning

The default implementation of Scheduler used by Akka is based on job buckets which are emptied according to a fixed schedule. It does not execute tasks at the exact time; instead, on every tick it runs everything that is (over)due. The accuracy of the default Scheduler can be adjusted with the akka.scheduler.tick-duration configuration property.
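
To make the effect of tick-based execution concrete, the following sketch models when a task would actually run. It is a simplified model for illustration only, not Akka's implementation: the class TickModel and its method firingTimeMillis are hypothetical names, and the model assumes that ticks occur at exact multiples of the tick duration, starting at time zero.

```java
// Simplified, hypothetical model of tick-based scheduling (not Akka code).
// Assumption: ticks happen at exact multiples of tickMillis, starting at 0.
final class TickModel {

  // Returns the time of the first tick at or after the requested delay,
  // i.e. the earliest moment at which this model would run the task.
  static long firingTimeMillis(long delayMillis, long tickMillis) {
    if (delayMillis < 0 || tickMillis <= 0) {
      throw new IllegalArgumentException("delay must be >= 0 and tick must be > 0");
    }
    long ticks = (delayMillis + tickMillis - 1) / tickMillis; // round up
    return ticks * tickMillis;
  }

  public static void main(String[] args) {
    // With a 10 ms tick, a 50 ms delay lands exactly on a tick.
    System.out.println(firingTimeMillis(50, 10)); // prints 50
    // A 55 ms delay has to wait for the next tick.
    System.out.println(firingTimeMillis(55, 10)); // prints 60
    // A coarser 100 ms tick delays the same task considerably longer.
    System.out.println(firingTimeMillis(55, 100)); // prints 100
  }
}
```

In this model a shorter tick duration reduces the worst-case lateness of a task, at the price of the scheduler waking up more often.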

Some examples

Schedule to send the "foo"-message to the testActor after 50ms:

import akka.actor.Props;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS),
  testActor, "foo", system.dispatcher(), null);

Schedule a Runnable, that sends the current time to the testActor, to be executed after 50ms:

system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS),
  new Runnable() {
    @Override
    public void run() {
      testActor.tell(System.currentTimeMillis(), ActorRef.noSender());
    }
  }, system.dispatcher());

Warning

If you schedule Runnable instances, be extra careful not to pass or close over unstable references. In practice this means that you should not call methods on the enclosing Actor from within the Runnable. If you need to schedule an invocation, it is better to use the schedule() variant that accepts a message and an ActorRef: schedule a message to self (containing the necessary parameters) and call the method when the message is received.
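
The idea behind this advice can be sketched without Akka. The example below is a stand-in built only on java.util.concurrent, not Akka code: the class MailboxSketch, its methods, and the "Refresh" message are hypothetical. The timer thread never calls the state-changing method directly; it only enqueues a message, and the method runs on the thread that processes the mailbox, much as an actor would handle a message sent to self.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an actor (not Akka code): internal state is
// only touched by the thread that drains the mailbox.
final class MailboxSketch {
  private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
  private int refreshCount = 0; // confined to the mailbox-draining thread

  // Schedules a message instead of a direct call to refresh(), so the
  // timer thread never touches refreshCount.
  void scheduleRefresh(ScheduledExecutorService timer, long delayMillis) {
    Runnable enqueue = () -> mailbox.offer("Refresh");
    timer.schedule(enqueue, delayMillis, TimeUnit.MILLISECONDS);
  }

  // Waits for one message and handles it on the calling thread, playing
  // the role of onReceive. Returns the refresh count after handling.
  int receiveOne() {
    try {
      String message = mailbox.take();
      if (message.equals("Refresh")) {
        refresh();
      }
      return refreshCount;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a message", e);
    }
  }

  // The method we want to invoke later; it mutates internal state.
  private void refresh() {
    refreshCount++;
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    MailboxSketch sketch = new MailboxSketch();
    sketch.scheduleRefresh(timer, 50);
    System.out.println(sketch.receiveOne()); // prints 1
    timer.shutdown();
  }
}
```

With Akka itself, the equivalent would be to schedule a message to getSelf() using the message-based scheduleOnce or schedule calls shown in the examples on this page, and to invoke the method from onReceive.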

Schedule to send the "Tick"-message to the tickActor after 0ms repeating every 50ms:

import akka.actor.Props;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;

class Ticker extends UntypedActor {
  @Override
  public void onReceive(Object message) {
    if (message.equals("Tick")) {
      // Do something
    } else {
      unhandled(message);
    }
  }
}

ActorRef tickActor = system.actorOf(Props.create(Ticker.class, this));

// This will schedule to send the Tick-message
// to the tickActor after 0ms repeating every 50ms
Cancellable cancellable = system.scheduler().schedule(Duration.Zero(),
  Duration.create(50, TimeUnit.MILLISECONDS), tickActor, "Tick",
  system.dispatcher(), null);

// This cancels further Ticks from being sent
cancellable.cancel();

From akka.actor.ActorSystem

/**
 * Light-weight scheduler for running asynchronous tasks after some deadline
 * in the future. Not terribly precise but cheap.
 */
def scheduler: Scheduler

The Scheduler Interface for Implementors

The actual scheduler implementation is loaded reflectively upon ActorSystem start-up, which means that it is possible to provide a different one using the akka.scheduler.implementation configuration property. The referenced class must implement the following interface:

/**
 * An Akka scheduler service. This one needs one special behavior: if
 * Closeable, it MUST execute all outstanding tasks upon .close() in order
 * to properly shutdown all dispatchers.
 *
 * Furthermore, this timer service MUST throw IllegalStateException if it
 * cannot schedule a task. Once scheduled, the task MUST be executed. If
 * executed upon close(), the task may execute before its timeout.
 *
 * Scheduler implementations are loaded reflectively at ActorSystem start-up
 * with the following constructor arguments:
 *  1) the system’s com.typesafe.config.Config (from system.settings.config)
 *  2) an akka.event.LoggingAdapter
 *  3) a java.util.concurrent.ThreadFactory
 */
public abstract class AbstractScheduler extends AbstractSchedulerBase {

  /**
   * Schedules a function to be run repeatedly with an initial delay and
   * a frequency. E.g. if you would like the function to be run after 2
   * seconds and thereafter every 100ms you would set delay = Duration(2,
   * TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
   */
  @Override
  public abstract Cancellable schedule(FiniteDuration initialDelay,
      FiniteDuration interval, Runnable runnable, ExecutionContext executor);

  /**
   * Schedules a Runnable to be run once with a delay, i.e. a time period that
   * has to pass before the runnable is executed.
   */
  @Override
  public abstract Cancellable scheduleOnce(FiniteDuration delay, Runnable runnable,
      ExecutionContext executor);

  /**
   * The maximum supported task frequency of this scheduler, i.e. the inverse
   * of the minimum time interval between executions of a recurring task, in Hz.
   */
  @Override
  public abstract double maxFrequency();
}
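
The relationship stated for maxFrequency can be worked through with a small calculation. The sketch below is illustrative only: the class FrequencySketch and the method maxFrequencyHz are hypothetical and not part of Akka, and the 10 ms minimum interval is an assumed value chosen for the example.

```java
// Hypothetical helper (not part of Akka): converts the minimum interval
// between executions of a recurring task into a maximum frequency in Hz.
final class FrequencySketch {

  static double maxFrequencyHz(long minIntervalNanos) {
    if (minIntervalNanos <= 0) {
      throw new IllegalArgumentException("minimum interval must be > 0");
    }
    // Frequency is the inverse of the interval: 1 second / interval.
    return 1_000_000_000.0 / minIntervalNanos;
  }

  public static void main(String[] args) {
    // Assumed minimum interval of 10 ms: at most 100 executions per second.
    System.out.println(maxFrequencyHz(10_000_000L)); // prints 100.0
  }
}
```

A custom scheduler would return a value of this kind from maxFrequency(), derived from whatever minimum interval it can actually support.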

The Cancellable interface

Scheduling a task will result in a Cancellable (or throw an IllegalStateException if attempted after the scheduler’s shutdown). This allows you to cancel something that has been scheduled for execution.

Warning

Cancelling does not abort a task that has already started executing. Check the return value of cancel to determine whether the scheduled task was cancelled or will (eventually) have run.

/**
 * Signifies something that can be cancelled
 * There is no strict guarantee that the implementation is thread-safe,
 * but it should be good practice to make it so.
 */
trait Cancellable {
  /**
   * Cancels this Cancellable and returns true if that was successful.
   * If this cancellable was (concurrently) cancelled already, then this method
   * will return false although isCancelled will return true.
   *
   * Java & Scala API
   */
  def cancel(): Boolean

  /**
   * Returns true if and only if this Cancellable has been successfully cancelled
   *
   * Java & Scala API
   */
  def isCancelled: Boolean
}
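
The return values described above can be illustrated with a small state model. This is a hypothetical sketch of the documented contract, not Akka's implementation: the class CancellableSketch, its State enum, and the tryStart method (standing in for the scheduler starting the task) are invented for the example.

```java
// Hypothetical, simplified model of the Cancellable contract (not Akka code).
final class CancellableSketch {
  private enum State { PENDING, CANCELLED, STARTED }

  private State state = State.PENDING;

  // Returns true only for the call that actually prevents execution.
  synchronized boolean cancel() {
    if (state == State.PENDING) {
      state = State.CANCELLED;
      return true;
    }
    return false; // already cancelled, or the task has already started
  }

  // True if and only if a cancel() call succeeded.
  synchronized boolean isCancelled() {
    return state == State.CANCELLED;
  }

  // Stand-in for the scheduler: returns true if the task may start,
  // false if it was cancelled first.
  synchronized boolean tryStart() {
    if (state == State.PENDING) {
      state = State.STARTED;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    CancellableSketch pending = new CancellableSketch();
    System.out.println(pending.cancel());      // prints true
    System.out.println(pending.cancel());      // prints false (already cancelled)
    System.out.println(pending.isCancelled()); // prints true

    CancellableSketch started = new CancellableSketch();
    started.tryStart();
    System.out.println(started.cancel());      // prints false (task already started)
    System.out.println(started.isCancelled()); // prints false
  }
}
```

The model shows why the return value of cancel matters: a false result can mean either that another caller cancelled first or that the task is running or has run, and isCancelled distinguishes the two cases.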
