akka.persistence
Class AbstractEventsourcedProcessor

java.lang.Object
  extended by akka.actor.AbstractActor
      extended by akka.persistence.AbstractEventsourcedProcessor
All Implemented Interfaces:
Actor, Stash, UnrestrictedStash, RequiresMessageQueue<DequeBasedMessageQueueSemantics>, EventsourcedProcessor, Processor, Recovery, Snapshotter

public abstract class AbstractEventsourcedProcessor
extends AbstractActor
implements EventsourcedProcessor

Java API: an event sourced processor compatible with lambda expressions (to be used with ReceiveBuilder). Its command handler typically validates commands against current state (and/or by communicating with other actors). On successful validation, one or more events are derived from a command and these events are then persisted by calling persist. Commands sent to event sourced processors must not be Persistent or PersistentBatch messages; if such a message is sent, the processor throws an UnsupportedOperationException.


Nested Class Summary
 
Nested classes/interfaces inherited from interface akka.actor.Actor
Actor.emptyBehavior$
 
Constructor Summary
AbstractEventsourcedProcessor()
           
 
Method Summary
 akka.actor.ActorCell actorCell()
           
 void aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive, java.lang.Object message)
          INTERNAL API.
 int capacity()
           
 scala.collection.immutable.Vector<Envelope> clearStash()
          INTERNAL API.
 ActorContext context()
          INTERNAL API.
 Recovery.State currentState()
           
 void enqueueFirst(Envelope envelope)
          Enqueues envelope at the first position in the mailbox.
 scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> initialBehavior()
          INTERNAL API.
 DequeBasedMessageQueueSemantics mailbox()
          INTERNAL API.
<A> void
persist(A event, scala.Function1<A,scala.runtime.BoxedUnit> handler)
          Asynchronously persists event.
<A> void
persist(A event, Procedure<A> handler)
          Java API: asynchronously persists event.
<A> void
persist(java.lang.Iterable<A> events, Procedure<A> handler)
          Java API: asynchronously persists events in specified order.
<A> void
persist(scala.collection.immutable.Seq<A> events, scala.Function1<A,scala.runtime.BoxedUnit> handler)
          Asynchronously persists events in specified order.
 scala.collection.immutable.List<PersistentRepr> persistentEventBatch()
           
 Recovery.State persistingEvents()
          Event persisting state.
 scala.collection.immutable.List<scala.Tuple2<java.lang.Object,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>>> persistInvocations()
           
 void postStop()
          Calls super.postStop then unstashes all messages from the internal stash.
 void prepend(scala.collection.immutable.Seq<Envelope> others)
          Prepends others to this stash.
 void preRestart(java.lang.Throwable reason, scala.Option<java.lang.Object> message)
          Calls super.preRestart then unstashes all messages from the internal stash.
 Recovery.State processingCommands()
          Command processing state.
 akka.actor.StashSupport processorStash()
           
 scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive()
          Defines the initial actor behavior; it must return a partial function with the actor logic.
 void receive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive)
          Set up the initial receive behavior of the Actor.
 scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receiveCommand()
          Command handler.
 scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receiveRecover()
          Recovery handler that receives persisted events during recovery.
 Recovery.State recovering()
          Processor recovery state.
 scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> recoveryBehavior()
          INTERNAL API.
 ActorRef self()
          INTERNAL API.
 void stash()
          Adds the current message (the message that the actor received last) to the actor's stash.
 scala.collection.immutable.Vector<Envelope> theStash()
           
 void unstash()
          Prepends the oldest message in the stash to the mailbox, and then removes that message from the stash.
 void unstashAll()
          Prepends all messages in the stash to the mailbox, and then clears the stash.
 void unstashAll(scala.Function1<java.lang.Object,java.lang.Object> filterPredicate)
          INTERNAL API.
 
Methods inherited from class akka.actor.AbstractActor
emptyBehavior, getContext
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface akka.persistence.Processor
_processorId, aroundPostStop, aroundPreRestart, aroundPreStart, deleteMessage, deleteMessage, deleteMessages, deleteMessages, initializing, nextSequenceNr, onRecoveryFailure, onReplayFailure, onReplaySuccess, preRestartDefault, preStart, processing, processorBatch, processorId, recoveryFinished, recoveryRunning, sequenceNr, snapshotterId, unstashFilterPredicate
 
Methods inherited from interface akka.persistence.Recovery
_currentPersistent, _currentState, _lastSequenceNr, _recoveryFailureCause, _recoveryFailureMessage, currentPersistentMessage, extension, getCurrentPersistentMessage, journal, lastSequenceNr, prepareRestart, receiverStash, recoveryPending, recoveryStarted, replayFailed, replayStarted, snapshotSequenceNr
 
Methods inherited from interface akka.persistence.Snapshotter
deleteSnapshot, deleteSnapshots, loadSnapshot, saveSnapshot, snapshotStore
 
Methods inherited from interface akka.actor.Actor
aroundPostRestart, context, noSender, postRestart, self, sender, supervisorStrategy, unhandled
 

Constructor Detail

AbstractEventsourcedProcessor

public AbstractEventsourcedProcessor()
Method Detail

persist

public final <A> void persist(A event,
                              Procedure<A> handler)
Java API: asynchronously persists event. On successful persistence, handler is called with the persisted event. It is guaranteed that no new commands will be received by a processor between a call to persist and the execution of its handler. This also holds for multiple persist calls per received command. Internally, this is achieved by stashing new commands and unstashing them when the event has been persisted and handled. The stash used for that is an internal stash which doesn't interfere with the user stash inherited from Processor.

An event handler may close over processor state and modify it. The getSender() of a persisted event is the sender of the corresponding command. This means that one can reply to a command sender within an event handler.

Within an event handler, applications usually update processor state using persisted event data, notify listeners and reply to command senders.

If persistence of an event fails, the processor will be stopped. This can be customized by handling PersistenceFailure in receiveCommand.

Parameters:
event - event to be persisted.
handler - handler for each persisted event
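
The stashing guarantee described above can be illustrated with a small plain-Java model. This is only a sketch and does not use Akka: the class StashingProcessorModel and its methods deliver and journalAck are hypothetical names introduced for illustration, and the journal acknowledgement is triggered by hand instead of arriving asynchronously.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy model (not Akka): commands arriving while a persist is pending are stashed. */
final class StashingProcessorModel {
    /** Observable order of effects, used to inspect the interleaving. */
    final List<String> log = new ArrayList<>();
    private final Deque<String> internalStash = new ArrayDeque<>();
    private final Deque<Runnable> pendingHandlers = new ArrayDeque<>();

    /** Models persist(event, handler): the handler is deferred until the write is acknowledged. */
    <A> void persist(A event, Consumer<A> handler) {
        pendingHandlers.addLast(() -> handler.accept(event));
    }

    /** Models message delivery: a command is stashed while any persist is still pending. */
    void deliver(String command) {
        if (!pendingHandlers.isEmpty()) {
            internalStash.addLast(command);
            return;
        }
        log.add("command:" + command);
        persist("evt-" + command, evt -> log.add("handled:" + evt));
    }

    /** Models the journal acknowledging all pending writes, followed by unstashing. */
    void journalAck() {
        while (!pendingHandlers.isEmpty()) {
            pendingHandlers.removeFirst().run();
        }
        // Redeliver stashed commands in arrival order; later ones may be stashed again.
        List<String> stashed = new ArrayList<>(internalStash);
        internalStash.clear();
        for (String command : stashed) {
            deliver(command);
        }
    }
}
```

Delivering "a" and then "b" before any acknowledgement leaves only command:a in the log; "b" is processed only after the handler for evt-a has run.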

persist

public final <A> void persist(java.lang.Iterable<A> events,
                              Procedure<A> handler)
Java API: asynchronously persists events in specified order. This is equivalent to calling persist(A event, Procedure<A> handler) multiple times with the same handler, except that events are persisted atomically with this method.

Parameters:
events - events to be persisted.
handler - handler for each persisted event
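
The difference between an atomic batch and repeated single persists can be sketched in plain Java. This is a simplified model and does not use Akka: AtomicBatchModel, persistAll and the writable predicate are hypothetical names, and a rejected write stands in for a journal failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Toy model (not Akka): a batch is written entirely or not at all. */
final class AtomicBatchModel<A> {
    /** Stand-in for the journal contents. */
    final List<A> journal = new ArrayList<>();
    /** Hypothetical check that decides whether writing an event would succeed. */
    private final Predicate<A> writable;

    AtomicBatchModel(Predicate<A> writable) {
        this.writable = writable;
    }

    /**
     * Writes all events or none. On success the handler is called once per
     * event, in the order given, and true is returned.
     */
    boolean persistAll(Iterable<A> events, Consumer<A> handler) {
        List<A> batch = new ArrayList<>();
        for (A event : events) {
            if (!writable.test(event)) {
                return false; // nothing from this batch reaches the journal
            }
            batch.add(event);
        }
        journal.addAll(batch);
        for (A event : batch) {
            handler.accept(event);
        }
        return true;
    }
}
```

With single persists, a failure on the second event would leave the first one in the journal; in this model the journal is unchanged when any event of the batch is rejected.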

receive

public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive()
Description copied from interface: Actor
Defines the initial actor behavior; it must return a partial function with the actor logic.

Specified by:
receive in interface Actor
Specified by:
receive in interface EventsourcedProcessor
Overrides:
receive in class AbstractActor

receive

public void receive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive)
Description copied from class: AbstractActor
Set up the initial receive behavior of the Actor.

Overrides:
receive in class AbstractActor
Parameters:
receive - The receive behavior.

recovering

public Recovery.State recovering()
Processor recovery state. Waits for recovery completion and then changes to processingCommands.


processingCommands

public Recovery.State processingCommands()
Command processing state. If event persistence is pending after processing a command, event persistence is triggered and state changes to persistingEvents.

There's no need to loop commands through the journal any more; they can now be offered directly as LoopSuccess to the state machine implemented by Processor.


persistingEvents

public Recovery.State persistingEvents()
Event persisting state. Remains until pending events are persisted and then changes state to processingCommands. Only events to be persisted are processed. All other messages are stashed internally.
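
The transitions between recovering, processingCommands and persistingEvents described above can be summarized as a small state table. The sketch below is plain Java and is not how Recovery.State is implemented: ProcessorStateModel and the Signal values are hypothetical names chosen for illustration.

```java
/** Toy model (not Akka) of the three processor states and their transitions. */
final class ProcessorStateModel {
    enum State { RECOVERING, PROCESSING_COMMANDS, PERSISTING_EVENTS }

    /** Hypothetical triggers; the real processor reacts to internal messages. */
    enum Signal { RECOVERY_COMPLETED, EVENTS_PENDING, EVENTS_PERSISTED }

    /** Returns the next state, or the current one if the signal does not apply. */
    static State next(State current, Signal signal) {
        switch (current) {
            case RECOVERING:
                return signal == Signal.RECOVERY_COMPLETED ? State.PROCESSING_COMMANDS : current;
            case PROCESSING_COMMANDS:
                return signal == Signal.EVENTS_PENDING ? State.PERSISTING_EVENTS : current;
            case PERSISTING_EVENTS:
                return signal == Signal.EVENTS_PERSISTED ? State.PROCESSING_COMMANDS : current;
            default:
                throw new IllegalStateException("unknown state: " + current);
        }
    }
}
```

Signals that do not apply to the current state leave it unchanged, which loosely mirrors the way other messages are stashed while events are being persisted.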


recoveryBehavior

public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> recoveryBehavior()
INTERNAL API.

This is a def and not a val because of binary compatibility in 2.3.x. It is cached where it is used.


persistInvocations

public scala.collection.immutable.List<scala.Tuple2<java.lang.Object,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>>> persistInvocations()

persistentEventBatch

public scala.collection.immutable.List<PersistentRepr> persistentEventBatch()

currentState

public Recovery.State currentState()

processorStash

public akka.actor.StashSupport processorStash()

persist

public <A> void persist(A event,
                        scala.Function1<A,scala.runtime.BoxedUnit> handler)
Asynchronously persists event. On successful persistence, handler is called with the persisted event. It is guaranteed that no new commands will be received by a processor between a call to persist and the execution of its handler. This also holds for multiple persist calls per received command. Internally, this is achieved by stashing new commands and unstashing them when the event has been persisted and handled. The stash used for that is an internal stash which doesn't interfere with the user stash inherited from Processor.

An event handler may close over processor state and modify it. The sender of a persisted event is the sender of the corresponding command. This means that one can reply to a command sender within an event handler.

Within an event handler, applications usually update processor state using persisted event data, notify listeners and reply to command senders.

If persistence of an event fails, the processor will be stopped. This can be customized by handling PersistenceFailure in receiveCommand.

Parameters:
event - event to be persisted.
handler - handler for each persisted event

persist

public <A> void persist(scala.collection.immutable.Seq<A> events,
                        scala.Function1<A,scala.runtime.BoxedUnit> handler)
Asynchronously persists events in specified order. This is equivalent to calling persist[A](event: A)(handler: A => Unit) multiple times with the same handler, except that events are persisted atomically with this method.

Parameters:
events - events to be persisted.
handler - handler for each persisted event

receiveRecover

public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receiveRecover()
Recovery handler that receives persisted events during recovery. If a state snapshot has been captured and saved, this handler will receive a SnapshotOffer message followed by events that are younger than the offered snapshot.

This handler must not have side effects other than changing processor state, i.e. it should not perform actions that may fail, such as interacting with external services.

See Also:
Recover
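
The recovery order described above (an offered snapshot first, then only the events younger than it) can be sketched in plain Java. This model does not use Akka: RecoveryModel, Entry and recover are hypothetical names, the state is a single integer, and a null snapshot means that no snapshot was saved.

```java
import java.util.List;

/** Toy model (not Akka) of recovery from an optional snapshot plus journaled events. */
final class RecoveryModel {
    /** Hypothetical journal entry: a sequence number and an integer delta. */
    static final class Entry {
        final long sequenceNr;
        final int delta;

        Entry(long sequenceNr, int delta) {
            this.sequenceNr = sequenceNr;
            this.delta = delta;
        }
    }

    /**
     * Starts from the snapshot state (or 0 if snapshotState is null) and
     * replays only the entries whose sequence number is greater than
     * snapshotSequenceNr.
     */
    static int recover(Integer snapshotState, long snapshotSequenceNr, List<Entry> journal) {
        int state = snapshotState != null ? snapshotState : 0;
        for (Entry entry : journal) {
            if (entry.sequenceNr > snapshotSequenceNr) {
                state += entry.delta; // pure state change, no other side effects
            }
        }
        return state;
    }
}
```

Because the replay step only changes state, recovering from a snapshot and recovering from the full journal produce the same result in this model.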

receiveCommand

public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receiveCommand()
Command handler. Typically validates commands against current state (and/or by communication with other actors). On successful validation, one or more events are derived from a command and these events are then persisted by calling persist. Commands sent to event sourced processors should not be Persistent messages.
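
The validate, derive and persist sequence can be sketched in plain Java. This model does not use Akka and persists synchronously into a list: CounterModel, handleCommand and applyEvent are hypothetical names, and the rule that the counter must not become negative is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Akka): state changes only through events derived from valid commands. */
final class CounterModel {
    /** Current state. */
    int value = 0;
    /** Stand-in for the journal. */
    final List<Integer> journal = new ArrayList<>();

    /**
     * Validates the command against current state, derives one event from it,
     * records the event and then applies it. Returns false if the command is rejected.
     */
    boolean handleCommand(int delta) {
        if (value + delta < 0) {
            return false; // validation failed: no event, no state change
        }
        int event = delta;    // the event derived from the command
        journal.add(event);   // stand-in for persist
        applyEvent(event);    // stand-in for the event handler
        return true;
    }

    /** Event handler: the only place where state is modified. */
    void applyEvent(int event) {
        value += event;
    }
}
```

Keeping all state changes in applyEvent means the same method could also be reused during recovery to replay journaled events.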


unstashAll

public void unstashAll()
Prepends all messages in the stash to the mailbox, and then clears the stash.

Messages from the stash are enqueued to the mailbox until the capacity of the mailbox (if any) has been reached. In case a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.

The stash is guaranteed to be empty after calling unstashAll().
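
The prepend order and the empty-stash guarantee can be illustrated with two deques. The sketch below is plain Java and does not use Akka: StashModel is a hypothetical class, messages are plain strings, and IllegalStateException stands in for MessageQueueAppendFailedException.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Toy model (not Akka) of unstashAll with an optionally bounded mailbox. */
final class StashModel {
    final Deque<String> mailbox = new ArrayDeque<>();
    /** Stashed messages, oldest first. */
    final Deque<String> stash = new ArrayDeque<>();
    private final int capacity;

    StashModel(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Prepends all stashed messages to the mailbox so that the oldest stashed
     * message is received first. The stash is cleared even on overflow.
     */
    void unstashAll() {
        try {
            // Walk newest to oldest; each addFirst pushes earlier additions back.
            Iterator<String> newestFirst = stash.descendingIterator();
            while (newestFirst.hasNext()) {
                if (mailbox.size() >= capacity) {
                    throw new IllegalStateException("mailbox capacity reached");
                }
                mailbox.addFirst(newestFirst.next());
            }
        } finally {
            stash.clear();
        }
    }
}
```

With m3 already in the mailbox and m1, m2 stashed in that order, the mailbox becomes m1, m2, m3 after unstashAll, provided the capacity allows it.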


aroundReceive

public void aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive,
                          java.lang.Object message)
INTERNAL API.

Specified by:
aroundReceive in interface Actor
Specified by:
aroundReceive in interface Recovery
Parameters:
receive - current behavior.
message - current message.

preRestart

public void preRestart(java.lang.Throwable reason,
                       scala.Option<java.lang.Object> message)
Calls super.preRestart then unstashes all messages from the internal stash.

Specified by:
preRestart in interface Actor
Specified by:
preRestart in interface Processor
Specified by:
preRestart in interface UnrestrictedStash
Parameters:
reason - the Throwable that caused the restart to happen
message - optionally the current message the actor processed when failing, if applicable

Called on a crashed Actor right before it is restarted, to allow cleanup of resources before the Actor is terminated.


postStop

public void postStop()
Calls super.postStop then unstashes all messages from the internal stash.

Specified by:
postStop in interface Actor
Specified by:
postStop in interface UnrestrictedStash

initialBehavior

public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> initialBehavior()
INTERNAL API.

Only here for binary compatibility in 2.3.x.


context

public ActorContext context()
INTERNAL API.

Context of the actor that uses this stash.


self

public ActorRef self()
INTERNAL API.

Self reference of the actor that uses this stash.


theStash

public scala.collection.immutable.Vector<Envelope> theStash()

actorCell

public akka.actor.ActorCell actorCell()

capacity

public int capacity()

mailbox

public DequeBasedMessageQueueSemantics mailbox()
INTERNAL API.

The actor's deque-based message queue. mailbox.queue is the underlying Deque.


stash

public void stash()
Adds the current message (the message that the actor received last) to the actor's stash.

Throws:
StashOverflowException - in case of a stash capacity violation
java.lang.IllegalStateException - if the same message is stashed more than once

prepend

public void prepend(scala.collection.immutable.Seq<Envelope> others)
Prepends others to this stash. This method is optimized for a large stash and small others.


unstash

public void unstash()
Prepends the oldest message in the stash to the mailbox, and then removes that message from the stash.

Messages from the stash are enqueued to the mailbox until the capacity of the mailbox (if any) has been reached. In case a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.

The unstashed message is guaranteed to be removed from the stash regardless of whether the unstash() call returns successfully or throws an exception.


unstashAll

public void unstashAll(scala.Function1<java.lang.Object,java.lang.Object> filterPredicate)
INTERNAL API.

Prepends selected messages in the stash, applying filterPredicate, to the mailbox, and then clears the stash.

Messages from the stash are enqueued to the mailbox until the capacity of the mailbox (if any) has been reached. In case a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.

The stash is guaranteed to be empty after calling unstashAll(Any => Boolean).

Parameters:
filterPredicate - only stashed messages selected by this predicate are prepended to the mailbox.

clearStash

public scala.collection.immutable.Vector<Envelope> clearStash()
INTERNAL API.

Clears the stash and returns all envelopes that have not been unstashed.


enqueueFirst

public void enqueueFirst(Envelope envelope)
Enqueues envelope at the first position in the mailbox. If the message contained in the envelope is a Terminated message, it will be ensured that it can be re-received by the actor.