Events from Akka Persistence
A typical source for Projections is events stored with EventSourcedBehavior in Akka Persistence. Events can be tagged and then consumed with the eventsByTag query.
Akka Projections has integration with eventsByTag, which is described here.
Dependencies
To use the Event Sourced module of Akka Projections, add the following dependency to your project:
- sbt
    libraryDependencies += "com.lightbend.akka" %% "akka-projection-eventsourced" % "1.1.0"
- Maven
    <properties>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.lightbend.akka</groupId>
        <artifactId>akka-projection-eventsourced_${scala.binary.version}</artifactId>
        <version>1.1.0</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "com.lightbend.akka:akka-projection-eventsourced_${versions.ScalaBinary}:1.1.0"
    }
Akka Projections requires Akka 2.6.10 or later; see Akka version.
Project Info: Akka Projections Eventsourced | |
---|---|
Artifact | com.lightbend.akka akka-projection-eventsourced 1.1.0 |
JDK versions | AdoptOpenJDK 8, AdoptOpenJDK 11 |
Scala versions | 2.13.3, 2.12.11 |
JPMS module name | akka.projection.eventsourced |
License | |
Readiness level | Supported, Lightbend Subscription provides support. Since 1.0.0, 2020-09-10 |
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
Transitive dependencies
The first table below shows the direct dependencies of akka-projection-eventsourced, and the second table lists every library it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.lightbend.akka | akka-projection-core_2.13 | 1.1.0 |
com.typesafe.akka | akka-persistence-query_2.13 | 2.6.10 |
org.scala-lang | scala-library | 2.13.3 |
- Dependency tree (each distinct artifact listed once)
Organization | Artifact | Version | License |
---|---|---|---|
com.lightbend.akka | akka-projection-core_2.13 | 1.1.0 | |
com.typesafe.akka | akka-actor-typed_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe.akka | akka-actor_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe.akka | akka-slf4j_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe.akka | akka-persistence-query_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe.akka | akka-persistence_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe.akka | akka-stream_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe.akka | akka-protobuf-v3_2.13 | 2.6.10 | Apache-2.0 |
com.typesafe | config | 1.4.0 | Apache-2.0 |
com.typesafe | ssl-config-core_2.13 | 0.4.2 | Apache-2.0 |
org.scala-lang | scala-library | 2.13.3 | Apache-2.0 |
org.scala-lang.modules | scala-java8-compat_2.13 | 0.9.0 | Apache-2.0 |
org.scala-lang.modules | scala-parser-combinators_2.13 | 1.1.2 | Apache-2.0 |
org.reactivestreams | reactive-streams | 1.0.3 | CC0 |
org.slf4j | slf4j-api | 1.7.30 | |
SourceProvider for eventsByTag
A SourceProvider defines the source of the event envelopes that the Projection will process. A SourceProvider for the eventsByTag query can be defined with the EventSourcedProvider like this:
- Scala
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import akka.persistence.query.Offset
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.scaladsl.SourceProvider

    val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider
        .eventsByTag[ShoppingCart.Event](
          system,
          readJournalPluginId = CassandraReadJournal.Identifier,
          tag = "carts-1")
- Java
    import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
    import akka.persistence.query.Offset;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.SourceProvider;

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
This example uses the Cassandra plugin for Akka Persistence, but the same code can be used with other Akka Persistence plugins by replacing CassandraReadJournal.Identifier. For example, the JDBC plugin can be used. Use the same plugin that you have configured for the write side, which is used by the EventSourcedBehavior.
This source consumes all events from the ShoppingCart EventSourcedBehavior that are tagged with "carts-1".
The tags are assigned as described in Tagging Events in EventSourcedBehavior.
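The tag "carts-1" in the source provider example suggests that the write side spreads entities over several numbered tags. The following is a minimal sketch, in plain Java, of one way such a tag could be derived from an entity ID. The class name TagSlicing, the method tagFor, the "carts-" prefix, and the number of tags are assumptions made for illustration; none of them are part of the Akka API, and the actual tagging is done in the EventSourcedBehavior.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Akka: picks one of N numbered tags for an entity.
final class TagSlicing {

  // Maps an entity ID to a tag in the range "carts-0" .. "carts-(numberOfTags - 1)".
  // floorMod keeps the index non-negative even when hashCode() is negative.
  static String tagFor(String entityId, int numberOfTags) {
    if (numberOfTags <= 0) {
      throw new IllegalArgumentException("numberOfTags must be positive");
    }
    int slice = Math.floorMod(entityId.hashCode(), numberOfTags);
    return "carts-" + slice;
  }

  public static void main(String[] args) {
    List<String> entityIds = Arrays.asList("cart-a", "cart-b", "cart-c");
    for (String id : entityIds) {
      // The same entity ID always maps to the same tag.
      System.out.println(id + " -> " + tagFor(id, 5));
    }
  }
}
```

With a scheme like this, all events of one entity carry the same tag, so a SourceProvider created for a single tag such as "carts-1" sees every event of the entities that fall into that slice.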
The EventEnvelope[ShoppingCart.Event] (Scala) or EventEnvelope<ShoppingCart.Event> (Java) is what the Projection handler will process. It contains the Event and additional metadata, such as the offset that will be stored by the Projection. See EventEnvelope for full details of what the envelope contains.
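To make the shape of an envelope concrete, here is a minimal sketch in plain Java. The Envelope class is a hypothetical stand-in, not the real akka.projection.eventsourced.EventEnvelope: it models only the event, a simplified numeric offset, and a persistence ID. The RecordingHandler class, the event names, and the persistence ID values are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class EnvelopeDemo {

  // Hypothetical stand-in for an event envelope; the real type is provided by Akka Projections.
  static final class Envelope<E> {
    final long offset;          // position in the event stream (simplified to a long here)
    final String persistenceId; // identifies the entity that emitted the event
    final E event;              // the domain event itself

    Envelope(long offset, String persistenceId, E event) {
      this.offset = offset;
      this.persistenceId = persistenceId;
      this.event = event;
    }
  }

  // Illustrative handler: records each event and remembers the last handled offset.
  static final class RecordingHandler {
    final List<String> processed = new ArrayList<>();
    long lastOffset = -1L;

    void process(Envelope<String> envelope) {
      processed.add(envelope.persistenceId + ": " + envelope.event);
      // Keeping the offset is what allows processing to resume after a restart.
      lastOffset = envelope.offset;
    }
  }

  public static void main(String[] args) {
    RecordingHandler handler = new RecordingHandler();
    List<Envelope<String>> envelopes = Arrays.asList(
        new Envelope<String>(1L, "ShoppingCart|cart-a", "ItemAdded"),
        new Envelope<String>(2L, "ShoppingCart|cart-a", "CheckedOut"));
    for (Envelope<String> envelope : envelopes) {
      handler.process(envelope);
    }
    System.out.println(handler.processed + " lastOffset=" + handler.lastOffset);
  }
}
```

In a real Projection the offset is stored by the Projection itself rather than by the handler; the sketch keeps it in a field only to show that the offset travels with each event.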