Persistence
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted, never its current state directly (except for optional snapshots). These changes are only ever appended to storage and nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying their stored changes, from which they can rebuild internal state. Replay can cover either the full history of changes or only the changes after a snapshot, which can dramatically reduce recovery times. Akka persistence also provides point-to-point communication with at-least-once message delivery semantics.
Note
Java 8 lambda expressions are also supported. (See section Persistence (Java with Lambda Support))
Warning
This module is marked as “experimental” as of its introduction in Akka 2.3.0. We will continue to improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.persistence package.
Akka persistence is inspired by the eventsourced library and is its official replacement. It follows the same concepts and architecture as eventsourced but differs significantly at the API and implementation level. See also Migration Guide Eventsourced to Akka Persistence 2.3.x.
Changes in Akka 2.3.4
In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified.
In essence, Processor and EventsourcedProcessor are replaced by PersistentActor. Channel and PersistentChannel are replaced by AtLeastOnceDelivery. View is replaced by PersistentView.
See full details of the changes in the Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x). The old classes are still included, and deprecated, for a while to make the transition smooth. In case you need the old documentation it is located here.
Dependencies
Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-experimental_2.10</artifactId>
<version>2.3.16</version>
</dependency>
Architecture
- UntypedPersistentActor: A persistent, stateful actor. It is able to persist events to a journal and can react to them in a thread-safe manner. It can be used to implement both command sourced and event sourced actors. When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can recover internal state from these messages.
- UntypedPersistentView: A view is a persistent, stateful actor that receives journaled messages that have been written by another persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream.
- UntypedPersistentActorWithAtLeastOnceDelivery: Sends messages with at-least-once delivery semantics to destinations, even in case of sender and receiver JVM crashes.
- Journal: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. The default journal storage plugin writes to the local filesystem; replicated journals are available as Community plugins.
- Snapshot store: A snapshot store persists snapshots of a persistent actor's or a view's internal state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable. The default snapshot storage plugin writes to the local filesystem.
Event sourcing
The basic idea behind Event Sourcing is quite simple. A persistent actor receives a (non-persistent) command which is first validated to check whether it can be applied to the current state. Here, validation can mean anything from simple inspection of a command message's fields up to a conversation with several external services. If validation succeeds, events are generated from the command, representing the effect of the command. These events are then persisted and, after successful persistence, used to change the actor's state. When the persistent actor needs to be recovered, only the persisted events are replayed, and these are known to apply successfully. In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. Event sourced actors may of course also process commands that do not change application state, such as query commands.
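The validate, persist, apply and replay cycle described above can be sketched without any Akka classes. The following is a minimal model only: AccountSketch, its methods and the in-memory list standing in for a journal are assumptions made for illustration and are not part of the Akka API.

```java
import java.util.List;

// Hypothetical, Akka-free model of the command -> event -> state cycle.
// The class and method names are illustrative assumptions only.
class AccountSketch {
    // Stand-in for a journal: an append-only list of persisted events.
    private final List<Integer> journal;
    private int balance = 0;

    AccountSketch(List<Integer> journal) {
        this.journal = journal;
        // Recovery: replay persisted events; they are applied without validation.
        for (int delta : journal) {
            applyEvent(delta);
        }
    }

    // Command handler: validate, then persist the event, then change state.
    boolean deposit(int amount) {
        if (amount <= 0) {
            return false; // rejected command: nothing is persisted
        }
        persistAndApply(amount);
        return true;
    }

    boolean withdraw(int amount) {
        if (amount <= 0 || amount > balance) {
            return false; // rejected command: nothing is persisted
        }
        persistAndApply(-amount);
        return true;
    }

    private void persistAndApply(int delta) {
        journal.add(delta); // append only, existing entries are never mutated
        applyEvent(delta);  // state changes only after the event is stored
    }

    private void applyEvent(int delta) {
        balance += delta;
    }

    int balance() {
        return balance;
    }
}
```

Creating a second AccountSketch over the same list rebuilds the same balance, which is the replay property: rejected commands left no trace in the journal, so recovery never has to validate anything.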
Akka persistence supports event sourcing with the UntypedPersistentActor abstract class. An actor that extends this class uses the persist method to persist and handle events. The behavior of an UntypedPersistentActor is defined by implementing onReceiveRecover and onReceiveCommand. This is demonstrated in the following example.
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedPersistentActor;
import java.io.Serializable;
import java.util.ArrayList;
import static java.util.Arrays.asList;
class Cmd implements Serializable {
private final String data;
public Cmd(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class Evt implements Serializable {
private final String data;
public Evt(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class ExampleState implements Serializable {
private final ArrayList<String> events;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> events) {
this.events = events;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(events));
}
public void update(Evt evt) {
events.add(evt.getData());
}
public int size() {
return events.size();
}
@Override
public String toString() {
return events.toString();
}
}
class ExamplePersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "sample-id-1"; }
private ExampleState state = new ExampleState();
public int getNumEvents() {
return state.size();
}
@Override
public void onReceiveRecover(Object msg) {
if (msg instanceof Evt) {
state.update((Evt) msg);
} else if (msg instanceof SnapshotOffer) {
state = (ExampleState)((SnapshotOffer)msg).snapshot();
} else {
unhandled(msg);
}
}
@Override
public void onReceiveCommand(Object msg) {
if (msg instanceof Cmd) {
final String data = ((Cmd)msg).getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
persist(asList(evt1, evt2), new Procedure<Evt>() {
public void apply(Evt evt) throws Exception {
state.update(evt);
if (evt.equals(evt2)) {
getContext().system().eventStream().publish(evt);
}
}
});
} else if (msg.equals("snap")) {
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy());
} else if (msg.equals("print")) {
System.out.println(state);
} else {
unhandled(msg);
}
}
}
The example defines two data types, Cmd and Evt, to represent commands and events, respectively. The state of the ExamplePersistentActor is a list of persisted event data contained in ExampleState.
The persistent actor's onReceiveRecover method defines how state is updated during recovery by handling Evt and SnapshotOffer messages. The persistent actor's onReceiveCommand method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling persist with an event (or a sequence of events) as first argument and an event handler as second argument.
The persist method persists events asynchronously and the event handler is executed for successfully persisted events. Successfully persisted events are internally sent back to the persistent actor as individual messages that trigger event handler executions. An event handler may close over persistent actor state and mutate it. The sender of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command (not shown).
The main responsibility of an event handler is changing persistent actor state using event data and notifying others about successful state changes by publishing events.
When persisting events with persist it is guaranteed that the persistent actor will not receive further commands between the persist call and the execution(s) of the associated event handler. This also holds for multiple persist calls in context of a single command.
The easiest way to run this example yourself is to download Typesafe Activator and open the tutorial named Akka Persistence Samples with Java. It contains instructions on how to run the PersistentActorExample.
Note
It's also possible to switch between different command handlers during normal processing and recovery with getContext().become() and getContext().unbecome(). To get the actor into the same state after recovery you need to take special care to perform the same state transitions with become and unbecome in the onReceiveRecover method as you would have done in the command handler.
Identifiers
A persistent actor must have an identifier that doesn't change across different actor incarnations.
The identifier must be defined with the persistenceId method.
@Override
public String persistenceId() {
return "my-stable-persistence-id";
}
Recovery
By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a persistent actor during recovery do not interfere with replayed messages. New messages will only be received by a persistent actor after recovery completes.
Recovery customization
Automated recovery on start can be disabled by overriding preStart with an empty implementation.
@Override
public void preStart() {}
In this case, a persistent actor must be recovered explicitly by sending it a Recover message.
processor.tell(Recover.create(), null);
If not overridden, preStart sends a Recover message to getSelf(). Applications may also override preStart to define further Recover parameters such as an upper sequence number bound, for example.
@Override
public void preStart() {
getSelf().tell(Recover.create(457L), null);
}
Upper sequence number bounds can be used to recover a persistent actor to past state instead of current state. Automated recovery on restart can be disabled by overriding preRestart with an empty implementation.
@Override
public void preRestart(Throwable reason, Option<Object> message) {}
Recovery status
A persistent actor can query its own recovery status via the methods
public boolean recoveryRunning();
public boolean recoveryFinished();
Sometimes there is a need for performing additional initialization when the recovery has completed, before processing any other message sent to the persistent actor. The persistent actor will receive a special RecoveryCompleted message right after recovery and before any other received messages.
If there is a problem with recovering the state of the actor from the journal, the actor will be sent a RecoveryFailure message that it can choose to handle in onReceiveRecover. If the actor doesn't handle the RecoveryFailure message it will be stopped.
@Override
public void onReceiveRecover(Object message) {
if (message instanceof RecoveryCompleted) {
recoveryCompleted();
}
// ...
}
@Override
public void onReceiveCommand(Object message) throws Exception {
if (message instanceof String) {
// ...
} else {
unhandled(message);
}
}
private void recoveryCompleted() {
// perform init after recovery, before any other messages
// ...
}
Relaxed local consistency requirements and high throughput use-cases
If faced with relaxed local consistency requirements and high throughput demands, UntypedPersistentActor and its persist method may sometimes not be enough in terms of consuming incoming Commands at a high rate, because the actor has to wait until all Events related to a given Command are processed before it can start processing the next Command. While this abstraction is very useful for most cases, sometimes you may be faced with relaxed requirements about consistency. For example, you may want to process commands as fast as you can, assuming that each Event will eventually be persisted and handled properly in the background, and react retroactively to persistence failures if needed.
The persistAsync method provides a tool for implementing high-throughput persistent actors. It will not stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks.
In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.).
class MyPersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "some-persistence-id"; }
@Override
public void onReceiveRecover(Object msg) {
// handle recovery here
}
@Override
public void onReceiveCommand(Object msg) {
sender().tell(msg, getSelf());
persistAsync(String.format("evt-%s-1", msg), new Procedure<String>(){
@Override
public void apply(String event) throws Exception {
sender().tell(event, self());
}
});
persistAsync(String.format("evt-%s-2", msg), new Procedure<String>(){
@Override
public void apply(String event) throws Exception {
sender().tell(event, self());
}
});
}
}
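The ordering guarantee can be illustrated with a small Akka-free model. This is only a sketch under simplifying assumptions: PersistAsyncOrderingSketch and journalConfirmsNext are hypothetical names, the journal is reduced to a FIFO queue of outstanding writes, and each confirmation runs exactly one callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model, not Akka code: commands are handled immediately,
// event callbacks run later but in the order the events were submitted.
class PersistAsyncOrderingSketch {
    // Events handed to the journal whose write is not yet confirmed.
    private final Queue<String> unconfirmed = new ArrayDeque<String>();
    // Everything the original sender would receive, in order.
    private final List<String> replies = new ArrayList<String>();

    void onCommand(String cmd) {
        replies.add(cmd); // immediate reply, no waiting for the journal
        persistAsync("evt-" + cmd + "-1");
        persistAsync("evt-" + cmd + "-2");
    }

    private void persistAsync(String event) {
        unconfirmed.add(event);
    }

    // Simulates the journal confirming the oldest outstanding write,
    // which triggers the callback for that event.
    void journalConfirmsNext() {
        String event = unconfirmed.poll();
        if (event != null) {
            replies.add(event);
        }
    }

    List<String> replies() {
        return replies;
    }
}
```

If the journal confirms one write between command "a" and command "b", the sender in this model sees a, evt-a-1, b, evt-a-2, evt-b-1, evt-b-2: callbacks interleave with later commands, but the events themselves stay in submission order.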
Note
In order to implement the pattern known as "command sourcing" simply call persistAsync on all incoming messages right away, and handle them in the callback.
Warning
The callback will not be invoked if the actor is restarted (or stopped) between the call to persistAsync and the journal's confirmation of the write.
Deferring actions until preceding persist handlers have executed
Sometimes when working with persistAsync you may find that it would be nice to define some actions in terms of "happens-after the previous persistAsync handlers have been invoked". UntypedPersistentActor provides a utility method called defer, which works similarly to persistAsync yet does not persist the passed-in event. It is recommended to use it for read operations, and actions which do not have corresponding events in your domain model.
Using this method is very similar to the persist family of methods, except that the passed-in event is only kept in memory and used when invoking the handler.
class MyPersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "some-persistence-id"; }
@Override
public void onReceiveRecover(Object msg) {
// handle recovery here
}
@Override
public void onReceiveCommand(Object msg) {
// echo the command first, as in the persistAsync example above
sender().tell(msg, getSelf());
final Procedure<String> replyToSender = new Procedure<String>() {
@Override
public void apply(String event) throws Exception {
sender().tell(event, self());
}
};
persistAsync(String.format("evt-%s-1", msg), replyToSender);
persistAsync(String.format("evt-%s-2", msg), replyToSender);
defer(String.format("evt-%s-3", msg), replyToSender);
}
}
Notice that the sender() is safe to access in the handler callback, and will be pointing to the original sender of the command for which this defer handler was called.
final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
processor.tell("a", null);
processor.tell("b", null);
// order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3
Warning
The callback will not be invoked if the actor is restarted (or stopped) between the call to defer and the point at which the journal has processed and confirmed all preceding writes.
Batch writes
To optimize throughput, a persistent actor internally batches events to be stored under high load before writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads to a configurable maximum size (default is 200) under high load. When using persistAsync this increases the maximum throughput dramatically.
akka.persistence.journal.max-message-batch-size = 200
A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
The batches are also used internally to ensure atomic writes of events. All events that are persisted in context of a single command are written as a single batch to the journal (even if persist is called multiple times per command). The recovery of an UntypedPersistentActor will therefore never be done partially (with only a subset of events persisted by a single command).
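The two batch triggers described above (maximum size reached, or previous write completed) can be modelled in a few lines. This is a simplified sketch and not the actual Akka implementation: BatchingSketch and its methods are hypothetical, and the journal is replaced by a list that records each batch as it would be written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of load-dependent batching; not the Akka internals.
class BatchingSketch {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<String>();
    private final List<List<String>> writtenBatches = new ArrayList<List<String>>();
    private boolean writeInProgress = false;

    BatchingSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    void persist(String event) {
        buffer.add(event);
        // Write at once when the journal is idle (batch size 1 under low load),
        // or when the buffered batch has reached the maximum size.
        if (!writeInProgress || buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called when the journal reports that the previous batch was written.
    void journalCompletedWrite() {
        writeInProgress = false;
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        writtenBatches.add(new ArrayList<String>(buffer));
        buffer.clear();
        writeInProgress = true;
    }

    List<List<String>> writtenBatches() {
        return writtenBatches;
    }
}
```

No timer is involved in this model: a batch only leaves the buffer in response to an incoming event or a completed write, which matches the statement that batch writes are never timer-based.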
Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the deleteMessages method.
An optional permanent parameter specifies whether the messages shall be permanently deleted from the journal or only marked as deleted. In both cases, the messages won't be replayed. Later extensions to Akka persistence will allow replaying messages that have been marked as deleted, which can be useful for debugging purposes, for example.
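The difference between marking messages as deleted and deleting them permanently can be sketched with an in-memory journal. DeletionSketch, deleteTo and the other names below are hypothetical and only model the behavior described here; they are not the journal plugin API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory journal illustrating logical vs. permanent deletion.
class DeletionSketch {
    private static final class Entry {
        final long sequenceNr;
        final String payload;
        boolean markedDeleted = false;

        Entry(long sequenceNr, String payload) {
            this.sequenceNr = sequenceNr;
            this.payload = payload;
        }
    }

    private final List<Entry> entries = new ArrayList<Entry>();
    private long lastSequenceNr = 0;

    void append(String payload) {
        entries.add(new Entry(++lastSequenceNr, payload));
    }

    // Deletes all entries up to and including toSequenceNr.
    void deleteTo(long toSequenceNr, boolean permanent) {
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.sequenceNr <= toSequenceNr) {
                if (permanent) {
                    it.remove();                // physically removed
                } else {
                    entry.markedDeleted = true; // kept in storage, hidden from replay
                }
            }
        }
    }

    // Replay skips marked entries, so both deletion modes look the same here.
    List<String> replay() {
        List<String> result = new ArrayList<String>();
        for (Entry entry : entries) {
            if (!entry.markedDeleted) {
                result.add(entry.payload);
            }
        }
        return result;
    }

    int storedEntries() {
        return entries.size();
    }
}
```

In this model only storedEntries() reveals which mode was used, since replay() returns the same messages either way.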
Persistent Views
Persistent views can be implemented by extending the UntypedPersistentView abstract class and implementing the onReceive and the persistenceId methods.
class MyView extends UntypedPersistentView {
@Override
public String persistenceId() { return "some-persistence-id"; }
@Override
public String viewId() { return "my-stable-persistence-view-id"; }
@Override
public void onReceive(Object message) throws Exception {
if (isPersistent()) {
// handle message from Journal...
} else if (message instanceof String) {
// handle message from user...
} else {
unhandled(message);
}
}
}
The persistenceId identifies the persistent actor from which the view receives journaled messages. It is not necessary that the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by default.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the isPersistent method. That said, very often you don't need this information at all and can simply apply the same logic to both cases (skip the if isPersistent check).
Updates
The default update interval of all persistent views of an actor system is configurable:
akka.persistence.view.auto-update-interval = 5s
UntypedPersistentView implementation classes may also override the autoUpdateInterval method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an Update message.
final ActorRef view = system.actorOf(Props.create(MyView.class));
view.tell(Update.create(true), null);
If the await parameter is set to true, messages that follow the Update request are processed when the incremental message replay, triggered by that update request, has completed. If set to false (default), messages following the update request may interleave with the replayed message stream. Automated updates always run with await = false.
Automated updates of all persistent views of an actor system can be turned off by configuration:
akka.persistence.view.auto-update = off
Implementation classes may override the configured default value by overriding the autoUpdate method. To limit the number of replayed messages per update request, applications can configure a custom akka.persistence.view.auto-update-replay-max value or override the autoUpdateReplayMax method. The number of replayed messages for manual updates can be limited with the replayMax parameter of the Update message.
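The effect of a replay limit on incremental updates can be sketched as follows. This is a simplified, Akka-free model: ViewUpdateSketch is a hypothetical class, the journal is a shared list, and replayMax is assumed to be a plain non-negative upper bound.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a view that reads another actor's journal incrementally.
class ViewUpdateSketch {
    private final List<String> journal; // written by the persistent actor
    private final List<String> viewState = new ArrayList<String>();
    private int nextIndex = 0;          // position of the next unread entry

    ViewUpdateSketch(List<String> journal) {
        this.journal = journal;
    }

    // One update request: reads at most replayMax new entries
    // and returns how many were actually replayed.
    int update(int replayMax) {
        int replayed = 0;
        while (nextIndex < journal.size() && replayed < replayMax) {
            viewState.add(journal.get(nextIndex++));
            replayed++;
        }
        return replayed;
    }

    List<String> viewState() {
        return viewState;
    }
}
```

A view that is limited to a few messages per update falls behind a busy writer but catches up over successive updates, because each update resumes where the previous one stopped.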
Recovery
Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a Recover message to self). The maximum number of replayed messages during initial recovery is determined by autoUpdateReplayMax.
Further possibilities to customize initial recovery are explained in section Recovery.
Identifiers
A persistent view must have an identifier that doesn't change across different actor incarnations.
The identifier must be defined with the viewId method.
The viewId must differ from the referenced persistenceId, unless Snapshots of a view and its persistent actor shall be shared (which is what applications usually do not want).
Snapshots
Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots in the context of persistent actors, but this is also applicable to persistent views.
Persistent actors can save snapshots of internal state by calling the saveSnapshot method. If saving of a snapshot succeeds, the persistent actor receives a SaveSnapshotSuccess message, otherwise a SaveSnapshotFailure message.
class MyProcessor extends UntypedProcessor {
private Object state;
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("snap")) {
saveSnapshot(state);
} else if (message instanceof SaveSnapshotSuccess) {
SnapshotMetadata metadata = ((SaveSnapshotSuccess)message).metadata();
// ...
} else if (message instanceof SaveSnapshotFailure) {
SnapshotMetadata metadata = ((SaveSnapshotFailure)message).metadata();
// ...
}
}
}
During recovery, the persistent actor is offered a previously saved snapshot via a SnapshotOffer message from which it can initialize internal state.
class MyProcessor extends UntypedProcessor {
private Object state;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SnapshotOffer) {
state = ((SnapshotOffer)message).snapshot();
// ...
} else if (message instanceof Persistent) {
// ...
}
}
}
The replayed messages that follow the SnapshotOffer message, if any, are younger than the offered snapshot. They finally recover the persistent actor to its current (i.e. latest) state.
In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots and at least one of these snapshots matches the SnapshotSelectionCriteria that can be specified for recovery.
processor.tell(Recover.create(SnapshotSelectionCriteria.create(457L, System.currentTimeMillis())), null);
If not specified, the criteria default to SnapshotSelectionCriteria.latest(), which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use SnapshotSelectionCriteria.none(). A recovery where no saved snapshot matches the specified SnapshotSelectionCriteria will replay all journaled messages.
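How selection criteria interact with replay can be sketched in plain Java. The model below is an assumption-laden simplification: SnapshotRecoverySketch and its methods are hypothetical, the criteria are reduced to an upper sequence number bound and an upper timestamp bound, and "youngest" is decided by sequence number alone.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of snapshot-based recovery; not the Akka snapshot store API.
class SnapshotRecoverySketch {
    static final class Snapshot {
        final long sequenceNr;
        final long timestamp;
        final List<String> state;

        Snapshot(long sequenceNr, long timestamp, List<String> state) {
            this.sequenceNr = sequenceNr;
            this.timestamp = timestamp;
            this.state = state;
        }
    }

    // Returns the youngest snapshot within both bounds, or null if none matches.
    static Snapshot select(List<Snapshot> snapshots, long maxSequenceNr, long maxTimestamp) {
        Snapshot youngest = null;
        for (Snapshot s : snapshots) {
            boolean matches = s.sequenceNr <= maxSequenceNr && s.timestamp <= maxTimestamp;
            if (matches && (youngest == null || s.sequenceNr > youngest.sequenceNr)) {
                youngest = s;
            }
        }
        return youngest;
    }

    // journal.get(i) holds the message with sequence number i + 1.
    static List<String> recover(List<String> journal, List<Snapshot> snapshots,
                                long maxSequenceNr, long maxTimestamp) {
        Snapshot offer = select(snapshots, maxSequenceNr, maxTimestamp);
        List<String> state = new ArrayList<String>();
        int firstIndex = 0;
        if (offer != null) {
            state.addAll(offer.state);            // start from the offered snapshot
            firstIndex = (int) offer.sequenceNr;  // replay only younger messages
        }
        for (int i = firstIndex; i < journal.size(); i++) {
            state.add(journal.get(i));
        }
        return state;
    }
}
```

When no snapshot matches, firstIndex stays at 0 and the whole journal is replayed, which corresponds to the fallback described above.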
Snapshot deletion
A persistent actor can delete individual snapshots by calling the deleteSnapshot method with the sequence number and the timestamp of a snapshot as arguments. To bulk-delete snapshots matching SnapshotSelectionCriteria, persistent actors should use the deleteSnapshots method.
At-Least-Once Delivery
To send messages with at-least-once delivery semantics to destinations you can extend the UntypedPersistentActorWithAtLeastOnceDelivery class instead of UntypedPersistentActor on the sending side. It takes care of re-sending messages when they have not been confirmed within a configurable timeout.
Note
At-least-once delivery implies that original message send order is not always preserved and the destination may receive duplicate messages. That means that the semantics do not match those of a normal ActorRef send operation:
- it is not at-most-once delivery
- message order for the same sender–receiver pair is not preserved due to possible resends
- after a crash and restart of the destination, messages are still delivered to the new actor incarnation
These semantics are similar to what an ActorPath represents (see Actor Lifecycle), therefore you need to supply a path and not a reference when delivering messages. The messages are sent to the path with an actor selection.
Use the deliver method to send a message to a destination. Call the confirmDelivery method when the destination has replied with a confirmation message.
class Msg implements Serializable {
public final long deliveryId;
public final String s;
public Msg(long deliveryId, String s) {
this.deliveryId = deliveryId;
this.s = s;
}
}
class Confirm implements Serializable {
public final long deliveryId;
public Confirm(long deliveryId) {
this.deliveryId = deliveryId;
}
}
class MsgSent implements Serializable {
public final String s;
public MsgSent(String s) {
this.s = s;
}
}
class MsgConfirmed implements Serializable {
public final long deliveryId;
public MsgConfirmed(long deliveryId) {
this.deliveryId = deliveryId;
}
}
class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
private final ActorPath destination;
public MyPersistentActor(ActorPath destination) {
this.destination = destination;
}
public void onReceiveCommand(Object message) {
if (message instanceof String) {
String s = (String) message;
persist(new MsgSent(s), new Procedure<MsgSent>() {
public void apply(MsgSent evt) {
updateState(evt);
}
});
} else if (message instanceof Confirm) {
Confirm confirm = (Confirm) message;
persist(new MsgConfirmed(confirm.deliveryId), new Procedure<MsgConfirmed>() {
public void apply(MsgConfirmed evt) {
updateState(evt);
}
});
} else {
unhandled(message);
}
}
public void onReceiveRecover(Object event) {
updateState(event);
}
void updateState(Object event) {
if (event instanceof MsgSent) {
final MsgSent evt = (MsgSent) event;
deliver(destination, new Function<Long, Object>() {
public Object apply(Long deliveryId) {
return new Msg(deliveryId, evt.s);
}
});
} else if (event instanceof MsgConfirmed) {
final MsgConfirmed evt = (MsgConfirmed) event;
confirmDelivery(evt.deliveryId);
}
}
}
class MyDestination extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Msg) {
Msg msg = (Msg) message;
// ...
getSender().tell(new Confirm(msg.deliveryId), getSelf());
} else {
unhandled(message);
}
}
}
Correlation between deliver and confirmDelivery is performed with the deliveryId that is provided as parameter to the deliveryIdToMessage function. The deliveryId is typically passed in the message to the destination, which replies with a message containing the same deliveryId.
The deliveryId is a strictly monotonically increasing sequence number without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see gaps in the sequence if no translation is performed.
The UntypedPersistentActorWithAtLeastOnceDelivery class has a state consisting of unconfirmed messages and a sequence number. It does not store this state itself. You must persist events corresponding to the deliver and confirmDelivery invocations from your persistent actor so that the state can be restored by calling the same methods during the recovery phase of the persistent actor. Sometimes these events can be derived from other business level events, and sometimes you must create separate events. During recovery, calls to deliver will not send out the message, but it will be sent later if no matching confirmDelivery was performed.
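The bookkeeping behind deliver and confirmDelivery, including the behavior during recovery, can be modelled without Akka. The sketch below is illustrative only: AtLeastOnceSketch is a hypothetical class, the network is replaced by a list of sent messages, and redelivery after recovery happens immediately instead of on a timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of at-least-once delivery state; not the Akka class.
class AtLeastOnceSketch {
    private final Map<Long, String> unconfirmed = new LinkedHashMap<Long, String>();
    private final List<String> sent = new ArrayList<String>(); // stand-in for the network
    private long deliverySequenceNr = 0;
    private boolean recovering;

    AtLeastOnceSketch(boolean recovering) {
        this.recovering = recovering;
    }

    // Assigns the next deliveryId and tracks the message as unconfirmed.
    long deliver(String payload) {
        long deliveryId = ++deliverySequenceNr;
        unconfirmed.put(deliveryId, payload);
        if (!recovering) {
            send(deliveryId, payload); // during recovery the state is rebuilt silently
        }
        return deliveryId;
    }

    // Returns false if the id is unknown or was already confirmed.
    boolean confirmDelivery(long deliveryId) {
        return unconfirmed.remove(deliveryId) != null;
    }

    void recoveryCompleted() {
        recovering = false;
        redeliverUnconfirmed();
    }

    // Re-sends everything that has not been confirmed yet.
    void redeliverUnconfirmed() {
        for (Map.Entry<Long, String> entry : unconfirmed.entrySet()) {
            send(entry.getKey(), entry.getValue());
        }
    }

    private void send(long deliveryId, String payload) {
        sent.add(deliveryId + ":" + payload);
    }

    List<String> sent() {
        return sent;
    }
}
```

Replaying the persisted deliver and confirm events against an instance created with recovering set to true rebuilds the unconfirmed set without sending anything; only messages that were never confirmed go out once recovery completes.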
Support for snapshots is provided by getDeliverySnapshot and setDeliverySnapshot. The AtLeastOnceDeliverySnapshot contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state you must also include the AtLeastOnceDeliverySnapshot. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot as a blob in your custom snapshot.
The interval between redelivery attempts is defined by the redeliverInterval method. The default value can be configured with the akka.persistence.at-least-once-delivery.redeliver-interval configuration key. The method can be overridden by implementation classes to return non-default values.
After a number of delivery attempts an AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call confirmDelivery to cancel the re-sending. The number of delivery attempts before emitting the warning is defined by the warnAfterNumberOfUnconfirmedAttempts method. The default value can be configured with the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts configuration key. The method can be overridden by implementation classes to return non-default values.
The UntypedPersistentActorWithAtLeastOnceDelivery class holds messages in memory until their successful delivery has been confirmed. The maximum number of unconfirmed messages that the actor is allowed to hold in memory is defined by the maxUnconfirmedMessages method. If this limit is exceeded the deliver method will not accept more messages and it will throw AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException. The default value can be configured with the akka.persistence.at-least-once-delivery.max-unconfirmed-messages configuration key. The method can be overridden by implementation classes to return non-default values.
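Taken together, the three configuration keys named in this section could be set in one block, as in the following fragment. The values are purely illustrative assumptions and are not meant to reflect the shipped defaults.

```
# Illustrative values only; adjust to the needs of the application.
akka.persistence.at-least-once-delivery {
  redeliver-interval = 10s
  warn-after-number-of-unconfirmed-attempts = 3
  max-unconfirmed-messages = 1000
}
```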
Storage plugins
Storage backends for journals and snapshot stores are pluggable in Akka persistence. The default journal plugin writes messages to LevelDB (see Local LevelDB journal). The default snapshot store plugin writes snapshots as individual files to the local filesystem (see Local snapshot store). Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following imports:
import akka.actor.*;
import akka.japi.Option;
import akka.japi.Procedure;
import akka.persistence.*;
import akka.persistence.japi.journal.JavaJournalSpec;
import akka.persistence.japi.snapshot.JavaSnapshotStoreSpec;
import akka.persistence.journal.japi.AsyncWriteJournal;
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
import akka.persistence.journal.leveldb.SharedLeveldbStore;
import akka.persistence.snapshot.japi.SnapshotStore;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.iq80.leveldb.util.FileUtils;
import scala.concurrent.Future;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
Journal plugin API
A journal plugin either extends SyncWriteJournal or AsyncWriteJournal. SyncWriteJournal is an actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this case, the methods to be implemented are:
/**
* Java API, Plugin API: synchronously writes a batch of persistent messages to the
* journal. The batch write must be atomic i.e. either all persistent messages in the
* batch are written or none.
*/
void doWriteMessages(Iterable<PersistentRepr> messages);
/**
* Java API, Plugin API: synchronously writes a batch of delivery confirmations to
* the journal.
*
* @deprecated doWriteConfirmations will be removed, since Channels will be removed (since 2.3.4)
*/
@Deprecated void doWriteConfirmations(Iterable<PersistentConfirmation> confirmations);
/**
* Java API, Plugin API: synchronously deletes messages identified by `messageIds`
* from the journal. If `permanent` is set to `false`, the persistent messages are
* marked as deleted, otherwise they are permanently deleted.
*
* @deprecated doDeleteMessages will be removed (since 2.3.4)
*/
@Deprecated void doDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent);
/**
* Java API, Plugin API: synchronously deletes all persistent messages up to
* `toSequenceNr`. If `permanent` is set to `false`, the persistent messages are
* marked as deleted, otherwise they are permanently deleted.
*
* @see AsyncRecoveryPlugin
*/
void doDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent);
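The two behavioural requirements above (all-or-nothing batch writes, and the difference between marking messages as deleted and removing them permanently) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: StoredMessage and InMemoryLog are hypothetical names, no Akka types are used, a duplicate sequence number is assumed to be the reason a batch fails, and `toSequenceNr` is treated as inclusive. It uses Java 8 library methods.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a stored persistent message; not an Akka type.
final class StoredMessage {
    final long sequenceNr;
    final Object payload;
    boolean deleted; // true once the message has been marked as deleted

    StoredMessage(long sequenceNr, Object payload) {
        this.sequenceNr = sequenceNr;
        this.payload = payload;
    }
}

// Hypothetical in-memory log holding the messages of a single persistenceId.
final class InMemoryLog {
    private TreeMap<Long, StoredMessage> messages = new TreeMap<>();

    // Atomic batch write: stage the batch on a copy and commit only if every
    // message was accepted, so a failure leaves the log untouched.
    void writeBatch(List<StoredMessage> batch) {
        TreeMap<Long, StoredMessage> staged = new TreeMap<>(messages);
        for (StoredMessage m : batch) {
            if (staged.putIfAbsent(m.sequenceNr, m) != null) {
                throw new IllegalStateException("duplicate sequence number " + m.sequenceNr);
            }
        }
        messages = staged;
    }

    // Deletes all messages up to and including toSequenceNr (assumed inclusive).
    void deleteTo(long toSequenceNr, boolean permanent) {
        NavigableMap<Long, StoredMessage> head = messages.headMap(toSequenceNr, true);
        if (permanent) {
            head.clear(); // the view writes through to the underlying map
        } else {
            for (StoredMessage m : head.values()) {
                m.deleted = true; // keep the message, only flag it
            }
        }
    }

    int size() {
        return messages.size();
    }

    boolean isMarkedDeleted(long sequenceNr) {
        StoredMessage m = messages.get(sequenceNr);
        return m != null && m.deleted;
    }
}
```

A real plugin would delegate the same decisions to its storage backend, for example by using a backend transaction instead of the staged copy.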
AsyncWriteJournal is an actor that should be extended if the storage backend API supports asynchronous, non-blocking writes. In this case, the methods to be implemented are:
/**
* Java API, Plugin API: asynchronously writes a batch of persistent messages to the
* journal. The batch write must be atomic i.e. either all persistent messages in the
* batch are written or none.
*/
Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages);
/**
* Java API, Plugin API: asynchronously writes a batch of delivery confirmations to
* the journal.
*
* @deprecated doAsyncWriteConfirmations will be removed, since Channels will be removed (since 2.3.4)
*/
@Deprecated Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations);
/**
* Java API, Plugin API: asynchronously deletes messages identified by `messageIds`
* from the journal. If `permanent` is set to `false`, the persistent messages are
* marked as deleted, otherwise they are permanently deleted.
*
* @deprecated doAsyncDeleteMessages will be removed (since 2.3.4)
*/
@Deprecated Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent);
/**
* Java API, Plugin API: asynchronously deletes all persistent messages up to
* `toSequenceNr`. If `permanent` is set to `false`, the persistent messages are
* marked as deleted, otherwise they are permanently deleted.
*
* @see AsyncRecoveryPlugin
*/
Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent);
Message replays and sequence number recovery are always asynchronous; therefore, any journal plugin must implement:
/**
* Java API, Plugin API: asynchronously replays persistent messages.
* Implementations replay a message by calling `replayCallback`. The returned
* future must be completed when all messages (matching the sequence number
* bounds) have been replayed. The future must be completed with a failure if
* any of the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` method must return
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param persistenceId processor id.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param max maximum number of messages to be replayed.
* @param replayCallback called to replay a single message. Can be called from any
* thread.
*/
Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure<PersistentRepr> replayCallback);
/**
* Java API, Plugin API: asynchronously reads the highest stored sequence number
* for the given `persistenceId`.
*
* @param persistenceId processor id.
* @param fromSequenceNr hint where to start searching for the highest sequence
* number.
*/
Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr);
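The replay contract described in the comments above (inclusive sequence number bounds, an upper limit of `max` messages, and replaying messages that are marked as deleted) can be sketched with a synchronous in-memory model. This is an illustration under assumptions, not the plugin API itself: ReplayedMessage and ReplaySketch are hypothetical names, java.util.function.Consumer (Java 8) stands in for akka.japi.Procedure, no Future is involved, and returning 0 for an empty log is an assumption of this sketch.

```java
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical replayed message; not an Akka type.
final class ReplayedMessage {
    final long sequenceNr;
    final String payload;
    boolean deleted; // marked as deleted, but still present in the log

    ReplayedMessage(long sequenceNr, String payload) {
        this.sequenceNr = sequenceNr;
        this.payload = payload;
    }
}

// Hypothetical in-memory log for a single persistenceId.
final class ReplaySketch {
    private final TreeMap<Long, ReplayedMessage> log = new TreeMap<>();

    void append(long sequenceNr, String payload) {
        log.put(sequenceNr, new ReplayedMessage(sequenceNr, payload));
    }

    void markDeleted(long sequenceNr) {
        ReplayedMessage m = log.get(sequenceNr);
        if (m != null) {
            m.deleted = true;
        }
    }

    // Replays in sequence number order, both bounds inclusive, at most max
    // messages. Messages marked as deleted are passed to the callback as well.
    void replay(long fromSequenceNr, long toSequenceNr, long max,
                Consumer<ReplayedMessage> replayCallback) {
        if (fromSequenceNr > toSequenceNr) {
            return; // empty range; subMap would reject it
        }
        long replayed = 0;
        for (ReplayedMessage m : log.subMap(fromSequenceNr, true, toSequenceNr, true).values()) {
            if (replayed >= max) {
                break;
            }
            replayCallback.accept(m);
            replayed++;
        }
    }

    // Assumption of this sketch: 0 means that nothing has been stored yet.
    long highestSequenceNr() {
        return log.isEmpty() ? 0L : log.lastKey();
    }
}
```

In an actual plugin the same loop would run against the storage backend, and the returned future would be completed once the last callback invocation has finished.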
A journal plugin can be activated with the following minimal configuration:
# Path to the journal plugin to be used
akka.persistence.journal.plugin = "my-journal"
# My custom journal plugin
my-journal {
  # Class name of the plugin.
  class = "docs.persistence.MyJournal"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.actor.default-dispatcher"
}
The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher for SyncWriteJournal plugins and akka.actor.default-dispatcher for AsyncWriteJournal plugins.
Snapshot store plugin API
A snapshot store plugin must extend the SnapshotStore actor and implement the following methods:
/**
* Java API, Plugin API: asynchronously loads a snapshot.
*
* @param persistenceId processor id.
* @param criteria selection criteria for loading.
*/
Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria);
/**
* Java API, Plugin API: asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
/**
* Java API, Plugin API: called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
void onSaved(SnapshotMetadata metadata) throws Exception;
/**
* Java API, Plugin API: deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
void doDelete(SnapshotMetadata metadata) throws Exception;
/**
* Java API, Plugin API: deletes all snapshots matching `criteria`.
*
* @param persistenceId processor id.
* @param criteria selection criteria for deleting.
*/
void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception;
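How a snapshot store might apply selection criteria when loading and deleting can be sketched as follows. The sketch assumes that the criteria are upper bounds on sequence number and timestamp (as the maxSequenceNr and maxTimestamp fields of SnapshotSelectionCriteria suggest) and that loading picks the matching snapshot with the highest sequence number. SnapshotEntry and SnapshotSelectionSketch are hypothetical names, and java.util.Optional (Java 8) stands in for akka.japi.Option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stored snapshot for one persistenceId; not an Akka type.
final class SnapshotEntry {
    final long sequenceNr;
    final long timestamp;
    final Object state;

    SnapshotEntry(long sequenceNr, long timestamp, Object state) {
        this.sequenceNr = sequenceNr;
        this.timestamp = timestamp;
        this.state = state;
    }
}

final class SnapshotSelectionSketch {
    private final List<SnapshotEntry> snapshots = new ArrayList<>();

    void save(SnapshotEntry snapshot) {
        snapshots.add(snapshot);
    }

    // A snapshot matches if it is not newer than either bound.
    private static boolean matches(SnapshotEntry s, long maxSequenceNr, long maxTimestamp) {
        return s.sequenceNr <= maxSequenceNr && s.timestamp <= maxTimestamp;
    }

    // Loads the matching snapshot with the highest sequence number, if any.
    Optional<SnapshotEntry> load(long maxSequenceNr, long maxTimestamp) {
        SnapshotEntry best = null;
        for (SnapshotEntry s : snapshots) {
            if (matches(s, maxSequenceNr, maxTimestamp)
                    && (best == null || s.sequenceNr > best.sequenceNr)) {
                best = s;
            }
        }
        return Optional.ofNullable(best);
    }

    // Deletes every snapshot that matches the criteria.
    void delete(long maxSequenceNr, long maxTimestamp) {
        snapshots.removeIf(s -> matches(s, maxSequenceNr, maxTimestamp));
    }

    int size() {
        return snapshots.size();
    }
}
```

Passing Long.MAX_VALUE for both bounds selects the most recent snapshot, which is the usual case during recovery.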
A snapshot store plugin can be activated with the following minimal configuration:
# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "my-snapshot-store"
# My custom snapshot store plugin
my-snapshot-store {
  # Class name of the plugin.
  class = "docs.persistence.MySnapshotStore"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}
The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher.
Plugin TCK
In order to help developers build correct and high-quality storage plugins, we provide a Technology Compatibility Kit (TCK for short).
The TCK is usable from Java as well as Scala projects. For Java, you need to include the akka-persistence-tck-experimental dependency:
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence-tck-experimental_${scala.version}</artifactId>
  <version>2.3.5</version>
  <scope>test</scope>
</dependency>
To include the Journal TCK tests in your test suite, simply extend the provided JavaJournalSpec:
class MyJournalSpecTest extends JavaJournalSpec {
  public MyJournalSpecTest() {
    // The key uses the full "akka.persistence" prefix, as in the plugin configuration above.
    super(ConfigFactory.parseString(
        "akka.persistence.journal.plugin = " +
            "\"akka.persistence.journal.leveldb-shared\""));
  }
}
We also provide a simple benchmarking class, JavaJournalPerfSpec, which includes all the tests that JavaJournalSpec has and also performs some longer operations on the journal while printing its performance stats. While it is NOT aimed at providing a proper benchmarking environment, it can be used to get a rough feel for your journal's performance in the most typical scenarios.
In order to include the SnapshotStore TCK tests in your test suite, simply extend the JavaSnapshotStoreSpec:
class MySnapshotStoreTest extends JavaSnapshotStoreSpec {
  public MySnapshotStoreTest() {
    super(ConfigFactory.parseString(
        "akka.persistence.snapshot-store.plugin = " +
            "\"akka.persistence.snapshot-store.local\""));
  }
}
In case your plugin requires some setting up (starting a mock database, removing temporary files etc.), you can override the beforeAll and afterAll methods to hook into the test lifecycle:
class MyJournalSpecTest extends JavaJournalSpec {
  List<File> storageLocations = new ArrayList<File>();

  public MyJournalSpecTest() {
    // The key uses the full "akka.persistence" prefix, as in the plugin configuration above.
    super(ConfigFactory.parseString(
        "akka.persistence.journal.plugin = " +
            "\"akka.persistence.journal.leveldb-shared\""));
    // Remember the storage directories so that they can be cleaned up.
    Config config = system().settings().config();
    storageLocations.add(new File(
        config.getString("akka.persistence.journal.leveldb.dir")));
    storageLocations.add(new File(
        config.getString("akka.persistence.snapshot-store.local.dir")));
  }

  @Override
  public void beforeAll() {
    // Start from a clean state before the TCK tests run.
    for (File storageLocation : storageLocations) {
      FileUtils.deleteRecursively(storageLocation);
    }
    super.beforeAll();
  }

  @Override
  public void afterAll() {
    super.afterAll();
    // Remove whatever the tests have written.
    for (File storageLocation : storageLocations) {
      FileUtils.deleteRecursively(storageLocation);
    }
  }
}
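The cleanup in the example above relies on FileUtils from org.iq80.leveldb.util. If a test suite should not depend on that class, a recursive delete can be written with the JDK alone. The following is a minimal sketch assuming Java 8 (Files.walk); StorageCleanup is a hypothetical helper name and not part of Akka or the TCK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for removing storage directories between test runs.
final class StorageCleanup {
    private StorageCleanup() {
    }

    // Deletes root and everything below it; a missing directory is a no-op.
    static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return;
        }
        try {
            List<Path> deepestFirst;
            try (Stream<Path> paths = Files.walk(root)) {
                // Reverse path order lists children before their parent directory.
                deepestFirst = paths.sorted(Comparator.reverseOrder())
                        .collect(Collectors.toList());
            }
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With such a helper, the loops in beforeAll and afterAll could call `StorageCleanup.deleteRecursively(storageLocation.toPath())` instead.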
We highly recommend including these specifications in your test suite, as they cover a broad range of cases you might have otherwise forgotten to test for when writing a plugin from scratch.
Pre-packaged plugins
Local LevelDB journal
The default journal plugin is akka.persistence.journal.leveldb, which writes messages to a local LevelDB instance. The default location of the LevelDB files is a directory named journal in the current working directory. This location can be changed by configuration, where the specified path can be relative or absolute:
akka.persistence.journal.leveldb.dir = "target/journal"
With this plugin, each actor system runs its own private LevelDB instance.
Local snapshot store
The default snapshot store plugin is akka.persistence.snapshot-store.local. It writes snapshot files to the local filesystem. The default storage location is a directory named snapshots in the current working directory. This can be changed by configuration, where the specified path can be relative or absolute:
akka.persistence.snapshot-store.local.dir = "target/snapshots"
Custom serialization
Serialization of snapshots and payloads of Persistent messages is configurable with Akka's Serialization infrastructure. For example, if an application wants to serialize
- payloads of type MyPayload with a custom MyPayloadSerializer and
- snapshots of type MySnapshot with a custom MySnapshotSerializer
it must add
akka.actor {
  serializers {
    my-payload = "docs.persistence.MyPayloadSerializer"
    my-snapshot = "docs.persistence.MySnapshotSerializer"
  }
  serialization-bindings {
    "docs.persistence.MyPayload" = my-payload
    "docs.persistence.MySnapshot" = my-snapshot
  }
}
to the application configuration. If not specified, a default serializer is used.
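The core of such a custom serializer is a pair of functions that turn a payload into bytes and back. The sketch below shows only that pair, using the JDK alone. It assumes a MyPayload class with a single string field (the real class is not shown in this document), and MyPayloadCodec is a hypothetical name. An actual MyPayloadSerializer would additionally have to plug into Akka's serialization API, for example by extending akka.serialization.JSerializer, which is omitted here.

```java
import java.nio.charset.StandardCharsets;

// Assumed shape of the payload class; the real MyPayload may differ.
final class MyPayload {
    final String data;

    MyPayload(String data) {
        this.data = data;
    }
}

// Hypothetical codec holding the logic a custom serializer would delegate to.
final class MyPayloadCodec {
    // Encodes the payload as UTF-8 bytes.
    byte[] toBinary(MyPayload payload) {
        return payload.data.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes bytes that were produced by toBinary.
    MyPayload fromBinary(byte[] bytes) {
        return new MyPayload(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

Since journaled payloads and snapshots are read back long after they were written, the byte format should stay readable by later versions of the application.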
Testing
When running tests with LevelDB default settings in sbt, make sure to set fork := true in your sbt project; otherwise, you'll see an UnsatisfiedLinkError. Alternatively, you can switch to a LevelDB Java port by setting
akka.persistence.journal.leveldb.native = off
or
akka.persistence.journal.leveldb-shared.store.native = off
in your Akka configuration. The LevelDB Java port is for testing purposes only.
Configuration
There are several configuration properties for the persistence module; please refer to the reference configuration.