Stash

Dependency

To use Akka Actor Typed, you must add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.18"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor-typed_2.12</artifactId>
  <version>2.5.18</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.5.18'
}

Introduction

Stashing enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior.

A typical example of when this is useful is if the actor has to load some initial state or initialize some resources before it can accept the first real message. Another example is when the actor is waiting for something to complete before processing the next message.

Let’s illustrate these two with an example. It’s an actor that is used like a single access point to a value stored in a database. When it’s started it loads current state from the database, and while waiting for that initial value all incoming messages are stashed.

When a new state is saved in the database it also stashes incoming messages to make the processing sequential, one after the other without multiple pending writes.

Scala
import akka.actor.typed.scaladsl.StashBuffer

/**
 * Asynchronous storage used by the `DataAccess` example actor.
 *
 * Both operations return a `Future`, so callers are expected to react to
 * completion instead of blocking.
 */
trait DB {

  /** Reads the value associated with `id`; the future carries that value. */
  def load(id: String): Future[String]

  /** Writes `value` for `id`; the future carries `Done` when the write is finished. */
  def save(id: String, value: String): Future[Done]
}

/**
 * Example actor acting as the single access point to one value kept in a `DB`.
 *
 * The behavior moves through three states:
 *  - `init`:   waiting for the initial value to be loaded; everything else is stashed.
 *  - `active`: answers `Get` and starts a write on `Save`.
 *  - `saving`: waiting for the pending write to finish; everything else is stashed,
 *              which keeps writes strictly sequential.
 */
object DataAccess {
  /** Protocol of the actor. Kept unsealed so the public type is unchanged. */
  trait Command

  /** Store `value`; `replyTo` receives `Done` after the write has succeeded. */
  final case class Save(value: String, replyTo: ActorRef[Done]) extends Command

  /** Ask for the current value, which is sent to `replyTo`. */
  final case class Get(replyTo: ActorRef[String]) extends Command

  // Internal messages: the outcome of the database futures is turned into
  // messages to self so that it is processed like any other message.
  private final case class InitialState(value: String) extends Command
  private case object SaveSuccess extends Command
  private final case class DBError(cause: Throwable) extends Command

  /**
   * Creates the behavior for the value identified by `id`.
   *
   * @param id key of the value in the database
   * @param db storage used for loading and saving the value
   * @return the initial (loading) behavior
   */
  def behavior(id: String, db: DB): Behavior[Command] =
    Behaviors.setup[Command] { context =>

      // Created inside setup, so every started actor gets its own buffer.
      val buffer = StashBuffer[Command](capacity = 100)

      def init(): Behavior[Command] =
        Behaviors.receive[Command] { (context, message) =>
          message match {
            case InitialState(value) =>
              // now we are ready to handle stashed messages if any
              buffer.unstashAll(context, active(value))
            case DBError(cause) =>
              // loading failed: fail the actor with the original cause
              throw cause
            case other =>
              // stash all other messages for later processing
              buffer.stash(other)
              Behaviors.same
          }
        }

      def active(state: String): Behavior[Command] =
        Behaviors.receive[Command] { (context, message) =>
          message match {
            case Get(replyTo) =>
              replyTo ! state
              Behaviors.same
            case Save(value, replyTo) =>
              import context.executionContext
              db.save(id, value).onComplete {
                case Success(_)     => context.self ! SaveSuccess
                case Failure(cause) => context.self ! DBError(cause)
              }
              // the new value becomes the state once SaveSuccess arrives
              saving(value, replyTo)
            case _ =>
              // Command is an open trait, so without this case an unexpected
              // message would crash the actor with a MatchError. Reporting it
              // as unhandled matches what the Java builder does.
              Behaviors.unhandled
          }
        }

      def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] =
        Behaviors.receive[Command] { (context, message) =>
          message match {
            case SaveSuccess =>
              replyTo ! Done
              // the write is done: process what arrived in the meantime
              buffer.unstashAll(context, active(state))
            case DBError(cause) =>
              // saving failed: fail the actor with the original cause
              throw cause
            case other =>
              // keep the order of incoming messages while the write is pending
              buffer.stash(other)
              Behaviors.same
          }
        }

      // Kick off the initial load; its outcome arrives as a message to self.
      import context.executionContext
      db.load(id).onComplete {
        case Success(value) =>
          context.self ! InitialState(value)
        case Failure(cause) =>
          context.self ! DBError(cause)
      }

      init()
    }
}
Java
import akka.actor.typed.javadsl.StashBuffer;

/**
 * Asynchronous storage used by the {@code DataAccess} example actor.
 *
 * <p>Both operations return a {@link CompletionStage}, so callers are expected
 * to react to completion instead of blocking.
 */
interface DB {

  /** Reads the value associated with {@code id}; the stage carries that value. */
  CompletionStage<String> load(String id);

  /** Writes {@code value} for {@code id}; the stage carries {@code Done} when finished. */
  CompletionStage<Done> save(String id, String value);
}

/**
 * Example actor acting as the single access point to one value kept in a {@code DB}.
 *
 * <p>The behavior moves through three states:
 * <ul>
 *   <li>{@code init}: waiting for the initial value to be loaded; everything else is stashed.</li>
 *   <li>{@code active}: answers {@code Get} and starts a write on {@code Save}.</li>
 *   <li>{@code saving}: waiting for the pending write to finish; everything else is stashed,
 *       which keeps writes strictly sequential.</li>
 * </ul>
 */
public static class DataAccess {

  /** Protocol of the actor. */
  static interface Command {
  }

  /** Store {@code payload}; {@code replyTo} receives {@code Done} after a successful write. */
  public static class Save implements Command {
    public final String payload;
    public final ActorRef<Done> replyTo;

