Choosing a Source Provider

A SourceProvider provides the data to our projection. In Projections, each element that’s processed is an Envelope, and each Envelope contains an Event. An Envelope must include an Offset, but it can also contain other information such as a creation timestamp, a topic name, an entity tag, etc. There are several supported Source Providers available (or you can build your own), but in this example we will use the Akka Persistence EventSourced Source Provider.
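
To make the Envelope, Event, and Offset relationship concrete, here is a minimal conceptual sketch in plain Java. It does not use the Akka Projection classes: `EnvelopeSketch`, `Envelope`, `ItemAdded`, and `process` are hypothetical names introduced only for illustration, and the offset is simplified to a `long` sequence number.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// Conceptual model only: these are NOT the Akka Projection classes.
public class EnvelopeSketch {

  // Hypothetical envelope: the offset is mandatory, the remaining fields are optional metadata.
  record Envelope<E>(long offset, E event, Optional<Instant> createdAt, Optional<String> tag) {}

  // Hypothetical event type standing in for a shopping cart event.
  record ItemAdded(String cartId, String itemId, int quantity) {}

  // Walks the envelopes in order and returns the offset of the last one seen,
  // or startOffset if the list is empty.
  static long process(List<Envelope<ItemAdded>> envelopes, long startOffset) {
    long last = startOffset;
    for (Envelope<ItemAdded> env : envelopes) {
      // A handler would act on env.event() here; the offset records how far we got.
      last = env.offset();
    }
    return last;
  }

  public static void main(String[] args) {
    List<Envelope<ItemAdded>> batch = List.of(
        new Envelope<>(1L, new ItemAdded("cart-1", "apple", 2),
            Optional.of(Instant.now()), Optional.of("carts")),
        new Envelope<>(2L, new ItemAdded("cart-1", "pear", 1),
            Optional.empty(), Optional.empty()));
    System.out.println("last processed offset = " + process(batch, 0L));
  }
}
```

The point of the sketch is that the event is the payload, while the offset is the position that lets processing keep track of how far it has progressed.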

Add the following dependencies to your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-eventsourced" % "1.5.3"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-eventsourced_${scala.binary.version}</artifactId>
    <version>1.5.3</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-eventsourced_${versions.ScalaBinary}:1.5.3"
}

Add the following imports to ShoppingCartApp:

Scala
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.Offset
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.SourceProvider
Java
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.SourceProvider;

Create the SourceProvider. The Event Sourced Source Provider uses Akka Persistence internally (specifically the eventsByTag API). To initialize the Source Provider, we need to specify which Akka Persistence plugin to use (Cassandra) and the name of the tag carried by the events we’re interested in from the journal.

Set up the SourceProvider in the Guardian Behavior defined in ShoppingCartApp:

Scala
val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]] =
  EventSourcedProvider
    .eventsByTag[ShoppingCartEvents.Event](
      system,
      readJournalPluginId = CassandraReadJournal.Identifier,
      tag = ShoppingCartTags.Single)
Java
SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
    EventSourcedProvider.eventsByTag(
        system, CassandraReadJournal.Identifier(), ShoppingCartTags.SINGLE);
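
The role of the tag parameter can be illustrated with a toy in-memory journal. This is a sketch under stated assumptions, not the Akka Persistence query API: `EventsByTagSketch`, `Stored`, the local `eventsByTag` helper, and the tag value `"shopping-cart"` are all hypothetical, and offsets are simplified to `long` sequence numbers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy in-memory illustration; NOT the Akka Persistence query API.
public class EventsByTagSketch {

  // Hypothetical journal entry: a sequence-number offset, the tags it was written with, and a payload.
  record Stored(long offset, Set<String> tags, String event) {}

  // Returns the entries carrying `tag` whose offset is strictly greater than
  // `afterOffset`, preserving journal order.
  static List<Stored> eventsByTag(List<Stored> journal, String tag, long afterOffset) {
    List<Stored> result = new ArrayList<>();
    for (Stored s : journal) {
      if (s.offset() > afterOffset && s.tags().contains(tag)) {
        result.add(s);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Stored> journal = List.of(
        new Stored(1L, Set.of("shopping-cart"), "ItemAdded(apple)"),
        new Stored(2L, Set.of("other"), "Unrelated"),
        new Stored(3L, Set.of("shopping-cart"), "CheckedOut"));
    // Ask only for "shopping-cart" events that come after offset 1.
    System.out.println(eventsByTag(journal, "shopping-cart", 1L));
  }
}
```

Selecting by tag means the projection only sees the events it cares about, and filtering by offset shows why every Envelope must carry one: it marks where a consumer can pick up again.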

Finally, we must configure Akka Persistence by adding a configuration file guide-shopping-cart-app.conf to the src/main/resources/ directory of the project:

datastax-java-driver {
  # basic.contact-points = ["127.0.0.1:9042"]
  # basic.load-balancing-policy.local-datacenter = "datacenter1"
  advanced {
    # reconnect to c* if down when app is started
    reconnect-on-init = true
  }
}

akka {
  loglevel = DEBUG

  persistence.journal {
    plugin = "akka.persistence.cassandra.journal"
    auto-start-journals = ["akka.persistence.cassandra.journal"]
  }
  persistence.snapshot-store.plugin = "akka.persistence.cassandra.snapshot"

  persistence {
    cassandra {
      journal {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }

      snapshot {
        # to create the schema
        keyspace-autocreate = true
        tables-autocreate = true
      }

      query {
        refresh-interval = 2s
      }

      events-by-tag {
        # for lower latency
        eventual-consistency-delay = 25ms
        flush-interval = 25ms
        pubsub-notification = on
      }
    }
  }
}