Events from Akka Persistence
A typical source for Projections is events stored with EventSourcedBehavior in Akka Persistence. Events can be tagged and then consumed with the eventsByTag query.
Akka Projections has integration with eventsByTag, which is described here.
Dependencies
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
    resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
    <project>
      ...
      <repositories>
        <repository>
          <id>akka-repository</id>
          <name>Akka library repository</name>
          <url>https://repo.akka.io/maven</url>
        </repository>
      </repositories>
    </project>
- Gradle
    repositories {
        mavenCentral()
        maven {
            url "https://repo.akka.io/maven"
        }
    }
To use the Event Sourced module of Akka Projections, add the following dependency to your project:
- sbt
    libraryDependencies += "com.lightbend.akka" %% "akka-projection-eventsourced" % "1.4.2"
- Maven
    <properties>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.lightbend.akka</groupId>
        <artifactId>akka-projection-eventsourced_${scala.binary.version}</artifactId>
        <version>1.4.2</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "com.lightbend.akka:akka-projection-eventsourced_${versions.ScalaBinary}:1.4.2"
    }
Akka Projections requires Akka 2.8.1 or later; see Akka version.
| Project Info: Akka Projections Eventsourced | |
|---|---|
| Artifact | com.lightbend.akka akka-projection-eventsourced 1.4.2 |
| JDK versions | AdoptOpenJDK 8, AdoptOpenJDK 11 |
| Scala versions | 2.13.10, 2.12.17, 3.2.2 |
| JPMS module name | akka.projection.eventsourced |
| License | |
| Readiness level | Supported, Lightbend Subscription provides support. Since 1.0.0, 2020-09-10 |
| Home page | https://akka.io |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/akka/akka-projection |
Transitive dependencies
The first list below shows akka-projection-eventsourced’s direct dependencies, and the second shows all libraries it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version |
|---|---|---|
| com.lightbend.akka | akka-projection-core_2.13 | 1.4.2 |
| com.typesafe.akka | akka-persistence-query_2.13 | 2.8.1 |
| org.scala-lang | scala-library | 2.13.10 |

- Dependency tree (each distinct artifact listed once)

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.lightbend.akka | akka-projection-core_2.13 | 1.4.2 | |
| com.typesafe.akka | akka-actor-typed_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe.akka | akka-actor_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe.akka | akka-slf4j_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe.akka | akka-persistence-query_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe.akka | akka-persistence_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe.akka | akka-stream_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe.akka | akka-protobuf-v3_2.13 | 2.8.1 | BUSL-1.1 |
| com.typesafe | config | 1.4.2 | Apache-2.0 |
| com.typesafe | ssl-config-core_2.13 | 0.6.1 | Apache-2.0 |
| org.scala-lang.modules | scala-java8-compat_2.13 | 1.0.0 | Apache-2.0 |
| org.scala-lang | scala-library | 2.13.10 | Apache-2.0 |
| org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
| org.slf4j | slf4j-api | 1.7.36 | |
SourceProvider for eventsByTag
A SourceProvider defines the source of the event envelopes that the Projection will process. A SourceProvider for the eventsByTag query can be defined with the EventSourcedProvider like this:
- Scala
    import akka.projection.eventsourced.EventEnvelope
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import akka.persistence.query.Offset
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.scaladsl.SourceProvider

    val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider
        .eventsByTag[ShoppingCart.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = "carts-1")
- Java
    import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
    import akka.persistence.query.Offset;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.SourceProvider;

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
This example uses the Cassandra plugin for Akka Persistence, but the same code can be used with other Akka Persistence plugins, such as the JDBC plugin, by replacing CassandraReadJournal.Identifier. Use the same plugin that you have configured for the write side used by the EventSourcedBehavior.
This source consumes all events from the ShoppingCart EventSourcedBehavior that are tagged with "carts-1".
The tags are assigned as described in Tagging Events in EventSourcedBehavior.
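The numbered tag "carts-1" suggests that events are spread over several tags so that they can be consumed in parallel. As a minimal sketch of that idea, assuming a fixed number of tags and a hash of the entity id, a tag could be derived as follows. The class TagSketch, the method tagFor, and the choice of 5 tags are hypothetical names and values chosen for illustration; they are not part of the Akka API.

```java
// Hypothetical helper, not part of Akka: derives one of a fixed number of tags from an entity id.
final class TagSketch {
    private TagSketch() {}

    /** Returns a tag such as "carts-1" for the given entity id. */
    static String tagFor(String entityId, int numberOfTags) {
        if (numberOfTags <= 0) {
            throw new IllegalArgumentException("numberOfTags must be positive");
        }
        // hashCode % n lies strictly between -n and n, so Math.abs cannot overflow here.
        int index = Math.abs(entityId.hashCode() % numberOfTags);
        return "carts-" + index;
    }
}
```

With such a scheme an entity always receives the same tag, and one Projection instance per tag, each with its own SourceProvider, would together cover all events.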
The EventEnvelope[ShoppingCart.Event] (Scala) or EventEnvelope<ShoppingCart.Event> (Java) is what the Projection handler will process. It contains the Event and additional metadata, such as the offset that will be stored by the Projection. See EventEnvelope for full details of what the envelope contains.
SourceProvider for eventsBySlices
A SourceProvider defines the source of the event envelopes that the Projection will process. A SourceProvider for the eventsBySlices query can be defined with the EventSourcedProvider like this:
- Scala
    import akka.persistence.query.typed.EventEnvelope
    import akka.persistence.query.Offset
    import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.scaladsl.SourceProvider

    // Split the slices into 4 ranges
    val numberOfSliceRanges: Int = 4
    val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges)

    // Example of using the first slice range
    val minSlice: Int = sliceRanges.head.min
    val maxSlice: Int = sliceRanges.head.max
    val entityType: String = "ShoppingCart"

    val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider
        .eventsBySlices[ShoppingCart.Event](
          system,
          readJournalPluginId = R2dbcReadJournal.Identifier,
          entityType,
          minSlice,
          maxSlice)
- Java
    import java.util.List;
    import akka.japi.Pair;
    import akka.persistence.query.Offset;
    import akka.persistence.query.typed.EventEnvelope;
    import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.SourceProvider;

    // Split the slices into 4 ranges
    int numberOfSliceRanges = 4;
    List<Pair<Integer, Integer>> sliceRanges =
        EventSourcedProvider.sliceRanges(
            system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);

    // Example of using the first slice range
    int minSlice = sliceRanges.get(0).first();
    int maxSlice = sliceRanges.get(0).second();
    String entityType = "ShoppingCart";

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsBySlices(
            system, R2dbcReadJournal.Identifier(), entityType, minSlice, maxSlice);
This example is using the R2DBC plugin for Akka Persistence. You will use the same plugin as you have configured for the write side that is used by the EventSourcedBehavior.
This source is consuming all events from the ShoppingCart EventSourcedBehavior for the given slice range. In a production application, you would need to start as many instances as the number of slice ranges. That way you consume the events from all entities.
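To make the slice range arithmetic concrete, here is a minimal standalone sketch. It assumes 1024 slices in total, a number of ranges that divides that total evenly, and a hash-based mapping from persistence id to slice. The class SliceRangesSketch and its methods are hypothetical stand-ins written for illustration; they are not the implementation behind EventSourcedProvider.sliceRanges.

```java
// Hypothetical illustration, not Akka code: splits a fixed number of slices into equal ranges.
final class SliceRangesSketch {
    // Assumption: 1024 slices in total.
    static final int NUMBER_OF_SLICES = 1024;

    private SliceRangesSketch() {}

    /** Returns {min, max} pairs (both inclusive), one per range. */
    static int[][] sliceRanges(int numberOfRanges) {
        if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
            throw new IllegalArgumentException(
                "numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
        }
        int rangeSize = NUMBER_OF_SLICES / numberOfRanges;
        int[][] ranges = new int[numberOfRanges][];
        for (int i = 0; i < numberOfRanges; i++) {
            int min = i * rangeSize;
            ranges[i] = new int[] {min, min + rangeSize - 1};
        }
        return ranges;
    }

    /** Assumed mapping from a persistence id to a slice, based on its hash code. */
    static int sliceFor(String persistenceId) {
        // hashCode % n lies strictly between -n and n, so Math.abs cannot overflow here.
        return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
    }
}
```

Under these assumptions, 4 ranges cover slices 0 to 255, 256 to 511, 512 to 767, and 768 to 1023. Every persistence id falls into exactly one range, which is why one Projection instance per range is enough to see the events of all entities.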
The EventEnvelope[ShoppingCart.Event] (Scala) or EventEnvelope<ShoppingCart.Event> (Java) is what the Projection handler will process. It contains the Event and additional metadata, such as the offset that will be stored by the Projection. See EventEnvelope for full details of what the envelope contains.