Journal plugin

The journal plugin enables storing and loading events for event sourced persistent actors.

Schema

The event_journal table and the event_journal_slice_idx index need to be created in the configured database. See the schema definition in Creating the schema.

The event_journal_slice_idx index is only needed if slice-based queries are used.

Configuration

To enable the journal plugin to be used by default, add the following line to your Akka application.conf:

akka.persistence.journal.plugin = "akka.persistence.r2dbc.journal"

It can also be enabled for a specific EventSourcedBehavior with journalPluginId, and multiple plugin configurations are supported.

See also Connection configuration.

Reference configuration

The following can be overridden in your application.conf for the journal specific settings:

akka.persistence.r2dbc {
  journal {
    class = "akka.persistence.r2dbc.journal.R2dbcJournal"

    # name of the table to use for events
    table = "event_journal"

    # the column type to use for event payloads (BYTEA or JSONB)
    payload-column-type = "BYTEA"

    # Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
    plugin-dispatcher = "akka.actor.default-dispatcher"

    # event replay is using akka.persistence.r2dbc.query.buffer-size

    # Set this to off to disable publishing of events as Akka messages to running
    # eventsBySlices queries.
    # Tradeoff is more CPU and network resources that are used. The events
    # must still be retrieved from the database, but at a lower polling frequency,
    # because delivery of published messages is not guaranteed.
    # When this feature is enabled it will measure the throughput and automatically
    # disable/enable if the throughput exceeds the configured threshold. See
    # publish-events-dynamic configuration.
    publish-events = on

    # When publish-events is enabled it will measure the throughput and automatically
    # disable/enable if the throughput exceeds the configured threshold.
    # This configuration cannot be defined per journal, but is global for the ActorSystem.
    publish-events-dynamic {
      # If exponentially weighted moving average of measured throughput exceeds this
      # threshold publishing of events is disabled. It is enabled again when lower than
      # the threshold.
      throughput-threshold = 400
      # The interval of the throughput measurements.
      throughput-collect-interval = 10 seconds
    }

    # Group the slices for an entity type into this number of topics. Most efficient is to use
    # the same number as number of projection instances. If configured to less than the number
    # of projection instances the overhead is that events will be sent more than once and discarded
    # on the destination side. If configured to more than the number of projection instances
    # the events will only be sent once but there is a risk of exceeding the limits of number
    # of topics that PubSub can handle (e.g. OversizedPayloadException).
    # Must be between 1 and 1024 and a whole number divisor of 1024 (number of slices).
    # This configuration can be changed in a rolling update, but there might be some events
    # that are not delivered via the pub-sub path and instead delivered later by the queries.
    # This configuration cannot be defined per journal, but is global for the ActorSystem.
    publish-events-number-of-topics = 128

    # replay filter not needed for this plugin
    replay-filter.mode = off

    # Extract a field from the event and store in an additional database column.
    # Primary use case is for secondary indexes that can be queried.
    # Each entity type can have several additional columns.
    # The AdditionalColumn implementation may optionally define an ActorSystem
    # constructor parameter.
    additional-columns {
      #"<entity-type-name>" = ["<fqcn of journal AdditionalColumn implementation>"]
    }

    # Use another table for the given entity types. Typically used together with
    # additional-columns but can also be used without additional-columns.
    custom-table {
      #"<entity-type-name>" =  <other_event_journal_table>
    }

  }
}
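
The constraint on publish-events-number-of-topics stated in the reference configuration (between 1 and 1024, and a whole number divisor of the 1024 slices) can be checked with simple arithmetic. The following is a minimal sketch, not the plugin's code: TopicGrouping and its methods are hypothetical names, and the mapping of consecutive slices to one topic is an assumption made for illustration only.

```java
// Illustrative sketch; class and method names are hypothetical, not plugin API.
final class TopicGrouping {
  // Number of slices, as stated in the reference configuration comment.
  static final int NUMBER_OF_SLICES = 1024;

  // A valid value is between 1 and 1024 and divides 1024 without remainder.
  static boolean isValidNumberOfTopics(int numberOfTopics) {
    return numberOfTopics >= 1
        && numberOfTopics <= NUMBER_OF_SLICES
        && NUMBER_OF_SLICES % numberOfTopics == 0;
  }

  // ASSUMPTION: consecutive slices are grouped into the same topic.
  // The real plugin may group slices differently.
  static int topicIndexForSlice(int slice, int numberOfTopics) {
    if (!isValidNumberOfTopics(numberOfTopics)) {
      throw new IllegalArgumentException(
          "number of topics must be between 1 and 1024 and divide 1024: " + numberOfTopics);
    }
    if (slice < 0 || slice >= NUMBER_OF_SLICES) {
      throw new IllegalArgumentException("slice out of range: " + slice);
    }
    int slicesPerTopic = NUMBER_OF_SLICES / numberOfTopics;
    return slice / slicesPerTopic;
  }

  public static void main(String[] args) {
    System.out.println(isValidNumberOfTopics(128));
    System.out.println(isValidNumberOfTopics(100));
    System.out.println(topicIndexForSlice(8, 128));
  }
}
```

Under this assumption, the default of 128 topics would give each topic 8 consecutive slices, while a value such as 100 is rejected because it does not divide 1024.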

Event serialization

The events are serialized with Akka Serialization and the binary representation is stored in the event_payload column, together with information about which serializer was used in the event_ser_id and event_ser_manifest columns.

For PostgreSQL the payload is stored as BYTEA type. Alternatively, you can use the JSONB column type as described in PostgreSQL JSON.

Deletes

The journal supports deletes through hard deletes, which means the journal entries are actually deleted from the database. There is no materialized view with a copy of the event so make sure to not delete events too early if they are used from projections or queries.

For each persistence id, one tombstone record is kept in the event journal when all events of that persistence id have been deleted. The tombstone record keeps track of the latest sequence number so that subsequent events don’t reuse sequence numbers that have been deleted.
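
The purpose of the tombstone record can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions, not how the plugin stores data: InMemoryJournalModel, its maps and its methods are hypothetical, and in a real application the sequence number is assigned by the persistent actor rather than computed by the journal on each write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative in-memory model; all names are hypothetical, not plugin API.
final class InMemoryJournalModel {
  // persistenceId -> (sequence number -> event payload)
  private final Map<String, TreeMap<Long, String>> events = new HashMap<>();
  // persistenceId -> highest sequence number remembered by the tombstone
  private final Map<String, Long> tombstones = new HashMap<>();

  // Highest known sequence number, taking the tombstone into account.
  long highestSeqNr(String persistenceId) {
    TreeMap<Long, String> stored = events.get(persistenceId);
    long fromEvents = (stored == null || stored.isEmpty()) ? 0L : stored.lastKey();
    long fromTombstone = tombstones.getOrDefault(persistenceId, 0L);
    return Math.max(fromEvents, fromTombstone);
  }

  // Stores the event with the next sequence number and returns that number.
  long persist(String persistenceId, String event) {
    long seqNr = highestSeqNr(persistenceId) + 1;
    events.computeIfAbsent(persistenceId, k -> new TreeMap<>()).put(seqNr, event);
    return seqNr;
  }

  // Hard delete of all events, leaving one tombstone with the latest sequence number.
  void deleteAll(String persistenceId) {
    long highest = highestSeqNr(persistenceId);
    events.remove(persistenceId);
    if (highest > 0) {
      tombstones.put(persistenceId, highest);
    }
  }

  public static void main(String[] args) {
    InMemoryJournalModel journal = new InMemoryJournalModel();
    journal.persist("BlogPost|1", "PostAdded");
    journal.persist("BlogPost|1", "Published");
    journal.deleteAll("BlogPost|1");
    // The next event continues after the deleted sequence numbers.
    System.out.println(journal.persist("BlogPost|1", "PostAdded"));
  }
}
```

Without the tombstone entry, the model would restart at sequence number 1 after the delete, which is the reuse the tombstone record prevents.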

See the EventSourcedCleanup tool for more information about how to delete events, snapshots and tombstone records.

Storing query representation

Events can be looked up by their persistence id. Additional indexed data can be stored alongside each event so the event journal can be queried by a secondary index. The most common use case is a JSONB column that holds application-level metadata you want to query with a GIN or expression index.

The journal supports this through AdditionalColumn (configured per entity type). For more elaborate query representations you can use a Projection to derive a separate query representation asynchronously.

Additional columns

For each event being persisted, an AdditionalColumn extracts a field from the event (and its metadata/tags) and binds the value to an extra column on the event journal table. The column can be queried independently of the primary key.

The configuration:

akka.persistence.r2dbc.journal {
  additional-columns {
    "BlogPost" = ["docs.home.journal.EventTitleColumn"]
  }
  custom-table {
    "BlogPost" = event_journal_blogpost
  }
}

For each entity type you can define a list of fully qualified class names of AdditionalColumn implementations. The AdditionalColumn implementation may optionally define an ActorSystem constructor parameter.

An AdditionalColumn that stores the title of an event:

Scala
import akka.persistence.r2dbc.journal.scaladsl.AdditionalColumn

class EventTitleColumn extends AdditionalColumn[BlogPost.Event, String] {

  override val columnName: String = "title"

  override def bind(insert: AdditionalColumn.Insert[BlogPost.Event]): AdditionalColumn.Binding[String] =
    insert.value match {
      case BlogPost.PostAdded(_, title, _) => AdditionalColumn.BindValue(title)
      case _                               => AdditionalColumn.BindNull
    }
}
Java
import akka.persistence.r2dbc.journal.javadsl.AdditionalColumn;

public class EventTitleColumn extends AdditionalColumn<BlogPost.Event, String> {
  @Override
  public Class<String> fieldClass() {
    return String.class;
  }

  @Override
  public String columnName() {
    return "title";
  }

  @Override
  public Binding<String> bind(Insert<BlogPost.Event> insert) {
    BlogPost.Event event = insert.value();
    if (event instanceof BlogPost.PostAdded) {
      BlogPost.PostAdded added = (BlogPost.PostAdded) event;
      return AdditionalColumn.bindValue(added.title);
    } else {
      return AdditionalColumn.bindNull();
    }
  }
}

From the bind method you can return one of:

  • AdditionalColumn.BindValue (Scala) or AdditionalColumn.bindValue (Java): bind a value such as a String or Long to the database column
  • AdditionalColumn.BindNull (Scala) or AdditionalColumn.bindNull (Java): store NULL in the database column
  • AdditionalColumn.Skip (Scala) or AdditionalColumn.skip (Java): omit the column from the INSERT; the column’s DEFAULT (usually NULL) is used

The Insert payload passed to bind exposes persistenceId, entityType, slice, seqNr, the deserialized event value, the deserialized event metadata, and the event tags. Use these to build whatever column value the secondary query needs.

You have to add the additional columns to the event_journal table definition and create the secondary database index yourself. Unless you only have one entity type, it is best to define a separate journal table with the custom-table configuration (see Custom journal table per entity type) so the extra column does not need to be added to the shared event_journal table.

A few things to call out:

  • The additional column must be NULL-able. Delete-marker (tombstone) rows do not invoke bind, so they leave the column at its DEFAULT.
  • Within an atomic persist of several events, all must produce the same shape (same Skip/BindNull/BindValue decisions per column). A mismatch fails the write fast with an IllegalArgumentException.
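
The same-shape rule for an atomic persist can be pictured as a check over the binding decisions of one column across the events of the batch. The following is a minimal sketch of that idea, not the plugin's implementation: BindingShapeCheck, its Kind enum and requireSameShape are hypothetical names introduced for illustration.

```java
import java.util.List;

// Illustrative sketch; all names are hypothetical, not plugin API.
final class BindingShapeCheck {
  // The three decisions an AdditionalColumn can return from bind.
  enum Kind { SKIP, BIND_NULL, BIND_VALUE }

  // Verifies that one column made the same kind of decision for every event
  // of an atomic persist. Throws IllegalArgumentException on a mismatch.
  static void requireSameShape(String columnName, List<Kind> kindPerEvent) {
    if (kindPerEvent.isEmpty()) {
      return;
    }
    Kind first = kindPerEvent.get(0);
    for (int i = 1; i < kindPerEvent.size(); i++) {
      Kind current = kindPerEvent.get(i);
      if (current != first) {
        throw new IllegalArgumentException(
            "Column [" + columnName + "] returned " + first + " for event 0 but "
                + current + " for event " + i + " of the same atomic persist");
      }
    }
  }

  public static void main(String[] args) {
    // All events bind a value: accepted.
    requireSameShape("title", List.of(Kind.BIND_VALUE, Kind.BIND_VALUE));
    try {
      // One event binds a value and another skips the column: rejected.
      requireSameShape("title", List.of(Kind.BIND_VALUE, Kind.SKIP));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this sketch, a batch that mixes decisions for the same column is rejected, so an implementation of bind that needs different outcomes for events that may be persisted together has to settle on one decision for all of them.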

The events can be found by the additional column and deserialized like this:

Scala
import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.session.scaladsl.R2dbcSession
import akka.serialization.SerializationExtension

class BlogPostQuery(system: ActorSystem[_]) {

  private val findByTitleSql =
    "SELECT persistence_id, seq_nr, event_ser_id, event_ser_manifest, event_payload " +
    "FROM event_journal_blogpost " +
    "WHERE title = $1 AND deleted = false " +
    "ORDER BY persistence_id, seq_nr"

  def findByTitle(title: String): Future[IndexedSeq[BlogPost.Event]] = {
    R2dbcSession.withSession(system) { session =>
      session.select(session.createStatement(findByTitleSql).bind(0, title)) { row =>
        val serializerId = row.get("event_ser_id", classOf[java.lang.Integer])
        val serializerManifest = row.get("event_ser_manifest", classOf[String])
        val payload = row.get("event_payload", classOf[Array[Byte]])
        val event = SerializationExtension(system)
          .deserialize(payload, serializerId, serializerManifest)
          .get
          .asInstanceOf[BlogPost.Event]
        event
      }
    }
  }

}
Java
import akka.actor.typed.ActorSystem;
import akka.persistence.r2dbc.session.javadsl.R2dbcSession;
import akka.serialization.SerializationExtension;
import io.r2dbc.spi.Statement;
import java.util.List;
import java.util.concurrent.CompletionStage;

public class BlogPostQuery {
  private final ActorSystem<?> system;

  public BlogPostQuery(ActorSystem<?> system) {
    this.system = system;
  }

  private final String findByTitleSql =
      "SELECT persistence_id, seq_nr, event_ser_id, event_ser_manifest, event_payload "
          + "FROM event_journal_blogpost "
          + "WHERE title = $1 AND deleted = false "
          + "ORDER BY persistence_id, seq_nr";

  public CompletionStage<List<BlogPost.Event>> findByTitle(String title) {
    return R2dbcSession.withSession(
        system,
        session -> {
          Statement stmt = session.createStatement(findByTitleSql).bind(0, title);
          return session.select(
              stmt,
              row -> {
                int serializerId = row.get("event_ser_id", Integer.class);
                String serializerManifest = row.get("event_ser_manifest", String.class);
                byte[] payload = row.get("event_payload", byte[].class);
                BlogPost.Event event =
                    (BlogPost.Event)
                        SerializationExtension.get(system)
                            .deserialize(payload, serializerId, serializerManifest)
                            .get();
                return event;
              });
        });
  }
}

Additional column as PostgreSQL JSON

With PostgreSQL the additional column type can be JSONB to take advantage of PostgreSQL support for JSON Types. Wrap the string or byte array representation of the JSON in io.r2dbc.postgresql.codec.Json when binding the value:

Scala
import akka.persistence.r2dbc.journal.scaladsl.AdditionalColumn
import io.r2dbc.postgresql.codec.Json

class EventJsonColumn extends AdditionalColumn[BlogPost.Event, Json] {

  override val columnName: String = "event_json"

  override def bind(insert: AdditionalColumn.Insert[BlogPost.Event]): AdditionalColumn.Binding[Json] =
    insert.value match {
      case BlogPost.PostAdded(_, title, _) =>
        // a json library would be used here
        val jsonString = s"""{"title": "$title", "published": false}"""
        AdditionalColumn.BindValue(Json.of(jsonString))
      case _: BlogPost.Published =>
        val jsonString = """{"published": true}"""
        AdditionalColumn.BindValue(Json.of(jsonString))
      case _ =>
        AdditionalColumn.BindNull
    }
}
Java
import akka.persistence.r2dbc.journal.javadsl.AdditionalColumn;
import io.r2dbc.postgresql.codec.Json;

public class EventJsonColumn extends AdditionalColumn<BlogPost.Event, Json> {
  @Override
  public Class<Json> fieldClass() {
    return Json.class;
  }

  @Override
  public String columnName() {
    return "event_json";
  }

  @Override
  public Binding<Json> bind(Insert<BlogPost.Event> insert) {
    BlogPost.Event event = insert.value();
    if (event instanceof BlogPost.PostAdded) {
      BlogPost.PostAdded added = (BlogPost.PostAdded) event;
      // a json library would be used here
      String jsonString = "{\"title\": \"" + added.title + "\", \"published\": false}";
      return AdditionalColumn.bindValue(Json.of(jsonString));
    } else if (event instanceof BlogPost.Published) {
      String jsonString = "{\"published\": true}";
      return AdditionalColumn.bindValue(Json.of(jsonString));
    } else {
      return AdditionalColumn.bindNull();
    }
  }
}

Add a JSONB column and a GIN index to the journal table, then query with the usual column @> '{...}'::jsonb or column->>'field' = ? operators.

See also PostgreSQL JSON.

Additional column schema with H2

As H2 runs in-process it is not possible to create the schema up front, so additional columns for the journal table can be created through the additional-init setting:

akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.h2}
akka.persistence.r2dbc.connection-factory {
  protocol = "mem"
  database = "h2-journal-additional-init-db"
  additional-init = "alter table event_journal add if not exists title varchar(256)"
}

Custom journal table per entity type

For each entity type you can route writes and reads to a separate journal table. This is the recommended approach when the additional columns for an entity type should not be added to the shared event_journal table.

akka.persistence.r2dbc.journal {
  custom-table {
    "BlogPost" = event_journal_blogpost
  }
}

The custom table must have the same column layout as the default event_journal table, plus any additional columns the entity type needs. Slice indexes, schema prefix, and data partition suffixes are applied the same way as on the default table.