    public Save(String payload, ActorRef<Done> replyTo) {
      this.payload = payload;
      this.replyTo = replyTo;
    }
  }

  /** Ask for the current value, which is sent to {@code replyTo}. */
  public static class Get implements Command {
    public final ActorRef<String> replyTo;

    public Get(ActorRef<String> replyTo) {
      this.replyTo = replyTo;
    }
  }

  /** Internal: the initial value has been loaded from the database. */
  static class InitialState implements Command {
    public final String value;

    InitialState(String value) {
      this.value = value;
    }
  }

  /** Internal: the pending write has completed successfully. */
  static class SaveSuccess implements Command {
    public static final SaveSuccess instance = new SaveSuccess();

    private SaveSuccess() {
    }
  }

  /** Internal: a database operation failed with {@code cause}. */
  static class DBError implements Command {
    public final RuntimeException cause;

    public DBError(RuntimeException cause) {
      this.cause = cause;
    }
  }


  /** Maximum number of messages that may be stashed at any time. */
  private static final int STASH_CAPACITY = 100;

  private final String id;
  private final DB db;

  /**
   * @param id key of the value in the database
   * @param db storage used for loading and saving the value
   */
  public DataAccess(String id, DB db) {
    this.id = id;
    this.db = db;
  }

  /**
   * Creates the initial (loading) behavior and kicks off the initial load.
   *
   * @return behavior that stashes everything until the initial value has arrived
   */
  Behavior<Command> behavior() {
    return Behaviors.setup(context -> {
      // The buffer is created here, not kept as a field of DataAccess, so that
      // every actor started from this instance gets its own buffer. A StashBuffer
      // is mutable and not thread safe, so it must not be shared between actors.
      final StashBuffer<Command> buffer = StashBuffer.create(STASH_CAPACITY);

      // The outcome of the load arrives as a message to self.
      db.load(id)
          .whenComplete((value, cause) -> {
            if (cause == null)
              context.getSelf().tell(new InitialState(value));
            else
              context.getSelf().tell(new DBError(asRuntimeException(cause)));
          });

      return init(buffer);
    });
  }

  /** Waiting for the initial value; all other messages are stashed. */
  private Behavior<Command> init(StashBuffer<Command> buffer) {
    return Behaviors.receive(Command.class)
        .onMessage(InitialState.class, (context, message) -> {
          // now we are ready to handle stashed messages if any
          return buffer.unstashAll(context, active(buffer, message.value));
        })
        .onMessage(DBError.class, (context, message) -> {
          // loading failed: fail the actor with the original cause
          throw message.cause;
        })
        .onMessage(Command.class, (context, message) -> {
          // stash all other messages for later processing
          buffer.stash(message);
          return Behaviors.same();
        })
        .build();
  }

  /** Normal operation: answers reads and starts writes. */
  private Behavior<Command> active(StashBuffer<Command> buffer, String state) {
    return Behaviors.receive(Command.class)
        .onMessage(Get.class, (context, message) -> {
          message.replyTo.tell(state);
          return Behaviors.same();
        })
        .onMessage(Save.class, (context, message) -> {
          db.save(id, message.payload)
            .whenComplete((value, cause) -> {
              if (cause == null)
                context.getSelf().tell(SaveSuccess.instance);
              else
                context.getSelf().tell(new DBError(asRuntimeException(cause)));
            });
          // the new payload becomes the state once SaveSuccess arrives
          return saving(buffer, message.payload, message.replyTo);
        })
        .build();
  }

  /** A write is pending; all other messages are stashed until it has completed. */
  private Behavior<Command> saving(StashBuffer<Command> buffer, String state, ActorRef<Done> replyTo) {
    return Behaviors.receive(Command.class)
        .onMessageEquals(SaveSuccess.instance, context -> {
          replyTo.tell(Done.getInstance());
          // the write is done: process what arrived in the meantime
          return buffer.unstashAll(context, active(buffer, state));
        })
        .onMessage(DBError.class, (context, message) -> {
          // saving failed: fail the actor with the original cause
          throw message.cause;
        })
        .onMessage(Command.class, (context, message) -> {
          // keep the order of incoming messages while the write is pending
          buffer.stash(message);
          return Behaviors.same();
        })
        .build();
  }


  /** Returns {@code t} itself if it is a RuntimeException, otherwise wraps it in one. */
  private static RuntimeException asRuntimeException(Throwable t) {
    // can't throw Throwable in lambdas
    if (t instanceof RuntimeException) {
      return (RuntimeException) t;
    } else {
      return new RuntimeException(t);
    }
  }

}

One important thing to be aware of is that the StashBuffer is a buffer and stashed messages will be kept in memory until they are unstashed (or the actor is stopped and garbage collected). It’s recommended to avoid stashing too many messages, both to limit memory usage and to avoid risking an OutOfMemoryError if many actors are stashing many messages. Therefore the StashBuffer is bounded and the capacity of how many messages it can hold must be specified when it’s created.

If you try to stash more messages than the capacity, a StashOverflowException will be thrown. You can use StashBuffer.isFull before stashing a message to avoid that and take other actions, such as dropping the message.

When unstashing the buffered messages by calling unstashAll the messages will be processed sequentially in the order they were added and all are processed unless an exception is thrown. The actor is unresponsive to other new messages until unstashAll is completed. That is another reason for keeping the number of stashed messages low. Actors that hog the message processing thread for too long can result in starvation of other actors.

That can be mitigated by using StashBuffer.unstash with the numberOfMessages parameter and then sending a message to context.self (Scala) or context.getSelf (Java) before continuing to unstash more. That means that other new messages may arrive in-between and those must be stashed to keep the original order of messages. It becomes more complicated, so it is better to keep the number of stashed messages low.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.