Building Finite State Machine Actors (Java)

Overview

The FSM (Finite State Machine) pattern is best described in the Erlang design principles. In short, it can be seen as a set of relations of the form:

State(S) x Event(E) -> Actions (A), State(S’)

These relations are interpreted as meaning:

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.
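As a minimal illustration of this relation, the following plain-Java sketch encodes a two-state machine whose single handle method performs the actions and the transition for each (state, event) pair. The class TurnstileFsm, its states, its events and the action names are hypothetical and not part of Akka; actions are merely recorded as strings so that their effect is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example: none of these names come from Akka.
final class TurnstileFsm {
  enum State { LOCKED, UNLOCKED }
  enum Event { COIN, PUSH }

  private State state = State.LOCKED;
  // Records the actions A that were performed, in order.
  private final List<String> actions = new ArrayList<String>();

  // One step of the relation: State(S) x Event(E) -> Actions(A), State(S').
  void handle(Event e) {
    switch (state) {
      case LOCKED:
        if (e == Event.COIN) {
          actions.add("unlock");
          state = State.UNLOCKED;
        } else {
          actions.add("alarm"); // PUSH while locked: act, stay in LOCKED
        }
        break;
      case UNLOCKED:
        if (e == Event.PUSH) {
          actions.add("lock");
          state = State.LOCKED;
        } else {
          actions.add("refund"); // COIN while unlocked: act, stay in UNLOCKED
        }
        break;
    }
  }

  State state() { return state; }
  List<String> actions() { return actions; }
}
```

Feeding the events PUSH, COIN, PUSH into a fresh instance records the actions alarm, unlock, lock and leaves the machine in LOCKED again.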

While the Scala programming language enables the formulation of a nice internal DSL (domain-specific language) for expressing finite state machines (see FSM), Java’s verbosity does not lend itself well to the same approach. This chapter describes ways to achieve the same separation of concerns through self-discipline.

How State should be Handled

All mutable fields (or transitively mutable data structures) referenced by the FSM actor’s implementation should be collected in one place and only mutated using a small, well-defined set of methods. One way to achieve this is to assemble all mutable state in a superclass which keeps it private and offers protected methods for mutating it.

import java.util.ArrayList;
import java.util.List;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;

static abstract class MyFSMBase extends UntypedActor {

  /*
   * This is the mutable state of this state machine.
   */
  protected enum State {
    IDLE, ACTIVE;
  }

  private State state = State.IDLE;
  private ActorRef target;
  private List<Object> queue;

  /*
   * Then come all the mutator methods:
   */
  protected void init(ActorRef target) {
    this.target = target;
    queue = new ArrayList<Object>();
  }

  protected void setState(State s) {
    if (state != s) {
      transition(state, s);
      state = s;
    }
  }

  protected void enqueue(Object o) {
    if (queue != null)
      queue.add(o);
  }

  protected List<Object> drainQueue() {
    final List<Object> q = queue;
    if (q == null)
      throw new IllegalStateException("drainQueue(): not yet initialized");
    queue = new ArrayList<Object>();
    return q;
  }

  /*
   * Here are the interrogation methods:
   */
  protected boolean isInitialized() {
    return target != null;
  }

  protected State getState() {
    return state;
  }

  protected ActorRef getTarget() {
    if (target == null)
      throw new IllegalStateException("getTarget(): not yet initialized");
    return target;
  }

  /*
   * And finally the callbacks (only one in this example: react to state change)
   */
  abstract protected void transition(State old, State next);
}

The benefit of this approach is that state changes can be acted upon in one central place, which makes it impossible to forget to insert code for reacting to state transitions when adding to the FSM’s machinery.

Message Buncher Example

The base class shown above is designed to support an example similar to the one in the Scala FSM documentation: an actor which receives and queues messages, to be delivered in batches to a configurable target actor. The messages involved are:

public static final class SetTarget {
  final ActorRef ref;

  public SetTarget(ActorRef ref) {
    this.ref = ref;
  }
}

public static final class Queue {
  final Object o;

  public Queue(Object o) {
    this.o = o;
  }
}

public static final Object flush = new Object();

public static final class Batch {
  final List<Object> objects;

  public Batch(List<Object> objects) {
    this.objects = objects;
  }
}

This actor has only two states, IDLE and ACTIVE, which makes their handling quite straightforward in the concrete actor derived from the base class:

import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.actor.UntypedActor;
static public class MyFSM extends MyFSMBase {

  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  @Override
  public void onReceive(Object o) {

    if (getState() == State.IDLE) {

      if (o instanceof SetTarget)
        init(((SetTarget) o).ref);

      else
        whenUnhandled(o);

    } else if (getState() == State.ACTIVE) {

      if (o == flush)
        setState(State.IDLE);

      else
        whenUnhandled(o);
    }
  }

  @Override
  public void transition(State old, State next) {
    if (old == State.ACTIVE) {
      getTarget().tell(new Batch(drainQueue()), getSelf());
    }
  }

  private void whenUnhandled(Object o) {
    if (o instanceof Queue && isInitialized()) {
      enqueue(((Queue) o).o);
      setState(State.ACTIVE);

    } else {
      log.warning("received unknown message {} in state {}", o, getState());
    }
  }
}

The trick here is to factor out common functionality like whenUnhandled and transition in order to obtain a few well-defined points for reacting to change or inserting logging.

State-Centric vs. Event-Centric

In the example above, the subjective complexity of states and events was roughly equal, making it a matter of taste which of the two to dispatch on first; the example dispatches on state first. Depending on how evenly the matrix of possible states and events is populated, it may be more practical to dispatch on the event first and distinguish the states in the second tier. An example would be a state machine which has a multitude of internal states but handles only very few distinct events.
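To make the event-first alternative concrete, here is a plain-Java sketch of a simplified buncher that dispatches on the message type in the first tier and looks at the state only where the handling actually differs. It is an assumption-laden illustration, not the Akka example itself: the class EventCentricBuncher and its members are hypothetical, there is no actor or SetTarget message, and batches are collected in a list instead of being sent to a target.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch without any Akka dependency; all names are hypothetical.
final class EventCentricBuncher {
  enum State { IDLE, ACTIVE }

  // Stand-ins for the Queue and flush messages of the buncher example.
  static final class Queue {
    final Object o;
    Queue(Object o) { this.o = o; }
  }
  static final Object FLUSH = new Object();

  private State state = State.IDLE;
  private List<Object> queue = new ArrayList<Object>();
  // Replaces the target actor: every flushed batch is appended here.
  final List<List<Object>> delivered = new ArrayList<List<Object>>();
  // Counts messages that no handler recognized.
  int unhandled = 0;

  // First tier: dispatch on the event.
  void onReceive(Object msg) {
    if (msg instanceof Queue) {
      onQueue((Queue) msg);
    } else if (msg == FLUSH) {
      onFlush();
    } else {
      unhandled++;
    }
  }

  // Handled identically in every state, so no per-state branch is needed.
  private void onQueue(Queue q) {
    queue.add(q.o);
    state = State.ACTIVE;
  }

  // Second tier: distinguish the states for this one event.
  private void onFlush() {
    switch (state) {
      case ACTIVE:
        delivered.add(queue);
        queue = new ArrayList<Object>();
        state = State.IDLE;
        break;
      case IDLE:
        break; // nothing queued, so there is nothing to deliver
    }
  }

  State state() { return state; }
}
```

With this layout, adding a new internal state touches only those event handlers whose behavior depends on it, which is the situation in which event-first dispatch tends to pay off.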
