Events from Akka Persistence
A typical source for Projections is events stored with EventSourcedBehavior
EventSourcedBehavior
in Akka Persistence. Events can be tagged and then consumed with the eventsByTag query.
Akka Projections has integration with eventsByTag
, which is described here.
Dependencies
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
To use the Event Sourced module of Akka Projections add the following dependency in your project:
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-eventsourced" % "1.4.2"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-projection-eventsourced_${scala.binary.version}</artifactId> <version>1.4.2</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-projection-eventsourced_${versions.ScalaBinary}:1.4.2" }
Akka Projections require Akka 2.8.1 or later, see Akka version.
Project Info: Akka Projections Eventsourced | |
---|---|
Artifact | com.lightbend.akka
akka-projection-eventsourced
1.4.2
|
JDK versions | AdoptOpenJDK 8 AdoptOpenJDK 11 |
Scala versions | 2.13.10, 2.12.17, 3.2.2 |
JPMS module name | akka.projection.eventsourced |
License | |
Readiness level | Supported, Lightbend Subscription provides support
Since 1.0.0, 2020-09-10
|
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
Transitive dependencies
The table below shows akka-projection-eventsourced
’s direct dependencies and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.lightbend.akka akka-projection-core_2.13 1.4.2 com.typesafe.akka akka-persistence-query_2.13 2.8.1 org.scala-lang scala-library 2.13.10 - Dependency tree
com.lightbend.akka akka-projection-core_2.13 1.4.2 com.typesafe.akka akka-actor-typed_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-slf4j_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.slf4j slf4j-api 1.7.36 org.scala-lang scala-library 2.13.10 Apache-2.0 org.slf4j slf4j-api 1.7.36 com.typesafe.akka akka-persistence-query_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-persistence_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-stream_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-stream_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-stream_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-persistence-query_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-persistence_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-stream_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-stream_2.13 2.8.1 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1 com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0 org.scala-lang scala-library 2.13.10 Apache-2.0
SourceProvider for eventsByTag
A SourceProvider
SourceProvider
defines the source of the event envelopes that the Projection
will process. A SourceProvider
for the eventsByTag
query can be defined with the EventSourcedProvider
EventSourcedProvider
like this:
- Scala
-
source
import akka.projection.eventsourced.EventEnvelope import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal import akka.persistence.query.Offset import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.scaladsl.SourceProvider val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = EventSourcedProvider .eventsByTag[ShoppingCart.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = "carts-1")
- Java
-
source
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal; import akka.persistence.query.Offset; import akka.projection.eventsourced.EventEnvelope; import akka.projection.eventsourced.javadsl.EventSourcedProvider; import akka.projection.javadsl.SourceProvider; SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
This example is using the Cassandra plugin for Akka Persistence, but same code can be used for other Akka Persistence plugins by replacing the CassandraReadJournal.Identifier
. For example the JDBC plugin can be used. You will use the same plugin as you have configured for the write side that is used by the EventSourcedBehavior
.
This source is consuming all events from the ShoppingCart
EventSourcedBehavior
that are tagged with "cart-1"
.
The tags are assigned as described in Tagging Events in EventSourcedBehavior.
The EventEnvelope[ShoppingCart.Event]
EventEnvelope<ShoppingCart.Event>
is what the Projection
handler will process. It contains the Event
and additional meta data, such as the offset that will be stored by the Projection
. See EventEnvelope
EventEnvelope
for full details of what the envelope contains.
SourceProvider for eventsBySlices
A SourceProvider
SourceProvider
defines the source of the event envelopes that the Projection
will process. A SourceProvider
for the eventsBySlices
query can be defined with the EventSourcedProvider
EventSourcedProvider
like this:
- Scala
-
source
import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.Offset import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.scaladsl.SourceProvider // Slit the slices into 4 ranges val numberOfSliceRanges: Int = 4 val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges) // Example of using the first slice range val minSlice: Int = sliceRanges.head.min val maxSlice: Int = sliceRanges.head.max val entityType: String = "ShoppingCart" val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = EventSourcedProvider .eventsBySlices[ShoppingCart.Event]( system, readJournalPluginId = R2dbcReadJournal.Identifier, entityType, minSlice, maxSlice)
- Java
-
source
import akka.japi.Pair; import akka.persistence.query.Offset; import akka.persistence.query.typed.EventEnvelope; import akka.projection.eventsourced.javadsl.EventSourcedProvider; import akka.projection.javadsl.SourceProvider; // Slit the slices into 4 ranges int numberOfSliceRanges = 4; List<Pair<Integer, Integer>> sliceRanges = EventSourcedProvider.sliceRanges( system, R2dbcReadJournal.Identifier(), numberOfSliceRanges); // Example of using the first slice range int minSlice = sliceRanges.get(0).first(); int maxSlice = sliceRanges.get(0).second(); String entityType = "MyEntity"; SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = EventSourcedProvider.eventsBySlices( system, R2dbcReadJournal.Identifier(), entityType, minSlice, maxSlice);
This example is using the R2DBC plugin for Akka Persistence. You will use the same plugin as you have configured for the write side that is used by the EventSourcedBehavior
.
This source is consuming all events from the ShoppingCart
EventSourcedBehavior
for the given slice range. In a production application, you would need to start as many instances as the number of slice ranges. That way you consume the events from all entities.
The EventEnvelope[ShoppingCart.Event]
EventEnvelope<ShoppingCart.Event>
is what the Projection
handler will process. It contains the Event
and additional meta data, such as the offset that will be stored by the Projection
. See EventEnvelope
EventEnvelope
for full details of what the envelope contains.