Persistence Query

How to get the ReadJournal

The ReadJournal is retrieved via the akka.persistence.query.PersistenceQuery extension:

Scala
import akka.persistence.query.PersistenceQuery
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal

val readJournal: JdbcReadJournal =
  PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
Java
import akka.persistence.query.*;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;

final JdbcReadJournal readJournal = PersistenceQuery.get(system)
        .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Persistence Query

The plugin supports the following queries:

PersistenceIdsQuery and CurrentPersistenceIdsQuery

persistenceIds and currentPersistenceIds are used to retrieve the persistenceIds of all persistent actors.

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.persistence.query.PersistenceQuery
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal

val readJournal: JdbcReadJournal =
  PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

val willNotCompleteTheStream: Source[String, NotUsed] = readJournal.persistenceIds()

val willCompleteTheStream: Source[String, NotUsed] = readJournal.currentPersistenceIds()
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;

JdbcReadJournal readJournal = PersistenceQuery.get(system)
        .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<String, NotUsed> willNotCompleteTheStream = readJournal.persistenceIds();

Source<String, NotUsed> willCompleteTheStream = readJournal.currentPersistenceIds();

The returned stream of persistenceIds is unordered, and the order may differ between executions of the query.

When using the persistenceIds query, the stream is not completed when it reaches the end of the currently used persistenceIds, but it continues to push new persistenceIds when new persistent actors are created.

When using the currentPersistenceIds query, the stream is completed when the end of the current list of persistenceIds is reached, thus it is not a live query.

The stream is completed with failure if there is a failure in executing the query in the backend journal.
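The difference between the two completion modes shows up as soon as the stream is run. The following is a minimal sketch, not part of the plugin: the object and method names (PersistenceIdsSketch, collectIds, collectIdsWithin) are hypothetical, and it assumes an implicit Materializer is in scope (in Akka 2.6 and later an implicit ActorSystem provides one).

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Hypothetical helpers for illustration only.
object PersistenceIdsSketch {

  // For a finite stream such as readJournal.currentPersistenceIds():
  // the Future completes with all ids once the stream ends, and fails
  // if the stream is completed with failure.
  def collectIds(ids: Source[String, NotUsed])(implicit mat: Materializer): Future[Seq[String]] =
    ids.runWith(Sink.seq)

  // For a live stream such as readJournal.persistenceIds():
  // it never completes on its own, so this sketch cuts it off after `window`.
  def collectIdsWithin(ids: Source[String, NotUsed], window: FiniteDuration)(
      implicit mat: Materializer): Future[Seq[String]] =
    ids.takeWithin(window).runWith(Sink.seq)
}
```

With a readJournal obtained as shown earlier, PersistenceIdsSketch.collectIds(readJournal.currentPersistenceIds()) would yield the ids known at query time, whereas passing readJournal.persistenceIds() to collectIds would produce a Future that never completes unless the stream fails.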

EventsByPersistenceIdQuery and CurrentEventsByPersistenceIdQuery

eventsByPersistenceId and currentEventsByPersistenceId are used for retrieving events for a specific PersistentActor identified by persistenceId.

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal

val readJournal: JdbcReadJournal =
  PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

val willNotCompleteTheStream: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)

val willCompleteTheStream: Source[EventEnvelope, NotUsed] =
  readJournal.currentEventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.EventEnvelope;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;

JdbcReadJournal readJournal = PersistenceQuery.get(system)
        .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<EventEnvelope, NotUsed> willNotCompleteTheStream = readJournal
        .eventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE);

Source<EventEnvelope, NotUsed> willCompleteTheStream = readJournal
        .currentEventsByPersistenceId("some-persistence-id", 0L, Long.MAX_VALUE);

You can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr or use 0L and Long.MaxValue respectively to retrieve all events. Note that the corresponding sequence number of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given sequence number.
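Resuming from a stored sequence number could look like the sketch below. The names ResumeSketch, resumeFrom, eventsAfter and lastProcessed are hypothetical, and the sketch assumes that fromSequenceNr is inclusive, so the stream is restarted at the last processed sequence number plus one.

```scala
import akka.NotUsed
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.EventEnvelope
import akka.stream.scaladsl.Source

// Hypothetical helpers for illustration only.
object ResumeSketch {

  // Sequence number to start from: 0 when nothing has been processed yet,
  // otherwise the one after the last processed event
  // (assumes fromSequenceNr is inclusive).
  def resumeFrom(lastProcessed: Option[Long]): Long =
    lastProcessed.fold(0L)(_ + 1)

  // Live stream of the events that come after `lastProcessed`.
  def eventsAfter(
      readJournal: JdbcReadJournal,
      persistenceId: String,
      lastProcessed: Option[Long]): Source[EventEnvelope, NotUsed] =
    readJournal.eventsByPersistenceId(persistenceId, resumeFrom(lastProcessed), Long.MaxValue)
}
```

A consumer would typically record envelope.sequenceNr after handling each EventEnvelope and pass that value as lastProcessed when the stream is started again.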

The returned event stream is ordered by sequence number, i.e. in the same order in which the PersistentActor persisted the events. Multiple executions of the query return the same prefix of stream elements, in the same order, except when events have been deleted.

The stream is completed with failure if there is a failure in executing the query in the backend journal.

EventsByTag and CurrentEventsByTag

eventsByTag and currentEventsByTag are used for retrieving events that were marked with a given tag, e.g. all domain events of an Aggregate Root type.

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal

val readJournal: JdbcReadJournal =
  PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

val willNotCompleteTheStream: Source[EventEnvelope, NotUsed] = readJournal.eventsByTag("apple", 0L)

val willCompleteTheStream: Source[EventEnvelope, NotUsed] = readJournal.currentEventsByTag("apple", 0L)
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;

JdbcReadJournal readJournal = PersistenceQuery.get(system)
        .getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier());

Source<EventEnvelope, NotUsed> willNotCompleteTheStream = readJournal
        .eventsByTag("apple", Offset.sequence(0L));

Source<EventEnvelope, NotUsed> willCompleteTheStream = readJournal
        .currentEventsByTag("apple", Offset.sequence(0L));
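
Each EventEnvelope carries the offset of its event, which can be stored and used to restart a tag query later. The following is a minimal sketch under stated assumptions: TagResumeSketch, startOffset, taggedEventsSince and lastStored are hypothetical names, and Sequence(0L) is used as the starting point to mirror the 0L offset in the snippets above.

```scala
import akka.NotUsed
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.{ EventEnvelope, Offset, Sequence }
import akka.stream.scaladsl.Source

// Hypothetical helpers for illustration only.
object TagResumeSketch {

  // Use the stored offset if there is one, otherwise start at Sequence(0L).
  def startOffset(lastStored: Option[Offset]): Offset =
    lastStored.getOrElse(Sequence(0L))

  // Live stream of events with the given tag, starting at the stored offset.
  def taggedEventsSince(
      readJournal: JdbcReadJournal,
      tag: String,
      lastStored: Option[Offset]): Source[EventEnvelope, NotUsed] =
    readJournal.eventsByTag(tag, startOffset(lastStored))
}
```

Whether the event at exactly the stored offset is delivered again depends on the plugin's offset semantics, which this page does not state, so a consumer restarted this way may want to handle events idempotently.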