akka.persistence
Class UntypedProcessor
java.lang.Object
akka.actor.UntypedActor
akka.persistence.UntypedProcessor
- All Implemented Interfaces:
- Actor, Stash, StashFactory, StashSupport, UnrestrictedStash, RequiresMessageQueue<DequeBasedMessageQueueSemantics>, Processor, ProcessorImpl, Recovery, Snapshotter
public abstract class UntypedProcessor
- extends UntypedActor
- implements Processor
Java API: an actor that persists (journals) messages of type Persistent. Messages of other types are not persisted.
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.persistence.Persistent;
import akka.persistence.UntypedProcessor;

class MyProcessor extends UntypedProcessor {
  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof Persistent) {
      // message has been written to journal
      Persistent persistent = (Persistent) message;
      Object payload = persistent.payload();
      long sequenceNr = persistent.sequenceNr();
      // ...
    } else {
      // message has not been written to journal
    }
  }
}

// ...

ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");
processor.tell(Persistent.create("foo"), null);
processor.tell("bar", null);
During start and restart, persistent messages are replayed to a processor so that it can recover internal
state from these messages. New messages sent to a processor during recovery do not interfere with replayed
messages, hence applications don't need to wait for a processor to complete its recovery.
Automated recovery can be turned off or customized by overriding the preStart and preRestart life cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by sending it a Recover message.
Persistent messages are assigned sequence numbers that are generated on a per-processor basis. A sequence starts at 1L and doesn't contain gaps unless a processor (logically) deletes a message.
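The numbering rule can be illustrated with a small stand-alone model. This is only a sketch: `SketchJournal` and its methods are hypothetical names invented for this example and are not part of Akka. It assumes a single processor, an in-memory store, and that a logical delete merely marks a message as deleted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of one processor's journal; not part of Akka.
final class SketchJournal {
    private final List<Object> payloads = new ArrayList<>();
    private final Set<Long> deleted = new HashSet<>();

    // Appends a payload and returns its sequence number; the first call returns 1.
    long append(Object payload) {
        payloads.add(payload);
        return payloads.size();
    }

    // Logically deletes one message by marking it; other numbers are unchanged.
    void delete(long sequenceNr) {
        deleted.add(sequenceNr);
    }

    // Sequence numbers that a replay would still see, in ascending order.
    List<Long> sequenceNrs() {
        List<Long> visible = new ArrayList<>();
        for (long nr = 1; nr <= payloads.size(); nr++) {
            if (!deleted.contains(nr)) {
                visible.add(nr);
            }
        }
        return visible;
    }
}
```

In this model, appending three messages and deleting the second leaves a gap at 2, and the next append still receives 4 because numbers are never reused.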
During recovery, a processor internally buffers new messages until recovery completes, so that new messages do not interfere with replayed messages. This internal buffer (the "processor stash") is isolated from the "user stash" inherited from akka.actor.Stash. Processor implementation classes can therefore use the "user stash" for stashing/unstashing both persistent and transient messages.
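The buffering behavior described above can be sketched as a plain Java model. `RecoveryBufferSketch` and all of its members are hypothetical names for illustration only; the code does not use or reproduce the Akka implementation. It assumes that replayed messages are handled immediately and that buffered messages are released in arrival order once recovery ends.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of recovery-time buffering; not the Akka implementation.
final class RecoveryBufferSketch {
    private final Deque<Object> internalStash = new ArrayDeque<>();
    private final List<Object> handled = new ArrayList<>();
    private boolean recovering = true;

    // A message replayed from the journal is handled immediately.
    void replayed(Object message) {
        handled.add(message);
    }

    // A new message is buffered while recovery runs, otherwise handled directly.
    void receive(Object message) {
        if (recovering) {
            internalStash.addLast(message);
        } else {
            handled.add(message);
        }
    }

    // Ends recovery and releases the buffered messages in arrival order.
    void recoveryCompleted() {
        recovering = false;
        while (!internalStash.isEmpty()) {
            handled.add(internalStash.removeFirst());
        }
    }

    // Messages in the order the processor logic saw them.
    List<Object> handled() {
        return handled;
    }
}
```

A new message that arrives between two replayed messages is therefore seen only after both of them, which is why senders do not have to wait for recovery to finish.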
Processors can also store snapshots of internal state by calling saveSnapshot. During recovery, a saved snapshot is offered to the processor with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. By default, the latest saved snapshot is offered.
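As a rough illustration of snapshot-based recovery, the following stand-alone sketch rebuilds a running sum from a snapshot plus the journal entries that are younger than it. `SnapshotRecoverySketch` and `recover` are hypothetical names, and the journal is modeled as a sorted map from sequence number to an added amount; none of this is Akka API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of snapshot-based recovery for a running sum; not Akka code.
final class SnapshotRecoverySketch {
    // Starts from the snapshot state, then applies only journal entries whose
    // sequence number is strictly greater than the snapshot's sequence number.
    static long recover(long snapshotState, long snapshotSequenceNr, TreeMap<Long, Long> journal) {
        long state = snapshotState;
        for (Map.Entry<Long, Long> entry : journal.tailMap(snapshotSequenceNr, false).entrySet()) {
            state += entry.getValue();
        }
        return state;
    }
}
```

Recovering from a snapshot taken at sequence number 2 gives the same state as replaying the whole journal from an empty state, while applying fewer messages.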
- See Also:
- Processor, Recover, PersistentBatch
Methods inherited from class akka.actor.UntypedActor
getContext, getSelf, getSender, onReceive, postRestart, postStop, preRestart, preStart, receive, supervisorStrategy, unhandled
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface akka.persistence.ProcessorImpl
_persistenceId, aroundPostStop, aroundPreRestart, aroundPreStart, deleteMessage, deleteMessage, deleteMessages, deleteMessages, flushJournalBatch, initializing, instanceId, nextSequenceNr, onRecoveryCompleted, onRecoveryFailure, onReplayFailure, onReplaySuccess, preRestart, preRestartDefault, preStart, processing, processorBatch, processorId, recoveryFinished, recoveryRunning, sequenceNr, snapshotterId, unhandled, unstashFilterPredicate
Methods inherited from interface akka.persistence.Recovery
_currentPersistent, _currentState, _lastSequenceNr, _recoveryFailureCause, _recoveryFailureMessage, aroundReceive, currentPersistentMessage, extension, getCurrentPersistentMessage, journal, lastSequenceNr, prepareRestart, receiverStash, recoveryPending, recoveryStarted, replayFailed, replayStarted, runReceive, snapshotSequenceNr, updateLastSequenceNr, updateLastSequenceNr, withCurrentPersistent
Methods inherited from interface akka.actor.StashSupport
actorCell, capacity, clearStash, context, enqueueFirst, mailbox, prepend, self, stash, theStash, unstash, unstashAll, unstashAll
UntypedProcessor
public UntypedProcessor()