akka.persistence
Class UntypedProcessor

java.lang.Object
  extended by akka.actor.UntypedActor
      extended by akka.persistence.UntypedProcessor
All Implemented Interfaces:
Actor, Stash, UnrestrictedStash, RequiresMessageQueue<DequeBasedMessageQueueSemantics>, Processor, Recovery, Snapshotter
Direct Known Subclasses:
UntypedEventsourcedProcessor

public abstract class UntypedProcessor
extends UntypedActor
implements Processor

Java API: an actor that persists (journals) messages of type Persistent. Messages of other types are not persisted.


 import akka.persistence.Persistent;
 import akka.persistence.Processor;

 class MyProcessor extends UntypedProcessor {
     public void onReceive(Object message) throws Exception {
         if (message instanceof Persistent) {
             // message has been written to journal
             Persistent persistent = (Persistent)message;
             Object payload = persistent.payload();
             Long sequenceNr = persistent.sequenceNr();
             // ...
         } else {
             // message has not been written to journal
         }
     }
 }

 // ...

 ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");

 processor.tell(Persistent.create("foo"), null);
 processor.tell("bar", null);
 

During start and restart, persistent messages are replayed to a processor so that it can recover internal state from these messages. New messages sent to a processor during recovery do not interfere with replayed messages, hence applications don't need to wait for a processor to complete its recovery.

Automated recovery can be turned off or customized by overriding the preStart and preRestart life cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by sending it a Recover message.

Persistent messages are assigned sequence numbers that are generated on a per-processor basis. A sequence starts at 1L and doesn't contain gaps unless a processor (logically) deletes a message.

During recovery, a processor internally buffers new messages until recovery completes, so that new messages do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the ''user stash'' inherited by akka.actor.Stash. Processor implementation classes can therefore use the ''user stash'' for stashing/unstashing both persistent and transient messages.

Processors can also store snapshots of internal state by calling saveSnapshot. During recovery, a saved snapshot is offered to the processor with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. Default is to offer the latest saved snapshot.

See Also:
Processor, Recover, PersistentBatch

Nested Class Summary
 
Nested classes/interfaces inherited from interface akka.actor.Actor
Actor.emptyBehavior$
 
Constructor Summary
UntypedProcessor()
           
 
Method Summary
 akka.actor.ActorCell actorCell()
           
 int capacity()
           
 scala.collection.immutable.Vector<Envelope> clearStash()
          INTERNAL API.
 ActorContext context()
          INTERNAL API.
 void enqueueFirst(Envelope envelope)
          Enqueues envelope at the first position in the mailbox.
 DequeBasedMessageQueueSemantics mailbox()
          INTERNAL API.
 void prepend(scala.collection.immutable.Seq<Envelope> others)
          Prepends others to this stash.
 ActorRef self()
          INTERNAL API.
 void stash()
          Adds the current message (the message that the actor received last) to the actor's stash.
 scala.collection.immutable.Vector<Envelope> theStash()
           
 void unstash()
          Prepends the oldest message in the stash to the mailbox, and then removes that message from the stash.
 void unstashAll()
          Prepends all messages in the stash to the mailbox, and then clears the stash.
 void unstashAll(scala.Function1<java.lang.Object,java.lang.Object> filterPredicate)
          INTERNAL API.
 
Methods inherited from class akka.actor.UntypedActor
getContext, getSelf, getSender, onReceive, postRestart, postStop, preRestart, preStart, receive, supervisorStrategy, unhandled
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface akka.persistence.Processor
_processorId, aroundPostStop, aroundPreRestart, aroundPreStart, deleteMessage, deleteMessage, deleteMessages, deleteMessages, initializing, nextSequenceNr, onRecoveryFailure, onReplayFailure, onReplaySuccess, preRestart, preRestartDefault, preStart, processing, processorBatch, processorId, recoveryFinished, recoveryRunning, sequenceNr, snapshotterId, unstashFilterPredicate
 
Methods inherited from interface akka.persistence.Recovery
_currentPersistent, _currentState, _lastSequenceNr, _recoveryFailureCause, _recoveryFailureMessage, aroundReceive, currentPersistentMessage, extension, getCurrentPersistentMessage, journal, lastSequenceNr, prepareRestart, receiverStash, recoveryPending, recoveryStarted, replayFailed, replayStarted, snapshotSequenceNr
 
Methods inherited from interface akka.persistence.Snapshotter
deleteSnapshot, deleteSnapshots, loadSnapshot, saveSnapshot, snapshotStore
 
Methods inherited from interface akka.actor.UnrestrictedStash
postStop
 
Methods inherited from interface akka.actor.Actor
aroundPostRestart, context, noSender, postRestart, receive, self, sender, supervisorStrategy, unhandled
 

Constructor Detail

UntypedProcessor

public UntypedProcessor()
Method Detail

context

public ActorContext context()
INTERNAL API.

Context of the actor that uses this stash.


self

public ActorRef self()
INTERNAL API.

Self reference of the actor that uses this stash.


theStash

public scala.collection.immutable.Vector<Envelope> theStash()

actorCell

public akka.actor.ActorCell actorCell()

capacity

public int capacity()

mailbox

public DequeBasedMessageQueueSemantics mailbox()
INTERNAL API.

The actor's deque-based message queue. mailbox.queue is the underlying Deque.


stash

public void stash()
Adds the current message (the message that the actor received last) to the actor's stash.

Throws:
StashOverflowException - in case of a stash capacity violation
java.lang.IllegalStateException - if the same message is stashed more than once

prepend

public void prepend(scala.collection.immutable.Seq<Envelope> others)
Prepends others to this stash. This method is optimized for a large stash and small others.


unstash

public void unstash()
Prepends the oldest message in the stash to the mailbox, and then removes that message from the stash.

Messages from the stash are enqueued to the mailbox until the capacity of the mailbox (if any) has been reached. In case a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.

The unstashed message is guaranteed to be removed from the stash regardless if the unstash() call successfully returns or throws an exception.


unstashAll

public void unstashAll()
Prepends all messages in the stash to the mailbox, and then clears the stash.

Messages from the stash are enqueued to the mailbox until the capacity of the mailbox (if any) has been reached. In case a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.

The stash is guaranteed to be empty after calling unstashAll().


unstashAll

public void unstashAll(scala.Function1<java.lang.Object,java.lang.Object> filterPredicate)
INTERNAL API.

Prepends selected messages in the stash, applying filterPredicate, to the mailbox, and then clears the stash.

Messages from the stash are enqueued to the mailbox until the capacity of the mailbox (if any) has been reached. In case a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.

The stash is guaranteed to be empty after calling unstashAll(Any => Boolean).

Parameters:
filterPredicate - only stashed messages selected by this predicate are prepended to the mailbox.

clearStash

public scala.collection.immutable.Vector<Envelope> clearStash()
INTERNAL API.

Clears the stash and and returns all envelopes that have not been unstashed.


enqueueFirst

public void enqueueFirst(Envelope envelope)
Enqueues envelope at the first position in the mailbox. If the message contained in the envelope is a Terminated message, it will be ensured that it can be re-received by the actor.