|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
public interface AtLeastOnceDelivery
Mix-in this trait with your PersistentActor
to send messages with at-least-once
delivery semantics to destinations. It takes care of re-sending messages when they
have not been confirmed within a configurable timeout. Use the deliver(akka.actor.ActorPath, scala.Function1
method to
send a message to a destination. Call the confirmDelivery(long)
method when the destination
has replied with a confirmation message.
At-least-once delivery implies that original message send order is not always retained and the destination may receive duplicate messages due to possible resends.
The interval between redelivery attempts can be defined by redeliverInterval()
.
After a number of delivery attempts a AtLeastOnceDelivery.UnconfirmedWarning
message
will be sent to self
. The re-sending will still continue, but you can choose to call
confirmDelivery(long)
to cancel the re-sending.
The AtLeastOnceDelivery
trait has a state consisting of unconfirmed messages and a
sequence number. It does not store this state itself. You must persist events corresponding
to the deliver
and confirmDelivery
invocations from your PersistentActor
so that the
state can be restored by calling the same methods during the recovery phase of the
PersistentActor
. Sometimes these events can be derived from other business level events,
and sometimes you must create separate events. During recovery calls to deliver
will not send out the message, but it will be sent later if no matching confirmDelivery
was performed.
Support for snapshots is provided by getDeliverySnapshot()
and setDeliverySnapshot(akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot)
.
The AtLeastOnceDeliverySnapshot
contains the full delivery state, including unconfirmed messages.
If you need a custom snapshot for other parts of the actor state you must also include the
AtLeastOnceDeliverySnapshot
. It is serialized using protobuf with the ordinary Akka
serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot
as a blob in your custom snapshot.
Nested Class Summary | |
---|---|
static class |
AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
Snapshot of current AtLeastOnceDelivery state. |
static class |
AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot$
|
static class |
AtLeastOnceDelivery.Internal$
|
static class |
AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException
|
static class |
AtLeastOnceDelivery.UnconfirmedDelivery
Information about a message that has not been confirmed. |
static class |
AtLeastOnceDelivery.UnconfirmedDelivery$
|
static class |
AtLeastOnceDelivery.UnconfirmedWarning
|
static class |
AtLeastOnceDelivery.UnconfirmedWarning$
|
Nested classes/interfaces inherited from interface akka.persistence.Recovery |
---|
Recovery.State |
Nested classes/interfaces inherited from interface akka.actor.Actor |
---|
Actor.emptyBehavior$ |
Method Summary | |
---|---|
void |
aroundPostStop()
INTERNAL API |
void |
aroundPreRestart(java.lang.Throwable reason,
scala.Option<java.lang.Object> message)
INTERNAL API |
void |
aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive,
java.lang.Object message)
INTERNAL API |
boolean |
confirmDelivery(long deliveryId)
Call this method when a message has been confirmed by the destination, or to abort re-sending. |
int |
defaultMaxUnconfirmedMessages()
|
scala.concurrent.duration.FiniteDuration |
defaultRedeliverInterval()
|
int |
defaultWarnAfterNumberOfUnconfirmedAttempts()
|
void |
deliver(ActorPath destination,
scala.Function1<java.lang.Object,java.lang.Object> deliveryIdToMessage)
Scala API: Send the message created by the deliveryIdToMessage function to
the destination actor. |
long |
deliverySequenceNr()
|
AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot |
getDeliverySnapshot()
Full state of the AtLeastOnceDelivery . |
int |
maxUnconfirmedMessages()
Maximum number of unconfirmed messages that this actor is allowed to hold in memory. |
long |
nextDeliverySequenceNr()
|
int |
numberOfUnconfirmed()
Number of messages that have not been confirmed yet. |
scala.concurrent.duration.FiniteDuration |
redeliverInterval()
Interval between redelivery attempts. |
void |
redeliverOverdue()
|
Cancellable |
redeliverTask()
|
void |
send(long deliveryId,
akka.persistence.AtLeastOnceDelivery.Internal.Delivery d,
long timestamp)
|
void |
setDeliverySnapshot(AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot snapshot)
If snapshot from getDeliverySnapshot() was saved it will be received during recovery
in a SnapshotOffer message and should be set with this method. |
scala.collection.immutable.SortedMap<java.lang.Object,akka.persistence.AtLeastOnceDelivery.Internal.Delivery> |
unconfirmed()
|
int |
warnAfterNumberOfUnconfirmedAttempts()
After this number of delivery attempts a AtLeastOnceDelivery.UnconfirmedWarning message
will be sent to self . |
Methods inherited from interface akka.persistence.Processor |
---|
persistenceId |
Methods inherited from interface akka.persistence.ProcessorImpl |
---|
_persistenceId, aroundPreStart, deleteMessage, deleteMessage, deleteMessages, deleteMessages, flushJournalBatch, initializing, instanceId, nextSequenceNr, onRecoveryCompleted, onRecoveryFailure, onReplayFailure, onReplaySuccess, preRestart, preRestartDefault, preStart, processing, processorBatch, processorId, recoveryFinished, recoveryRunning, sequenceNr, snapshotterId, unhandled, unstashFilterPredicate |
Methods inherited from interface akka.persistence.Recovery |
---|
_currentPersistent, _currentState, _lastSequenceNr, _recoveryFailureCause, _recoveryFailureMessage, currentPersistentMessage, extension, getCurrentPersistentMessage, journal, lastSequenceNr, prepareRestart, receiverStash, recoveryPending, recoveryStarted, replayFailed, replayStarted, runReceive, snapshotSequenceNr, updateLastSequenceNr, updateLastSequenceNr, withCurrentPersistent |
Methods inherited from interface akka.persistence.Snapshotter |
---|
deleteSnapshot, deleteSnapshots, loadSnapshot, saveSnapshot, snapshotStore |
Methods inherited from interface akka.actor.UnrestrictedStash |
---|
postStop |
Methods inherited from interface akka.actor.Actor |
---|
aroundPostRestart, context, postRestart, receive, self, sender, supervisorStrategy |
Methods inherited from interface akka.actor.StashSupport |
---|
actorCell, capacity, clearStash, context, enqueueFirst, mailbox, prepend, self, stash, theStash, unstash, unstashAll, unstashAll |
Methods inherited from interface akka.actor.StashFactory |
---|
createStash |
Method Detail |
---|
scala.concurrent.duration.FiniteDuration redeliverInterval()
The default value can be configured with the
akka.persistence.at-least-once-delivery.redeliver-interval
configuration key. This method can be overridden by implementation classes to return
non-default values.
scala.concurrent.duration.FiniteDuration defaultRedeliverInterval()
int warnAfterNumberOfUnconfirmedAttempts()
AtLeastOnceDelivery.UnconfirmedWarning
message
will be sent to self
. The count is reset after a restart.
The default value can be configured with the
akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts
configuration key. This method can be overridden by implementation classes to return
non-default values.
int defaultWarnAfterNumberOfUnconfirmedAttempts()
int maxUnconfirmedMessages()
deliver(akka.actor.ActorPath, scala.Function1)
will not accept more messages and it will throw
AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException
.
The default value can be configured with the
akka.persistence.at-least-once-delivery.max-unconfirmed-messages
configuration key. This method can be overridden by implementation classes to return
non-default values.
int defaultMaxUnconfirmedMessages()
Cancellable redeliverTask()
long deliverySequenceNr()
scala.collection.immutable.SortedMap<java.lang.Object,akka.persistence.AtLeastOnceDelivery.Internal.Delivery> unconfirmed()
long nextDeliverySequenceNr()
void deliver(ActorPath destination, scala.Function1<java.lang.Object,java.lang.Object> deliveryIdToMessage)
deliveryIdToMessage
function to
the destination
actor. It will retry sending the message until
the delivery is confirmed with confirmDelivery(long)
. Correlation
between deliver
and confirmDelivery
is performed with the
deliveryId
that is provided as parameter to the deliveryIdToMessage
function. The deliveryId
is typically passed in the message to the
destination, which replies with a message containing the same deliveryId
.
The deliveryId
is a strictly monotonically increasing sequence number without
gaps. The same sequence is used for all destinations of the actor, i.e. when sending
to multiple destinations the destinations will see gaps in the sequence if no
translation is performed.
During recovery this method will not send out the message, but it will be sent
later if no matching confirmDelivery
was performed.
This method will throw AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException
if numberOfUnconfirmed
is greater than or equal to maxUnconfirmedMessages()
.
destination
- (undocumented)deliveryIdToMessage
- (undocumented)boolean confirmDelivery(long deliveryId)
deliveryId
- (undocumented)
true
the first time the deliveryId
is confirmed, i.e. false
for duplicate confirmdeliver(akka.actor.ActorPath, scala.Function1)
int numberOfUnconfirmed()
void redeliverOverdue()
void send(long deliveryId, akka.persistence.AtLeastOnceDelivery.Internal.Delivery d, long timestamp)
AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot getDeliverySnapshot()
AtLeastOnceDelivery
. It can be saved with Snapshotter.saveSnapshot(java.lang.Object)
.
During recovery the snapshot received in SnapshotOffer
should be set
with setDeliverySnapshot(akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot)
.
The AtLeastOnceDeliverySnapshot
contains the full delivery state, including unconfirmed messages.
If you need a custom snapshot for other parts of the actor state you must also include the
AtLeastOnceDeliverySnapshot
. It is serialized using protobuf with the ordinary Akka
serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot
as a blob in your custom snapshot.
void setDeliverySnapshot(AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot snapshot)
getDeliverySnapshot()
was saved it will be received during recovery
in a SnapshotOffer
message and should be set with this method.
snapshot
- (undocumented)void aroundPreRestart(java.lang.Throwable reason, scala.Option<java.lang.Object> message)
aroundPreRestart
in interface Actor
aroundPreRestart
in interface ProcessorImpl
reason
- (undocumented)message
- (undocumented)void aroundPostStop()
aroundPostStop
in interface Actor
aroundPostStop
in interface ProcessorImpl
void aroundReceive(scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive, java.lang.Object message)
aroundReceive
in interface Actor
aroundReceive
in interface Recovery
receive
- (undocumented)message
- (undocumented)
|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |