akka.persistence
Interface AtLeastOnceDelivery

All Superinterfaces:
Actor, Processor, ProcessorImpl, Recovery, RequiresMessageQueue<DequeBasedMessageQueueSemantics>, Snapshotter, Stash, StashFactory, StashSupport, UnrestrictedStash
All Known Implementing Classes:
AbstractPersistentActorWithAtLeastOnceDelivery, UntypedPersistentActorWithAtLeastOnceDelivery

public interface AtLeastOnceDelivery
extends Processor

Mix this trait into your PersistentActor to send messages to destinations with at-least-once delivery semantics. It re-sends messages that have not been confirmed within a configurable timeout. Use the deliver(akka.actor.ActorPath, scala.Function1) method to send a message to a destination. Call the confirmDelivery(long) method when the destination has replied with a confirmation message.

At-least-once delivery implies that the original send order is not always retained and that the destination may receive duplicate messages because of resends.

The interval between redelivery attempts can be defined by redeliverInterval(). After the number of delivery attempts given by warnAfterNumberOfUnconfirmedAttempts(), an AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. Re-sending still continues after the warning, but you can call confirmDelivery(long) to cancel it.

The AtLeastOnceDelivery trait has a state consisting of unconfirmed messages and a sequence number. It does not store this state itself. You must persist events corresponding to the deliver and confirmDelivery invocations from your PersistentActor so that the state can be restored by calling the same methods during the recovery phase of the PersistentActor. Sometimes these events can be derived from other business-level events, and sometimes you must create separate events. During recovery, calls to deliver will not send out the message, but it will be sent later if no matching confirmDelivery was performed.
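
The bookkeeping described above can be illustrated with a small, Akka-free model. This is only a sketch under stated assumptions, not the trait's implementation: the class DeliveryModel, its recovering flag, the sent list standing in for the network, and the recoveryCompleted() hook are all hypothetical, and deliver returns the id here purely for convenience (the real method returns void). Replaying persisted events amounts to calling deliver and confirmDelivery again in the original order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.LongFunction;

// Simplified model of the delivery state; all names are hypothetical.
class DeliveryModel {
    private long deliverySequenceNr = 0L;
    private final SortedMap<Long, Object> unconfirmed = new TreeMap<>();
    final List<Object> sent = new ArrayList<>(); // stands in for actual sends
    boolean recovering = false;

    // Mirrors deliver: assign the next deliveryId, remember the message,
    // and send it only when the actor is not recovering.
    long deliver(LongFunction<Object> deliveryIdToMessage) {
        long id = ++deliverySequenceNr;
        Object message = deliveryIdToMessage.apply(id);
        unconfirmed.put(id, message);
        if (!recovering) {
            sent.add(message);
        }
        return id;
    }

    // Mirrors confirmDelivery: true only the first time an id is confirmed.
    boolean confirmDelivery(long deliveryId) {
        return unconfirmed.remove(deliveryId) != null;
    }

    // Once recovery is over, everything still unconfirmed is sent.
    void recoveryCompleted() {
        recovering = false;
        sent.addAll(unconfirmed.values());
    }

    int numberOfUnconfirmed() {
        return unconfirmed.size();
    }
}
```

In this model, replaying two deliveries and one confirmation while recovering sends nothing; only the delivery without a matching confirmation goes out after recoveryCompleted().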

Support for snapshots is provided by getDeliverySnapshot() and setDeliverySnapshot(akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot). The AtLeastOnceDeliverySnapshot contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state you must also include the AtLeastOnceDeliverySnapshot. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot as a blob in your custom snapshot.


Nested Class Summary
static class AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
          Snapshot of current AtLeastOnceDelivery state.
static class AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot$
           
static class AtLeastOnceDelivery.Internal$
           
static class AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException
           
static class AtLeastOnceDelivery.UnconfirmedDelivery
          Information about a message that has not been confirmed.
static class AtLeastOnceDelivery.UnconfirmedDelivery$
           
static class AtLeastOnceDelivery.UnconfirmedWarning
           
static class AtLeastOnceDelivery.UnconfirmedWarning$
           
 
Nested classes/interfaces inherited from interface akka.persistence.Recovery
Recovery.State
 
Nested classes/interfaces inherited from interface akka.actor.Actor
Actor.emptyBehavior$
 
Method Summary
 void aroundPostStop()
          INTERNAL API
 void aroundPreRestart(java.lang.Throwable reason, scala.Option<java.lang.Object> message)
          INTERNAL API
 void aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive, java.lang.Object message)
          INTERNAL API
 boolean confirmDelivery(long deliveryId)
          Call this method when a message has been confirmed by the destination, or to abort re-sending.
 int defaultMaxUnconfirmedMessages()
           
 scala.concurrent.duration.FiniteDuration defaultRedeliverInterval()
           
 int defaultWarnAfterNumberOfUnconfirmedAttempts()
           
 void deliver(ActorPath destination, scala.Function1<java.lang.Object,java.lang.Object> deliveryIdToMessage)
          Scala API: Send the message created by the deliveryIdToMessage function to the destination actor.
 long deliverySequenceNr()
           
 AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot getDeliverySnapshot()
          Full state of the AtLeastOnceDelivery.
 int maxUnconfirmedMessages()
          Maximum number of unconfirmed messages that this actor is allowed to hold in memory.
 long nextDeliverySequenceNr()
           
 int numberOfUnconfirmed()
          Number of messages that have not been confirmed yet.
 scala.concurrent.duration.FiniteDuration redeliverInterval()
          Interval between redelivery attempts.
 void redeliverOverdue()
           
 Cancellable redeliverTask()
           
 void send(long deliveryId, akka.persistence.AtLeastOnceDelivery.Internal.Delivery d, long timestamp)
           
 void setDeliverySnapshot(AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot snapshot)
          If a snapshot from getDeliverySnapshot() was saved, it will be received during recovery in a SnapshotOffer message and should be set with this method.
 scala.collection.immutable.SortedMap<java.lang.Object,akka.persistence.AtLeastOnceDelivery.Internal.Delivery> unconfirmed()
           
 int warnAfterNumberOfUnconfirmedAttempts()
          After this number of delivery attempts an AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self.
 
Methods inherited from interface akka.persistence.Processor
persistenceId
 
Methods inherited from interface akka.persistence.ProcessorImpl
_persistenceId, aroundPreStart, deleteMessage, deleteMessage, deleteMessages, deleteMessages, flushJournalBatch, initializing, instanceId, nextSequenceNr, onRecoveryCompleted, onRecoveryFailure, onReplayFailure, onReplaySuccess, preRestart, preRestartDefault, preStart, processing, processorBatch, processorId, recoveryFinished, recoveryRunning, sequenceNr, snapshotterId, unhandled, unstashFilterPredicate
 
Methods inherited from interface akka.persistence.Recovery
_currentPersistent, _currentState, _lastSequenceNr, _recoveryFailureCause, _recoveryFailureMessage, currentPersistentMessage, extension, getCurrentPersistentMessage, journal, lastSequenceNr, prepareRestart, receiverStash, recoveryPending, recoveryStarted, replayFailed, replayStarted, runReceive, snapshotSequenceNr, updateLastSequenceNr, updateLastSequenceNr, withCurrentPersistent
 
Methods inherited from interface akka.persistence.Snapshotter
deleteSnapshot, deleteSnapshots, loadSnapshot, saveSnapshot, snapshotStore
 
Methods inherited from interface akka.actor.UnrestrictedStash
postStop
 
Methods inherited from interface akka.actor.Actor
aroundPostRestart, context, postRestart, receive, self, sender, supervisorStrategy
 
Methods inherited from interface akka.actor.StashSupport
actorCell, capacity, clearStash, context, enqueueFirst, mailbox, prepend, self, stash, theStash, unstash, unstashAll, unstashAll
 
Methods inherited from interface akka.actor.StashFactory
createStash
 

Method Detail

redeliverInterval

scala.concurrent.duration.FiniteDuration redeliverInterval()
Interval between redelivery attempts.

The default value can be configured with the akka.persistence.at-least-once-delivery.redeliver-interval configuration key. This method can be overridden by implementation classes to return non-default values.

Returns:
(undocumented)

defaultRedeliverInterval

scala.concurrent.duration.FiniteDuration defaultRedeliverInterval()

warnAfterNumberOfUnconfirmedAttempts

int warnAfterNumberOfUnconfirmedAttempts()
After this number of delivery attempts an AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The count is reset after a restart.

The default value can be configured with the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts configuration key. This method can be overridden by implementation classes to return non-default values.

Returns:
(undocumented)
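
The interplay between the redelivery interval and the warning threshold can be sketched with a small model that takes the clock as a parameter. This is an illustration under assumptions, not the Akka implementation: RedeliveryModel, Pending, warnedIds and the millisecond timestamps are hypothetical, and the first send is counted as attempt 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of periodic redelivery with a warning threshold.
class RedeliveryModel {
    static final class Pending {
        final String message;
        long lastSentAt;
        int attempts;

        Pending(String message, long sentAt) {
            this.message = message;
            this.lastSentAt = sentAt;
            this.attempts = 1; // the initial send counts as the first attempt
        }
    }

    private final long redeliverIntervalMillis;
    private final int warnAfterAttempts;
    private final TreeMap<Long, Pending> unconfirmed = new TreeMap<>();
    private long deliverySequenceNr = 0L;
    final List<Long> warnedIds = new ArrayList<>(); // stands in for UnconfirmedWarning

    RedeliveryModel(long redeliverIntervalMillis, int warnAfterAttempts) {
        this.redeliverIntervalMillis = redeliverIntervalMillis;
        this.warnAfterAttempts = warnAfterAttempts;
    }

    long deliver(String message, long now) {
        long id = ++deliverySequenceNr;
        unconfirmed.put(id, new Pending(message, now));
        return id;
    }

    boolean confirmDelivery(long deliveryId) {
        return unconfirmed.remove(deliveryId) != null;
    }

    // Re-sends every message whose last send is at least one interval old.
    // Returns how many messages were re-sent at time `now`.
    int redeliverOverdue(long now) {
        int resent = 0;
        for (Map.Entry<Long, Pending> entry : unconfirmed.entrySet()) {
            Pending p = entry.getValue();
            if (now - p.lastSentAt >= redeliverIntervalMillis) {
                p.lastSentAt = now;
                p.attempts++;
                resent++;
                // Warn once, when the threshold is reached; re-sending goes on.
                if (p.attempts == warnAfterAttempts) {
                    warnedIds.add(entry.getKey());
                }
            }
        }
        return resent;
    }
}
```

The warning is recorded once when the threshold is reached, and the message keeps being re-sent on later ticks until confirmDelivery removes it.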

defaultWarnAfterNumberOfUnconfirmedAttempts

int defaultWarnAfterNumberOfUnconfirmedAttempts()

maxUnconfirmedMessages

int maxUnconfirmedMessages()
Maximum number of unconfirmed messages that this actor is allowed to hold in memory. If this number is exceeded, deliver(akka.actor.ActorPath, scala.Function1) will not accept more messages and will throw AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException.

The default value can be configured with the akka.persistence.at-least-once-delivery.max-unconfirmed-messages configuration key. This method can be overridden by implementation classes to return non-default values.

Returns:
(undocumented)

defaultMaxUnconfirmedMessages

int defaultMaxUnconfirmedMessages()

redeliverTask

Cancellable redeliverTask()

deliverySequenceNr

long deliverySequenceNr()

unconfirmed

scala.collection.immutable.SortedMap<java.lang.Object,akka.persistence.AtLeastOnceDelivery.Internal.Delivery> unconfirmed()

nextDeliverySequenceNr

long nextDeliverySequenceNr()

deliver

void deliver(ActorPath destination,
             scala.Function1<java.lang.Object,java.lang.Object> deliveryIdToMessage)
Scala API: Send the message created by the deliveryIdToMessage function to the destination actor. It will retry sending the message until the delivery is confirmed with confirmDelivery(long). Correlation between deliver and confirmDelivery is performed with the deliveryId that is provided as parameter to the deliveryIdToMessage function. The deliveryId is typically passed in the message to the destination, which replies with a message containing the same deliveryId.

The deliveryId is a strictly monotonically increasing sequence number without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see gaps in the sequence if no translation is performed.
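
Why individual destinations see gaps can be shown with a minimal model in which one counter is shared by all destinations. The class SharedSequence and its string destination names are hypothetical and stand in for the actor and its ActorPath destinations.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a single deliveryId sequence shared by all destinations.
class SharedSequence {
    private long deliverySequenceNr = 0L;
    private final Map<String, List<Long>> seenBy = new HashMap<>();

    // Assigns the next id from the shared sequence and records
    // which destination received it.
    long deliver(String destination) {
        long id = ++deliverySequenceNr;
        seenBy.computeIfAbsent(destination, d -> new ArrayList<>()).add(id);
        return id;
    }

    // The ids a single destination has observed, in send order.
    List<Long> idsSeenBy(String destination) {
        return seenBy.getOrDefault(destination, new ArrayList<>());
    }
}
```

Sending to destination "a", then "b", then "a" again gives "a" the ids 1 and 3 and gives "b" the id 2. The sequence as a whole has no gaps, but each destination's view of it does, unless the sender translates the ids per destination.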

During recovery this method will not send out the message, but it will be sent later if no matching confirmDelivery was performed.

This method will throw AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException if numberOfUnconfirmed is greater than or equal to maxUnconfirmedMessages().
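
The limit check can be sketched as follows. This is a simplified model, not the trait itself: BoundedDelivery and its nested MaxUnconfirmedExceeded exception are hypothetical stand-ins, and the model assumes that a rejected call does not consume a deliveryId.

```java
import java.util.TreeMap;

// Hypothetical model of the maxUnconfirmedMessages limit.
class BoundedDelivery {
    static class MaxUnconfirmedExceeded extends RuntimeException {
        MaxUnconfirmedExceeded(String message) {
            super(message);
        }
    }

    private final int maxUnconfirmedMessages;
    private final TreeMap<Long, Object> unconfirmed = new TreeMap<>();
    private long deliverySequenceNr = 0L;

    BoundedDelivery(int maxUnconfirmedMessages) {
        this.maxUnconfirmedMessages = maxUnconfirmedMessages;
    }

    // Rejects the message when the number of unconfirmed messages is
    // greater than or equal to the limit; otherwise assigns the next id.
    long deliver(Object message) {
        if (unconfirmed.size() >= maxUnconfirmedMessages) {
            throw new MaxUnconfirmedExceeded(
                "Too many unconfirmed messages, maximum allowed is " + maxUnconfirmedMessages);
        }
        long id = ++deliverySequenceNr;
        unconfirmed.put(id, message);
        return id;
    }

    // Confirming a message frees capacity for further deliveries.
    boolean confirmDelivery(long deliveryId) {
        return unconfirmed.remove(deliveryId) != null;
    }
}
```

With a limit of 2, a third deliver is rejected until one of the first two messages has been confirmed.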

Parameters:
destination - path of the actor that the message is sent to
deliveryIdToMessage - function that creates the message to send from the deliveryId

confirmDelivery

boolean confirmDelivery(long deliveryId)
Call this method when a message has been confirmed by the destination, or to abort re-sending.

Parameters:
deliveryId - the deliveryId that was passed to the deliveryIdToMessage function in the corresponding deliver call
Returns:
true the first time the deliveryId is confirmed, false for a duplicate confirmation
See Also:
deliver(akka.actor.ActorPath, scala.Function1)

numberOfUnconfirmed

int numberOfUnconfirmed()
Number of messages that have not been confirmed yet.

Returns:
(undocumented)

redeliverOverdue

void redeliverOverdue()

send

void send(long deliveryId,
          akka.persistence.AtLeastOnceDelivery.Internal.Delivery d,
          long timestamp)

getDeliverySnapshot

AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot getDeliverySnapshot()
Full state of the AtLeastOnceDelivery. It can be saved with Snapshotter.saveSnapshot(java.lang.Object). During recovery the snapshot received in SnapshotOffer should be set with setDeliverySnapshot(akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot).

The AtLeastOnceDeliverySnapshot contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state you must also include the AtLeastOnceDeliverySnapshot. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot as a blob in your custom snapshot.

Returns:
(undocumented)

setDeliverySnapshot

void setDeliverySnapshot(AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot snapshot)
If a snapshot from getDeliverySnapshot() was saved, it will be received during recovery in a SnapshotOffer message and should be set with this method.

Parameters:
snapshot - (undocumented)
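
The round trip between getDeliverySnapshot() and setDeliverySnapshot can be modelled without Akka as shown here. SnapshotModel and DeliverySnapshot are hypothetical stand-ins for the trait and for AtLeastOnceDeliverySnapshot; the sketch assumes the snapshot carries the current sequence number together with the unconfirmed messages, as the description of the full delivery state suggests.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of saving and restoring the delivery state.
class SnapshotModel {
    // Immutable copy of the delivery state at one point in time.
    static final class DeliverySnapshot {
        final long currentDeliveryId;
        final SortedMap<Long, String> unconfirmed;

        DeliverySnapshot(long currentDeliveryId, SortedMap<Long, String> unconfirmed) {
            this.currentDeliveryId = currentDeliveryId;
            this.unconfirmed = Collections.unmodifiableSortedMap(new TreeMap<>(unconfirmed));
        }
    }

    private long deliverySequenceNr = 0L;
    private final SortedMap<Long, String> unconfirmed = new TreeMap<>();

    long deliver(String message) {
        long id = ++deliverySequenceNr;
        unconfirmed.put(id, message);
        return id;
    }

    boolean confirmDelivery(long deliveryId) {
        return unconfirmed.remove(deliveryId) != null;
    }

    DeliverySnapshot getDeliverySnapshot() {
        return new DeliverySnapshot(deliverySequenceNr, unconfirmed);
    }

    // Replaces the current state with the state held in the snapshot.
    void setDeliverySnapshot(DeliverySnapshot snapshot) {
        deliverySequenceNr = snapshot.currentDeliveryId;
        unconfirmed.clear();
        unconfirmed.putAll(snapshot.unconfirmed);
    }

    int numberOfUnconfirmed() {
        return unconfirmed.size();
    }
}
```

A fresh instance that receives the snapshot continues the id sequence where the original left off and still holds the messages that were unconfirmed when the snapshot was taken.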

aroundPreRestart

void aroundPreRestart(java.lang.Throwable reason,
                      scala.Option<java.lang.Object> message)
INTERNAL API

Specified by:
aroundPreRestart in interface Actor
Specified by:
aroundPreRestart in interface ProcessorImpl
Parameters:
reason - (undocumented)
message - (undocumented)

aroundPostStop

void aroundPostStop()
INTERNAL API

Specified by:
aroundPostStop in interface Actor
Specified by:
aroundPostStop in interface ProcessorImpl

aroundReceive

void aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive,
                   java.lang.Object message)
INTERNAL API

Specified by:
aroundReceive in interface Actor
Specified by:
aroundReceive in interface Recovery
Parameters:
receive - (undocumented)
message - (undocumented)