Journal plugin
The journal plugin enables storing and loading events for event sourced persistent actors.
Schema
The event_journal table and event_journal_slice_idx index need to be created in the configured database; see the schema definition in Creating the schema.
The event_journal_slice_idx index is only needed if the slice based queries are used.
Configuration
To enable the journal plugin to be used by default, add the following line to your Akka application.conf:
akka.persistence.journal.plugin = "akka.persistence.r2dbc.journal"
It can also be enabled for a specific EventSourcedBehavior with journalPluginId, and multiple plugin configurations are supported.
See also Connection configuration.
Reference configuration
The following can be overridden in your application.conf for the journal specific settings:
akka.persistence.r2dbc {
journal {
class = "akka.persistence.r2dbc.journal.R2dbcJournal"
# name of the table to use for events
table = "event_journal"
# the column type to use for event payloads (BYTEA or JSONB)
payload-column-type = "BYTEA"
# Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
plugin-dispatcher = "akka.actor.default-dispatcher"
# event replay is using akka.persistence.r2dbc.query.buffer-size
# Set this to off to disable publishing of events as Akka messages to running
# eventsBySlices queries.
# Tradeoff is that more CPU and network resources are used. The events
# must still be retrieved from the database, but at a lower polling frequency,
# because delivery of published messages is not guaranteed.
# When this feature is enabled it will measure the throughput and automatically
# disable/enable if the throughput exceeds the configured threshold. See
# publish-events-dynamic configuration.
publish-events = on
# When publish-events is enabled it will measure the throughput and automatically
# disable/enable if the throughput exceeds the configured threshold.
# This configuration cannot be defined per journal, but is global for the ActorSystem.
publish-events-dynamic {
# If exponentially weighted moving average of measured throughput exceeds this
# threshold publishing of events is disabled. It is enabled again when lower than
# the threshold.
throughput-threshold = 400
# The interval of the throughput measurements.
throughput-collect-interval = 10 seconds
}
# Group the slices for an entity type into this number of topics. Most efficient is to use
# the same number as number of projection instances. If configured to less than the number
# of projection instances the overhead is that events will be sent more than once and discarded
# on the destination side. If configured to more than the number of projection instances
# the events will only be sent once but there is a risk of exceeding the limits of number
# of topics that PubSub can handle (e.g. OversizedPayloadException).
# Must be between 1 and 1024 and a whole number divisor of 1024 (number of slices).
# This configuration can be changed in a rolling update, but there might be some events
# that are not delivered via the pub-sub path and instead delivered later by the queries.
# This configuration cannot be defined per journal, but is global for the ActorSystem.
publish-events-number-of-topics = 128
# replay filter not needed for this plugin
replay-filter.mode = off
# Extract a field from the event and store in an additional database column.
# Primary use case is for secondary indexes that can be queried.
# Each entity type can have several additional columns.
# The AdditionalColumn implementation may optionally define an ActorSystem
# constructor parameter.
additional-columns {
#"<entity-type-name>" = ["<fqcn of journal AdditionalColumn implementation>"]
}
# Use another table for the given entity types. Typically used together with
# additional-columns but can also be used without additional-columns.
custom-table {
#"<entity-type-name>" = <other_event_journal_table>
}
}
}
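The publish-events-number-of-topics rule (between 1 and 1024 and a whole-number divisor of 1024) can be illustrated with a small Java sketch. It assumes that the slice of a persistence id is the absolute value of its hashCode modulo 1024, which matches Akka's slice function, and that each topic covers an equally sized, contiguous range of slices. SliceTopics and its methods are hypothetical names, and this is not the plugin's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: groups the 1024 slices into contiguous topic ranges.
final class SliceTopics {
  static final int NUMBER_OF_SLICES = 1024;

  // Assumption: slice = |hashCode % 1024|, as in Akka's slice function.
  static int sliceForPersistenceId(String persistenceId) {
    return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
  }

  // Enforces the documented constraint on publish-events-number-of-topics.
  static void validateNumberOfTopics(int numberOfTopics) {
    if (numberOfTopics < 1
        || numberOfTopics > NUMBER_OF_SLICES
        || NUMBER_OF_SLICES % numberOfTopics != 0) {
      throw new IllegalArgumentException(
          "number of topics must be between 1 and 1024 and divide 1024, was " + numberOfTopics);
    }
  }

  // Hypothetical mapping: each topic covers an equally sized, contiguous slice range.
  static int topicIndexForSlice(int slice, int numberOfTopics) {
    validateNumberOfTopics(numberOfTopics);
    int slicesPerTopic = NUMBER_OF_SLICES / numberOfTopics;
    return slice / slicesPerTopic;
  }

  // Returns the inclusive [from, to] slice range covered by each topic.
  static List<int[]> topicRanges(int numberOfTopics) {
    validateNumberOfTopics(numberOfTopics);
    int slicesPerTopic = NUMBER_OF_SLICES / numberOfTopics;
    List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < numberOfTopics; i++) {
      ranges.add(new int[] {i * slicesPerTopic, (i + 1) * slicesPerTopic - 1});
    }
    return ranges;
  }
}
```

With the default of 128 topics each topic covers 8 slices. If there are fewer topics than projection instances, an instance receives the events of a topic that spans more slices than it handles and discards the rest, which is the overhead described in the reference configuration.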
Event serialization
The events are serialized with Akka Serialization, and the binary representation is stored in the event_payload column together with information about which serializer was used in the event_ser_id and event_ser_manifest columns.
For PostgreSQL the payload is stored as BYTEA type. Alternatively, you can use JSONB column type as described in PostgreSQL JSON.
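Deserialization needs both stored columns. The serializer id selects the serializer, and the manifest is a serializer-specific hint, typically identifying the type to produce. The following stdlib-only Java sketch shows that lookup. StoredEvent and SerializerRegistry are hypothetical stand-ins for a journal row and for Akka Serialization, and the UTF-8 text serializer is only an example.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of the serialization columns of one journal row.
final class StoredEvent {
  final int eventSerId;          // event_ser_id
  final String eventSerManifest; // event_ser_manifest
  final byte[] eventPayload;     // event_payload

  StoredEvent(int eventSerId, String eventSerManifest, byte[] eventPayload) {
    this.eventSerId = eventSerId;
    this.eventSerManifest = eventSerManifest;
    this.eventPayload = eventPayload;
  }
}

// Hypothetical stand-in for Akka Serialization: the serializer id selects the
// deserializer, which receives the manifest and the payload bytes.
final class SerializerRegistry {
  private final Map<Integer, BiFunction<String, byte[], Object>> deserializers = new HashMap<>();

  void register(int serializerId, BiFunction<String, byte[], Object> deserializer) {
    deserializers.put(serializerId, deserializer);
  }

  Object deserialize(StoredEvent row) {
    BiFunction<String, byte[], Object> deserializer = deserializers.get(row.eventSerId);
    if (deserializer == null) {
      throw new IllegalStateException("No serializer registered with id " + row.eventSerId);
    }
    return deserializer.apply(row.eventSerManifest, row.eventPayload);
  }

  // Example registry with one UTF-8 text serializer that only accepts the "text" manifest.
  static SerializerRegistry withTextSerializer(int serializerId) {
    SerializerRegistry registry = new SerializerRegistry();
    registry.register(serializerId, (manifest, bytes) -> {
      if (!"text".equals(manifest)) {
        throw new IllegalArgumentException("Unknown manifest " + manifest);
      }
      return new String(bytes, StandardCharsets.UTF_8);
    });
    return registry;
  }
}
```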
Deletes
The journal supports deletes through hard deletes, which means the journal entries are actually deleted from the database. There is no materialized view with a copy of the events, so make sure not to delete events too early if they are used by projections or queries.
For each persistence id, one tombstone record is kept in the event journal when all events of that persistence id have been deleted. The tombstone record keeps track of the latest sequence number so that subsequent events don't reuse the sequence numbers that have been deleted.
See the EventSourcedCleanup tool for more information about how to delete events, snapshots and tombstone records.
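The tombstone rule can be illustrated with a toy in-memory journal. ToyJournal is hypothetical: it only models sequence numbering and hard deletes, and it does not reflect the plugin's tables or the EventSourcedCleanup tool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy journal (hypothetical) showing why the highest sequence number must survive deletes.
final class ToyJournal {
  private final Map<String, TreeMap<Long, String>> events = new HashMap<>();
  // Highest sequence number per persistence id, kept after all its events are deleted.
  private final Map<String, Long> tombstones = new HashMap<>();

  long highestSeqNr(String persistenceId) {
    TreeMap<Long, String> evs = events.get(persistenceId);
    if (evs != null && !evs.isEmpty()) {
      return evs.lastKey();
    }
    return tombstones.getOrDefault(persistenceId, 0L);
  }

  // Appends an event with the next sequence number and returns that number.
  long persist(String persistenceId, String event) {
    long seqNr = highestSeqNr(persistenceId) + 1;
    events.computeIfAbsent(persistenceId, k -> new TreeMap<>()).put(seqNr, event);
    return seqNr;
  }

  // Hard-deletes events up to and including toSeqNr.
  void deleteTo(String persistenceId, long toSeqNr) {
    TreeMap<Long, String> evs = events.get(persistenceId);
    if (evs == null || evs.isEmpty()) {
      return;
    }
    long highest = evs.lastKey();
    evs.headMap(toSeqNr, true).clear();
    if (evs.isEmpty()) {
      // All events are gone: record a tombstone so the sequence numbers are not reused.
      tombstones.put(persistenceId, highest);
    }
  }

  int eventCount(String persistenceId) {
    TreeMap<Long, String> evs = events.get(persistenceId);
    return evs == null ? 0 : evs.size();
  }
}
```

Without the tombstone entry, the next persist after a full delete would start again at sequence number 1.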
Storing query representation
Events can be looked up by their persistence id. Additional indexed data can be stored alongside each event so the event journal can be queried by a secondary index. The most common use case is a JSONB column that holds application-level metadata you want to query with a GIN or expression index.
The journal supports this through AdditionalColumn (configured per entity type). For more elaborate query representations you can use a Projection to derive a separate query representation asynchronously.
Additional columns
For each event being persisted, an AdditionalColumn extracts a field from the event (and its metadata/tags) and binds the value to an extra column on the event journal table. The column can be queried independently of the primary key.
The configuration:
akka.persistence.r2dbc.journal {
additional-columns {
"BlogPost" = ["docs.home.journal.EventTitleColumn"]
}
custom-table {
"BlogPost" = event_journal_blogpost
}
}
For each entity type you can define a list of fully qualified class names of AdditionalColumn implementations. The AdditionalColumn implementation may optionally define an ActorSystem constructor parameter.
An AdditionalColumn that stores the title of an event:
Scala:
import akka.persistence.r2dbc.journal.scaladsl.AdditionalColumn
class EventTitleColumn extends AdditionalColumn[BlogPost.Event, String] {
  override val columnName: String = "title"
  override def bind(insert: AdditionalColumn.Insert[BlogPost.Event]): AdditionalColumn.Binding[String] =
    insert.value match {
      case BlogPost.PostAdded(_, title, _) => AdditionalColumn.BindValue(title)
      case _                               => AdditionalColumn.BindNull
    }
}
Java:
import akka.persistence.r2dbc.journal.javadsl.AdditionalColumn;
public class EventTitleColumn extends AdditionalColumn<BlogPost.Event, String> {
  @Override
  public Class<String> fieldClass() {
    return String.class;
  }
  @Override
  public String columnName() {
    return "title";
  }
  @Override
  public Binding<String> bind(Insert<BlogPost.Event> insert) {
    BlogPost.Event event = insert.value();
    if (event instanceof BlogPost.PostAdded) {
      BlogPost.PostAdded added = (BlogPost.PostAdded) event;
      return AdditionalColumn.bindValue(added.title);
    } else {
      return AdditionalColumn.bindNull();
    }
  }
}
From the bind method you can return one of:
- AdditionalColumn.BindValue (Java: AdditionalColumn.bindValue): bind a value such as a String or Long to the database column
- AdditionalColumn.BindNull (Java: AdditionalColumn.bindNull): store NULL in the database column
- AdditionalColumn.Skip (Java: AdditionalColumn.skip): omit the column from the INSERT; the column's DEFAULT (usually NULL) is used
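The practical difference between BindNull and Skip is whether the column appears in the INSERT at all. The following Java sketch models the three outcomes with a hypothetical BindOutcome enum (not the plugin's Binding type) and builds an illustrative PostgreSQL-style INSERT. It is not the SQL the plugin actually generates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical mirror of the three documented bind outcomes.
enum BindOutcome { BIND_VALUE, BIND_NULL, SKIP }

final class AdditionalColumnInsert {
  // SKIP columns are left out, so the database applies the column DEFAULT.
  // BIND_VALUE and BIND_NULL columns get a $n placeholder, to which either the
  // value or NULL would be bound.
  static String insertSql(String table, List<String> baseColumns, Map<String, BindOutcome> additional) {
    List<String> columns = new ArrayList<>(baseColumns);
    for (Map.Entry<String, BindOutcome> entry : additional.entrySet()) {
      if (entry.getValue() != BindOutcome.SKIP) {
        columns.add(entry.getKey());
      }
    }
    List<String> placeholders = new ArrayList<>();
    for (int i = 1; i <= columns.size(); i++) {
      placeholders.add("$" + i);
    }
    return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES ("
        + String.join(", ", placeholders) + ")";
  }
}
```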
The Insert payload passed to bind exposes persistenceId, entityType, slice, seqNr, the deserialized event value, the deserialized event metadata, and the event tags. Use these to build whatever column value the secondary query needs.
You have to add the additional columns to the event_journal table definition and create the secondary database index yourself. Unless you only have one entity type, it is best to define a separate journal table with the custom-table configuration (see Custom journal table per entity type) so that the extra column does not need to be added to the shared event_journal table.
A few things to call out:
- The additional column must be NULL-able. Delete-marker (tombstone) rows do not invoke bind, so they leave the column at its DEFAULT.
- Within an atomic persist of several events, all must produce the same shape (same Skip/BindNull/BindValue decisions per column). A mismatch fails the write fast with an IllegalArgumentException.
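The same-shape rule for atomic writes can be sketched as a check over the bind decisions of every event in the batch. The check takes the documented rule literally, so any difference in the Skip/BindNull/BindValue decision for a column is rejected. BindDecision and BatchShape are hypothetical names, and this is not the plugin's validation code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical mirror of the per-column bind decisions.
enum BindDecision { BIND_VALUE, BIND_NULL, SKIP }

final class BatchShape {
  // Each map holds the decision per additional column for one event of the atomic write.
  static void requireSameShape(List<Map<String, BindDecision>> decisionsPerEvent) {
    if (decisionsPerEvent.isEmpty()) {
      return;
    }
    Map<String, BindDecision> first = decisionsPerEvent.get(0);
    for (int i = 1; i < decisionsPerEvent.size(); i++) {
      Map<String, BindDecision> current = decisionsPerEvent.get(i);
      if (!first.equals(current)) {
        // Fail fast instead of writing a batch whose INSERTs have different shapes.
        throw new IllegalArgumentException(
            "Event " + i + " has bind decisions " + current + " but event 0 has " + first);
      }
    }
  }
}
```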
The events can be found by the additional column and deserialized like this:
Scala:
import scala.concurrent.Future
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.session.scaladsl.R2dbcSession
import akka.serialization.SerializationExtension
class BlogPostQuery(system: ActorSystem[_]) {
  private val findByTitleSql =
    "SELECT persistence_id, seq_nr, event_ser_id, event_ser_manifest, event_payload " +
    "FROM event_journal_blogpost " +
    "WHERE title = $1 AND deleted = false " +
    "ORDER BY persistence_id, seq_nr"
  def findByTitle(title: String): Future[IndexedSeq[BlogPost.Event]] = {
    R2dbcSession.withSession(system) { session =>
      session.select(session.createStatement(findByTitleSql).bind(0, title)) { row =>
        val serializerId = row.get("event_ser_id", classOf[java.lang.Integer])
        val serializerManifest = row.get("event_ser_manifest", classOf[String])
        val payload = row.get("event_payload", classOf[Array[Byte]])
        val event = SerializationExtension(system)
          .deserialize(payload, serializerId, serializerManifest)
          .get
          .asInstanceOf[BlogPost.Event]
        event
      }
    }
  }
}
Java:
import akka.actor.typed.ActorSystem;
import akka.persistence.r2dbc.session.javadsl.R2dbcSession;
import akka.serialization.SerializationExtension;
import io.r2dbc.spi.Statement;
import java.util.List;
import java.util.concurrent.CompletionStage;
public class BlogPostQuery {
  private final ActorSystem<?> system;
  public BlogPostQuery(ActorSystem<?> system) {
    this.system = system;
  }
  private final String findByTitleSql =
      "SELECT persistence_id, seq_nr, event_ser_id, event_ser_manifest, event_payload "
          + "FROM event_journal_blogpost "
          + "WHERE title = $1 AND deleted = false "
          + "ORDER BY persistence_id, seq_nr";
  public CompletionStage<List<BlogPost.Event>> findByTitle(String title) {
    return R2dbcSession.withSession(
        system,
        session -> {
          Statement stmt = session.createStatement(findByTitleSql).bind(0, title);
          return session.select(
              stmt,
              row -> {
                int serializerId = row.get("event_ser_id", Integer.class);
                String serializerManifest = row.get("event_ser_manifest", String.class);
                byte[] payload = row.get("event_payload", byte[].class);
                BlogPost.Event event =
                    (BlogPost.Event)
                        SerializationExtension.get(system)
                            .deserialize(payload, serializerId, serializerManifest)
                            .get();
                return event;
              });
        });
  }
}
Additional column as PostgreSQL JSON
With PostgreSQL the additional column type can be JSONB to take advantage of PostgreSQL support for JSON Types. Wrap the string or byte array representation of the JSON in io.r2dbc.postgresql.codec.Json when binding the value:
Scala:
import akka.persistence.r2dbc.journal.scaladsl.AdditionalColumn
import io.r2dbc.postgresql.codec.Json
class EventJsonColumn extends AdditionalColumn[BlogPost.Event, Json] {
  override val columnName: String = "event_json"
  override def bind(insert: AdditionalColumn.Insert[BlogPost.Event]): AdditionalColumn.Binding[Json] =
    insert.value match {
      case BlogPost.PostAdded(_, title, _) =>
        // a JSON library would be used here; interpolation does not escape the title
        val jsonString = s"""{"title": "$title", "published": false}"""
        AdditionalColumn.BindValue(Json.of(jsonString))
      case _: BlogPost.Published =>
        val jsonString = """{"published": true}"""
        AdditionalColumn.BindValue(Json.of(jsonString))
      case _ =>
        AdditionalColumn.BindNull
    }
}
Java:
import akka.persistence.r2dbc.journal.javadsl.AdditionalColumn;
import io.r2dbc.postgresql.codec.Json;
public class EventJsonColumn extends AdditionalColumn<BlogPost.Event, Json> {
  @Override
  public Class<Json> fieldClass() {
    return Json.class;
  }
  @Override
  public String columnName() {
    return "event_json";
  }
  @Override
  public Binding<Json> bind(Insert<BlogPost.Event> insert) {
    BlogPost.Event event = insert.value();
    if (event instanceof BlogPost.PostAdded) {
      BlogPost.PostAdded added = (BlogPost.PostAdded) event;
      // a JSON library would be used here; concatenation does not escape the title
      String jsonString = "{\"title\": \"" + added.title + "\", \"published\": false}";
      return AdditionalColumn.bindValue(Json.of(jsonString));
    } else if (event instanceof BlogPost.Published) {
      String jsonString = "{\"published\": true}";
      return AdditionalColumn.bindValue(Json.of(jsonString));
    } else {
      return AdditionalColumn.bindNull();
    }
  }
}
Add a JSONB column and a GIN index to the journal table, then query it with the usual JSONB operators, such as column @> '{...}'::jsonb or column->>'field' = ?.
See also PostgreSQL JSON.
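The JSON examples in this section build the document by string concatenation and note that a JSON library would be used instead. Without a library, the title must at least be escaped. Otherwise a title that contains a quote produces invalid JSON, which PostgreSQL would typically reject when the value is stored as JSONB. The following is a minimal escaper sketch: JsonText is a hypothetical helper that escapes quotes, backslashes and control characters as JSON strings require, and a real JSON library remains the better choice.

```java
// Hypothetical helper that quotes a Java string as a JSON string literal.
final class JsonText {
  static String quote(String s) {
    StringBuilder sb = new StringBuilder(s.length() + 2);
    sb.append('"');
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"': sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            // Other control characters must use the four-digit hex escape form.
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    sb.append('"');
    return sb.toString();
  }

  // Same document shape as the PostAdded branch of EventJsonColumn, with the title escaped.
  static String postAddedJson(String title) {
    return "{\"title\": " + quote(title) + ", \"published\": false}";
  }
}
```

In EventJsonColumn the PostAdded branch would then bind Json.of(JsonText.postAddedJson(added.title)).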
Additional column schema with H2
As H2 runs in-process, it is not possible to create the schema up front, so the additional columns of the journal table can be created through the additional-init setting:
akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.h2}
akka.persistence.r2dbc.connection-factory {
protocol = "mem"
database = "h2-journal-additional-init-db"
additional-init = "alter table event_journal add if not exists title varchar(256)"
}
Custom journal table per entity type
For each entity type you can route writes and reads to a separate journal table. This is the recommended approach when the additional columns for an entity type should not be added to the shared event_journal table.
akka.persistence.r2dbc.journal {
custom-table {
"BlogPost" = event_journal_blogpost
}
}
The custom table must have the same column layout as the default event_journal table, plus any additional columns the entity type needs. Slice indexes, schema prefix, and data partition suffixes are applied the same way as on the default table.
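The routing of writes and reads to a custom table can be sketched as a lookup from entity type to table name, with event_journal as the fallback. The sketch assumes persistence ids of the form "EntityType|entityId" and treats an id without a separator as having no entity type. JournalTables is hypothetical and not the plugin's resolver, and data partition suffixes are not modeled.

```java
import java.util.Map;

// Hypothetical sketch of per-entity-type journal table routing.
final class JournalTables {
  static final String DEFAULT_TABLE = "event_journal";

  // Assumption: "|" separates the entity type from the entity id.
  static String entityTypeOf(String persistenceId) {
    int separator = persistenceId.indexOf('|');
    return separator < 0 ? "" : persistenceId.substring(0, separator);
  }

  // Uses the custom-table entry for the entity type if there is one, otherwise the
  // default table, and applies an optional schema prefix to either in the same way.
  static String tableFor(String persistenceId, Map<String, String> customTables, String schema) {
    String table = customTables.getOrDefault(entityTypeOf(persistenceId), DEFAULT_TABLE);
    return (schema == null || schema.isEmpty()) ? table : schema + "." + table;
  }
}
```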