Java API: a persistent actor that can be used to implement command or event sourcing.
Java API compatible with lambda expressions
Use this class instead of UntypedPersistentActor to send messages with at-least-once delivery semantics to destinations. Full documentation in AtLeastOnceDelivery.
Java API: compatible with lambda expressions (to be used with akka.japi.pf.ReceiveBuilder)
Mix-in this trait with your PersistentActor to send messages with at-least-once delivery semantics to destinations. It takes care of re-sending messages when they have not been confirmed within a configurable timeout. Use the deliver method to send a message to a destination. Call the confirmDelivery method when the destination has replied with a confirmation message.
At-least-once delivery implies that the original message send order is not always retained and that the destination may receive duplicate messages due to possible resends.
The interval between redelivery attempts can be defined by redeliverInterval. After a number of delivery attempts an AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call confirmDelivery to cancel the re-sending.
The AtLeastOnceDelivery trait has a state consisting of unconfirmed messages and a sequence number. It does not store this state itself. You must persist events corresponding to the deliver and confirmDelivery invocations from your PersistentActor so that the state can be restored by calling the same methods during the recovery phase of the PersistentActor. Sometimes these events can be derived from other business level events, and sometimes you must create separate events. During recovery, calls to deliver will not send out the message, but it will be sent later if no matching confirmDelivery was performed.
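The bookkeeping described above can be illustrated without Akka. The following is a minimal plain-Java sketch, not the Akka implementation: the class DeliveryState and all of its members are hypothetical names that only mirror the roles of deliver and confirmDelivery, and the "sent" list stands in for real message sends.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the delivery state: a sequence number plus the
// unconfirmed messages. This is an illustration, not an Akka class.
class DeliveryState {
    private long deliverySequenceNr = 0L;
    private final Map<Long, String> unconfirmed = new LinkedHashMap<>();
    private final List<String> sent = new ArrayList<>();

    // Registers a message as unconfirmed. While recovering, nothing is sent yet.
    long deliver(String message, boolean recovering) {
        long deliveryId = ++deliverySequenceNr;
        unconfirmed.put(deliveryId, message);
        if (!recovering) {
            sent.add(message);
        }
        return deliveryId;
    }

    // Removes the message from the unconfirmed set; returns false if it was
    // already confirmed or never registered.
    boolean confirmDelivery(long deliveryId) {
        return unconfirmed.remove(deliveryId) != null;
    }

    // Sends every message that is still unconfirmed, for example after recovery.
    void redeliverUnconfirmed() {
        sent.addAll(unconfirmed.values());
    }

    List<String> sent() {
        return sent;
    }

    int numberOfUnconfirmed() {
        return unconfirmed.size();
    }
}
```

Replaying a "deliver a", "deliver b", "confirm a" event sequence with recovering set to true leaves only "b" unconfirmed, so only "b" goes out when redeliverUnconfirmed is called afterwards.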
Support for snapshots is provided by getDeliverySnapshot and setDeliverySnapshot. The AtLeastOnceDeliverySnapshot contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state you must also include the AtLeastOnceDeliverySnapshot. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot as a blob in your custom snapshot.
Persistence extension.
Sent to a Processor if a journal fails to write a Persistent message. If not handled, an akka.actor.ActorKilledException is thrown by that processor.
payload of the persistent message.
sequence number of the persistent message.
failure cause.
Persistence configuration.
A persistent actor that can be used to implement command or event sourcing.
Plugin API: representation of a persistent message in the journal plugin API.
A view replicates the persistent message stream of a PersistentActor. Implementation classes receive the message stream directly from the Journal. These messages can be processed to update internal state in order to maintain an (eventually consistent) view of the state of the corresponding persistent actor. A persistent view can also run on a different node, provided that a replicated journal is used.
Implementation classes refer to a persistent actor's message stream by implementing persistenceId with the corresponding (shared) identifier value.
Views can also store snapshots of internal state by calling saveSnapshot. The snapshots of a view are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered to the view with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. Default is to offer the latest saved snapshot.
By default, a view automatically updates itself with an interval returned by autoUpdateInterval. This method can be overridden by implementation classes to define a view instance-specific update interval. The default update interval for all views of an actor system can be configured with the akka.persistence.view.auto-update-interval configuration key. Applications may trigger additional view updates by sending the view Update requests. See also methods
Instructs a persistent actor to recover itself. Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots and at least one of these snapshots matches the specified fromSnapshot criteria. Otherwise, recovery will start from scratch by replaying all journaled messages.
If recovery starts from a snapshot, the persistent actor is offered that snapshot with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot, up to the specified upper sequence number bound (toSequenceNr).
criteria for selecting a saved snapshot from which recovery should start. Default is latest (= youngest) snapshot.
upper sequence number bound (inclusive) for recovery. Default is no upper bound.
maximum number of messages to replay. Default is no limit.
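How the three recovery parameters interact can be shown with a small model. This is a plain-Java sketch under stated assumptions, not Akka code: RecoveryPlan and replayed are hypothetical names, the journal is reduced to an ascending list of sequence numbers, and a snapshot sequence number of 0 stands for "no matching snapshot, recover from scratch".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that computes which journaled messages a recovery
// would replay. Illustration only.
class RecoveryPlan {
    // journal: ascending sequence numbers of all journaled messages.
    // snapshotSequenceNr: sequence number of the offered snapshot, 0 if none.
    // toSequenceNr: inclusive upper bound for replayed messages.
    // replayMax: maximum number of messages to replay.
    static List<Long> replayed(List<Long> journal, long snapshotSequenceNr,
                               long toSequenceNr, long replayMax) {
        List<Long> result = new ArrayList<>();
        for (long sequenceNr : journal) {
            if (result.size() >= replayMax) {
                break;
            }
            // Only messages younger than the snapshot and within the bound.
            if (sequenceNr > snapshotSequenceNr && sequenceNr <= toSequenceNr) {
                result.add(sequenceNr);
            }
        }
        return result;
    }
}
```

With a journal holding sequence numbers 1 to 6, a snapshot taken at 2 and an upper bound of 5, the model replays 3, 4 and 5. Lowering replayMax to 2 cuts that to 3 and 4.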
Recovery state machine that loads snapshots and replays messages.
Sent to a Processor if a journal fails to replay messages or fetch that processor's highest sequence number. If not handled, the processor will be stopped.
Sent to a PersistentActor after failed saving of a snapshot.
snapshot metadata.
failure cause.
Sent to a PersistentActor after successful saving of a snapshot.
snapshot metadata.
Plugin API: a selected snapshot matching SnapshotSelectionCriteria.
snapshot metadata.
snapshot.
Snapshot metadata.
id of persistent actor from which the snapshot was taken.
sequence number at which the snapshot was taken.
time at which the snapshot was saved.
Offers a PersistentActor a previously saved snapshot during recovery. This offer is received before any further replayed messages.
Selection criteria for loading and deleting snapshots.
upper bound for a selected snapshot's sequence number. Default is no upper bound.
upper bound for a selected snapshot's timestamp. Default is no upper bound.
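The two upper bounds can be read as a filter over the saved snapshots, with the youngest match winning. The following plain-Java sketch illustrates that reading. SnapshotSelection, Meta and select are hypothetical names, and treating both bounds as inclusive is an assumption of this sketch.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of snapshot selection. Illustration only.
class SnapshotSelection {
    // Stand-in for snapshot metadata: sequence number and save time.
    static final class Meta {
        final long sequenceNr;
        final long timestamp;

        Meta(long sequenceNr, long timestamp) {
            this.sequenceNr = sequenceNr;
            this.timestamp = timestamp;
        }
    }

    // Returns the youngest snapshot (highest sequence number) that lies within
    // both upper bounds, or empty if no saved snapshot matches.
    static Optional<Meta> select(List<Meta> saved, long maxSequenceNr, long maxTimestamp) {
        Meta best = null;
        for (Meta m : saved) {
            boolean matches = m.sequenceNr <= maxSequenceNr && m.timestamp <= maxTimestamp;
            if (matches && (best == null || m.sequenceNr > best.sequenceNr)) {
                best = m;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

Passing Long.MAX_VALUE for both bounds models the default of no upper bound, which selects the latest snapshot.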
Snapshot API on top of the internal snapshot protocol.
Java API: a persistent actor that can be used to implement command or event sourcing.
Java API: Use this class instead of UntypedPersistentActor to send messages with at-least-once delivery semantics to destinations. Full documentation in AtLeastOnceDelivery.
Java API.
Instructs a PersistentView to update itself. This will run a single incremental message replay with
all messages from the corresponding persistent id's journal that have not yet been consumed by the view.
To update a view with messages that have been written after handling this request, another Update
request must be sent to the view.
if true, processing of further messages sent to the view will be delayed until the incremental message replay, triggered by this update request, completes. If false, any message sent to the view may interleave with the replayed Persistent message stream.
maximum number of messages to replay when handling this update request. Defaults to Long.MaxValue (i.e. no limit).
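The incremental nature of an update request can be modelled in a few lines. This is a plain-Java sketch, not Akka code: ViewReplayModel and its methods are hypothetical, and the journal is simplified to a list in which the element at index i carries sequence number i + 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a view that consumes a journal incrementally.
class ViewReplayModel {
    private long lastSequenceNr = 0L;
    private final List<String> seen = new ArrayList<>();

    // One update request: consume messages not yet seen, at most replayMax of
    // them. Returns how many messages this single replay consumed.
    int update(List<String> journal, long replayMax) {
        int consumed = 0;
        while (lastSequenceNr < journal.size() && consumed < replayMax) {
            seen.add(journal.get((int) lastSequenceNr));
            lastSequenceNr++;
            consumed++;
        }
        return consumed;
    }

    List<String> seen() {
        return seen;
    }
}
```

Messages appended to the journal after an update call stay invisible to the model until update is called again, which matches the need for another Update request.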
Java API: compatible with lambda expressions (to be used with akka.japi.pf.ReceiveBuilder): command handler. Typically validates commands against current state (and/or by communication with other actors). On successful validation, one or more events are derived from a command and these events are then persisted by calling persist.
Commands sent to event sourced processors must not be Persistent or PersistentBatch messages. If they are, an UnsupportedOperationException is thrown by the processor.
(Since version 2.3.4) AbstractEventsourcedProcessor will be removed in 2.4.x. Instead, extend the API equivalent akka.persistence.PersistentActor.
Java API: compatible with lambda expressions
An actor that persists (journals) messages of type Persistent. Messages of other types are not persisted.
Example:
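    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.japi.pf.ReceiveBuilder;
    import akka.persistence.AbstractProcessor;
    import akka.persistence.Persistent;

    class MyProcessor extends AbstractProcessor {
      public MyProcessor() {
        receive(ReceiveBuilder.
          match(Persistent.class, p -> {
            // message has been written to journal
            Object payload = p.payload();
            Long sequenceNr = p.sequenceNr();
            // ...
          }).build()
        );
      }
    }

    // ...

    ActorRef processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor");

    processor.tell(Persistent.create("foo"), null);
    processor.tell("bar", null);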
During start and restart, persistent messages are replayed to a processor so that it can recover internal state from these messages. New messages sent to a processor during recovery do not interfere with replayed messages, hence applications don't need to wait for a processor to complete its recovery.
Automated recovery can be turned off or customized by overriding the preStart and preRestart life cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by sending it a Recover message.
Persistent messages are assigned sequence numbers that are generated on a per-processor basis. A sequence starts at 1L and doesn't contain gaps unless a processor (logically) deletes a message.
During recovery, a processor internally buffers new messages until recovery completes, so that new messages do not interfere with replayed messages. This internal buffer (the processor stash) is isolated from the user stash inherited by akka.actor.Stash. Processor implementation classes can therefore use the user stash for stashing/unstashing both persistent and transient messages.
Processors can also store snapshots of internal state by calling saveSnapshot. During recovery, a saved snapshot is offered to the processor with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. Default is to offer the latest saved snapshot.
(Since version 2.3.4) AbstractProcessor will be removed. Instead extend akka.persistence.AbstractPersistentActor and use its persistAsync(command)(callback) method to get equivalent semantics.
Java API: compatible with lambda expressions (to be used with akka.japi.pf.ReceiveBuilder)
(Since version 2.3.4) Use akka.persistence.AbstractPersistentView instead.
A channel is used by Processors (and Views) for sending Persistent messages to destinations. The main responsibility of a channel is to prevent redundant delivery of replayed messages to destinations when a processor is recovered.
A channel is instructed to deliver a persistent message to a destination with the Deliver command. A destination is provided as ActorPath and messages are sent via that path's ActorSelection.
    class ForwardExample extends Processor {
      val destination = context.actorOf(Props[MyDestination])
      val channel = context.actorOf(Channel.props(), "myChannel")

      def receive = {
        case m @ Persistent(payload, _) =>
          // forward modified message to destination
          channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination.path)
      }
    }
To reply to the sender of a persistent message, the sender reference should be used as channel destination.
    class ReplyExample extends Processor {
      val channel = context.actorOf(Channel.props(), "myChannel")

      def receive = {
        case m @ Persistent(payload, _) =>
          // reply modified message to sender
          channel ! Deliver(m.withPayload(s"re: ${payload}"), sender.path)
      }
    }
Redundant delivery of messages to destinations is only prevented if the receipt of these messages
is explicitly confirmed. Therefore, persistent messages that are delivered via a channel are of type
ConfirmablePersistent. Their receipt can be confirmed by a destination by calling the confirm()
method on these messages.
    class MyDestination extends Actor {
      def receive = {
        case cp @ ConfirmablePersistent(payload, sequenceNr, redeliveries) => cp.confirm()
      }
    }
If a destination does not confirm the receipt of a ConfirmablePersistent message, it will be redelivered by the channel according to the parameters in ChannelSettings. Redelivered messages have a redeliveries value greater than zero.
If the maximum number of redeliveries is reached for certain messages, they are removed from the channel and a redeliverFailureListener (if specified, see ChannelSettings) is notified about these messages with a RedeliverFailure message. Besides other application-specific tasks, this listener can restart the sending processor to enforce a redelivery of these messages or confirm these messages to prevent further redeliveries.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
A Channel configuration object.
Maximum number of redelivery attempts.
Interval between redelivery attempts.
Receiver of RedeliverFailure notifications which are sent when the number of redeliveries reaches redeliverMax for a sequence of messages. To enforce a redelivery of these messages, the listener has to restart the sending processor. Alternatively, it can also confirm these messages, preventing further redeliveries.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Persistent message that has been delivered by a Channel or PersistentChannel. Channel destinations that receive messages of this type can confirm their receipt by calling confirm.
(Since version 2.3.4) Use akka.persistence.PersistentActor instead
Instructs a Channel or PersistentChannel to deliver a persistent message to a destination.
persistent message.
persistent message destination.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Plugin API: confirmation message generated by receivers of ConfirmablePersistent messages by calling ConfirmablePersistent.confirm().
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Plugin API.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Plugin API.
(Since version 2.3.4) PersistentChannel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
An event sourced processor.
(Since version 2.3.4) EventsourcedProcessor will be removed in 2.4.x. Instead, extend the API equivalent akka.persistence.PersistentActor.
Persistent message.
(Since version 2.3.4) Use akka.persistence.PersistentActor instead.
Instructs a Processor to atomically write the contained Persistent messages to the journal. The processor receives the written messages individually as Persistent messages. During recovery, they are also replayed individually.
(Since version 2.3.4) Use akka.persistence.PersistentActor instead
A PersistentChannel implements the same functionality as a Channel but additionally persists Deliver requests before they are served. Persistent channels are useful in combination with slow destinations or destinations that are unavailable for a long time. Deliver requests that have been persisted by a persistent channel are deleted when destinations confirm the receipt of the corresponding messages.
The number of pending confirmations can be limited by a persistent channel based on the parameters of PersistentChannelSettings. It can suspend delivery when the number of pending confirmations reaches pendingConfirmationsMax and resume delivery again when this number falls below pendingConfirmationsMin. This prevents both flooding destinations with more messages than they can process and unlimited memory consumption by the channel. A persistent channel continues to persist Deliver requests even when message delivery is temporarily suspended.
A persistent channel can also reply to Deliver senders to indicate whether the request has been successfully persisted (see the replyPersistent parameter in PersistentChannelSettings). In case of success, the channel replies with the contained Persistent message, otherwise with a PersistenceFailure message.
(Since version 2.3.4) PersistentChannel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
A PersistentChannel configuration object.
Maximum number of redelivery attempts.
Interval between redelivery attempts.
Receiver of RedeliverFailure notifications which are sent when the number of redeliveries reaches redeliverMax for a sequence of messages. To enforce a redelivery of these messages, the listener has to Reset the persistent channel. Alternatively, it can also confirm these messages, preventing further redeliveries.
If true, the sender will receive the successfully stored Persistent message that has been submitted with a Deliver request, or a PersistenceFailure message in case of a persistence failure.
Message delivery is suspended by a channel if the number of pending confirmations reaches the specified value and is resumed again if the number of pending confirmations falls below pendingConfirmationsMin.
Message delivery is resumed if the number of pending confirmations falls below this limit. It is suspended again if it reaches pendingConfirmationsMax.
Message delivery is enabled for a channel if the number of pending confirmations is below this limit, or is resumed again if it falls below this limit.
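The interplay of the two limits is a hysteresis: delivery stops at the upper limit and only restarts once the count has dropped below the lower one. The following plain-Java sketch illustrates this under stated assumptions. DeliveryGate and its methods are hypothetical names, not part of Akka, and the constructor parameters only borrow the names pendingConfirmationsMax and pendingConfirmationsMin.

```java
// Hypothetical model of suspend/resume based on pending confirmations.
class DeliveryGate {
    private final int pendingConfirmationsMax;
    private final int pendingConfirmationsMin;
    private int pending = 0;
    private boolean suspended = false;

    DeliveryGate(int pendingConfirmationsMax, int pendingConfirmationsMin) {
        this.pendingConfirmationsMax = pendingConfirmationsMax;
        this.pendingConfirmationsMin = pendingConfirmationsMin;
    }

    // Attempts one delivery. Returns false while delivery is suspended.
    boolean tryDeliver() {
        if (suspended) {
            return false;
        }
        pending++;
        if (pending >= pendingConfirmationsMax) {
            suspended = true; // upper limit reached: suspend delivery
        }
        return true;
    }

    // Records one confirmation and resumes delivery once the number of
    // pending confirmations has fallen below the lower limit.
    void confirm() {
        if (pending > 0) {
            pending--;
        }
        if (suspended && pending < pendingConfirmationsMin) {
            suspended = false;
        }
    }

    boolean isSuspended() {
        return suspended;
    }
}
```

With limits of 3 and 2, the third delivery suspends the gate, one confirmation (2 pending) is not enough to resume, and a second confirmation (1 pending) reopens it. The gap between the two limits keeps delivery from toggling on every single confirmation.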
Maximum interval between read attempts made by a persistent channel. This setting applies, for example, after a journal failed to serve a read request. The next read request is then made after the configured timeout.
(Since version 2.3.4) PersistentChannel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Plugin API: confirmation entry written by journal plugins.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Plugin API: persistent message identifier.
(Since version 2.3.4) deleteMessages will be removed.
An actor that persists (journals) messages of type Persistent. Messages of other types are not persisted.
    import akka.persistence.{ Persistent, Processor }

    class MyProcessor extends Processor {
      def receive = {
        case Persistent(payload, sequenceNr) => // message has been written to journal
        case other                           => // message has not been written to journal
      }
    }

    val processor = actorOf(Props[MyProcessor], name = "myProcessor")

    processor ! Persistent("foo")
    processor ! "bar"
During start and restart, persistent messages are replayed to a processor so that it can recover internal state from these messages. New messages sent to a processor during recovery do not interfere with replayed messages, hence applications don't need to wait for a processor to complete its recovery.
Automated recovery can be turned off or customized by overriding the preStart and preRestart life cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by sending it a Recover message.
Persistent messages are assigned sequence numbers that are generated on a per-processor basis. A sequence starts at 1L and doesn't contain gaps unless a processor (logically) deletes a message.
During recovery, a processor internally buffers new messages until recovery completes, so that new messages do not interfere with replayed messages. This internal buffer (the processor stash) is isolated from the user stash inherited by akka.actor.Stash. Processor implementation classes can therefore use the user stash for stashing/unstashing both persistent and transient messages.
Processors can also store snapshots of internal state by calling saveSnapshot. During recovery, a saved snapshot is offered to the processor with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. Default is to offer the latest saved snapshot.
(Since version 2.3.4) Processor will be removed. Instead extend akka.persistence.PersistentActor and use its persistAsync(command)(callback) method to get equivalent semantics.
Notification message to inform channel listeners about messages that have reached the maximum number of redeliveries.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Exception thrown by a PersistentChannel child actor to re-initiate delivery.
(Since version 2.3.4) PersistentChannel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Java API: an event sourced processor.
(Since version 2.3.4) UntypedEventsourcedProcessor will be removed in 2.4.x. Instead, extend the API equivalent akka.persistence.PersistentActor.
Java API: an actor that persists (journals) messages of type Persistent. Messages of other types are not persisted.
    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.persistence.Persistent;
    import akka.persistence.UntypedProcessor;

    class MyProcessor extends UntypedProcessor {
      public void onReceive(Object message) throws Exception {
        if (message instanceof Persistent) {
          // message has been written to journal
          Persistent persistent = (Persistent) message;
          Object payload = persistent.payload();
          Long sequenceNr = persistent.sequenceNr();
          // ...
        } else {
          // message has not been written to journal
        }
      }
    }

    // ...

    ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");

    processor.tell(Persistent.create("foo"), null);
    processor.tell("bar", null);
During start and restart, persistent messages are replayed to a processor so that it can recover internal state from these messages. New messages sent to a processor during recovery do not interfere with replayed messages, hence applications don't need to wait for a processor to complete its recovery.
Automated recovery can be turned off or customized by overriding the preStart and preRestart life cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by sending it a Recover message.
Persistent messages are assigned sequence numbers that are generated on a per-processor basis. A sequence starts at 1L and doesn't contain gaps unless a processor (logically) deletes a message.
During recovery, a processor internally buffers new messages until recovery completes, so that new messages do not interfere with replayed messages. This internal buffer (the processor stash) is isolated from the user stash inherited by akka.actor.Stash. Processor implementation classes can therefore use the user stash for stashing/unstashing both persistent and transient messages.
Processors can also store snapshots of internal state by calling saveSnapshot. During recovery, a saved snapshot is offered to the processor with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. Default is to offer the latest saved snapshot.
(Since version 2.3.4) UntypedProcessor will be removed. Instead extend akka.persistence.UntypedPersistentActor and use its persistAsync(command)(callback) method to get equivalent semantics.
Java API.
(Since version 2.3.4) Use akka.persistence.UntypedPersistentView instead.
A view replicates the persistent message stream of a processor. Implementation classes receive the message stream as Persistent messages. These messages can be processed to update internal state in order to maintain an (eventually consistent) view of the state of the corresponding processor. A view can also run on a different node, provided that a replicated journal is used. Implementation classes reference a processor by implementing persistenceId.
Views can also store snapshots of internal state by calling #saveSnapshot. The snapshots of a view are independent of those of the referenced processor. During recovery, a saved snapshot is offered to the view with a SnapshotOffer message, followed by replayed messages, if any, that are younger than the snapshot. Default is to offer the latest saved snapshot.
By default, a view automatically updates itself with an interval returned by autoUpdateInterval. This method can be overridden by implementation classes to define a view instance-specific update interval. The default update interval for all views of an actor system can be configured with the akka.persistence.view.auto-update-interval configuration key. Applications may trigger additional view updates by sending the view Update requests. See also methods
Views can also use channels to communicate with destinations in the same way as processors can do.
(Since version 2.3.4) Use akka.persistence.PersistentView instead.
Persistence extension.
Sent to a Processor when the journal replay has been finished.
(Since version 2.3.4) Channel will be removed, see akka.persistence.AtLeastOnceDelivery instead.
Resets a PersistentChannel, forcing it to redeliver all unconfirmed persistent messages. This does not affect writing Deliver requests.
(Since version 2.3.4) PersistentChannel will be removed, see akka.persistence.AtLeastOnceDelivery instead.