Events from Akka Persistence

A typical source for Projections is events stored with EventSourcedBehaviorEventSourcedBehavior in Akka Persistence. Events can be tagged and then consumed with the eventsByTag query.

Akka Projections has integration with eventsByTag, which is described here.

Dependencies

To use the Event Sourced module of Akka Projections add the following dependency in your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-eventsourced" % "1.1.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-eventsourced_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-eventsourced_${versions.ScalaBinary}:1.1.0"
}

Akka Projections require Akka 2.6.10 or later, see Akka version.

Project Info: Akka Projections Eventsourced
Artifact
com.lightbend.akka
akka-projection-eventsourced
1.1.0
JDK versions
AdoptOpenJDK 8
AdoptOpenJDK 11
Scala versions2.13.3, 2.12.11
JPMS module nameakka.projection.eventsourced
License
Readiness level
Since 1.0.0, 2020-09-10
Home pagehttps://akka.io
API documentation
Forums
Release notesGitHub releases
IssuesGitHub issues
Sourceshttps://github.com/akka/akka-projection

Transitive dependencies

The table below shows akka-projection-eventsourced’s direct dependencies and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.lightbend.akkaakka-projection-core_2.131.1.0
com.typesafe.akkaakka-persistence-query_2.132.6.10
org.scala-langscala-library2.13.3
Dependency tree
com.lightbend.akka    akka-projection-core_2.13    1.1.0
    com.typesafe.akka    akka-actor-typed_2.13    2.6.10    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.6.10    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.slf4j    slf4j-api    1.7.30
        org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.slf4j    slf4j-api    1.7.30
    com.typesafe.akka    akka-persistence-query_2.13    2.6.10    Apache-2.0
        com.typesafe.akka    akka-persistence_2.13    2.6.10    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.6.10    Apache-2.0
                com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.6.10    Apache-2.0
                com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.3    CC0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.10    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.10    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.6.10    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.10    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.10    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
com.typesafe.akka    akka-persistence-query_2.13    2.6.10    Apache-2.0
    com.typesafe.akka    akka-persistence_2.13    2.6.10    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.10    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.10    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.10    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.10    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.10    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
org.scala-lang    scala-library    2.13.3    Apache-2.0

SourceProvider for eventsByTag

A SourceProviderSourceProvider defines the source of the event envelopes that the Projection will process. A SourceProvider for the eventsByTag query can be defined with the EventSourcedProviderEventSourcedProvider like this:

Scala
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.Offset
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.SourceProvider

val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
  EventSourcedProvider
    .eventsByTag[ShoppingCart.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = "carts-1")
Java
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.SourceProvider;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
    EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");

This example is using the Cassandra plugin for Akka Persistence, but same code can be used for other Akka Persistence plugins by replacing the CassandraReadJournal.Identifier. For example the JDBC plugin can be used. You will use the same plugin as you have configured for the write side that is used by the EventSourcedBehavior.

This source is consuming all events from the ShoppingCart EventSourcedBehavior that are tagged with "cart-1".

The tags are assigned as described in Tagging Events in EventSourcedBehavior.

The EventEnvelope[ShoppingCart.Event]EventEnvelope<ShoppingCart.Event> is what the Projection handler will process. It contains the Event and additional meta data, such as the offset that will be stored by the Projection. See EventEnvelopeEventEnvelope for full details of what the envelope contains.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.