Offset in a relational DB with Slick
The SlickProjection
SlickProjection
has support for storing the offset in a relational database using Slick (JDBC). This is only an option for Scala and for Java the offset can be stored in relational DB with JDBC.
The source of the envelopes can be events from Akka Persistence or any other SourceProvider
with supported offset types.
The envelope handler returns a DBIO
that will be run by the projection. This means that the target database operations can be run in the same transaction as the storage of the offset, which means that exactly-once processing semantics is supported. It also offers at-least-once semantics.
Prefer using the JDBC implementation to implement your projection handler. Slick support in akka-projection
is meant for users migrating from Lagom's Slick ReadSideProcessor
.
Dependencies
To use the Slick module of Akka Projections add the following dependency in your project:
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-slick" % "1.0.0"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-projection-slick_${scala.binary.version}</artifactId> <version>1.0.0</version> </dependency>
- Gradle
versions += [ ScalaBinary: "2.13" ] dependencies { compile group: 'com.lightbend.akka', name: "akka-projection-slick_${versions.ScalaBinary}", version: '1.0.0' }
Akka Projections require Akka 2.6.9 or later, see Akka version.
Project Info: Akka Projection Slick | |
---|---|
Artifact | com.lightbend.akka
akka-projection-slick
1.0.0 |
JDK versions | AdoptOpenJDK 8 AdoptOpenJDK 11 |
Scala versions | 2.13.3, 2.12.11 |
JPMS module name | akka.projection.slick |
License | |
Readiness level |
Since 0.0, 2020-04-01
|
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
Transitive dependencies
The table below shows akka-projection-slick
’s direct dependencies and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.lightbend.akka akka-projection-core_2.13 1.0.0 com.lightbend.akka akka-projection-jdbc_2.13 1.0.0 com.typesafe.akka akka-persistence-query_2.13 2.6.9 com.typesafe.slick slick_2.13 3.3.3 org.scala-lang scala-library 2.13.3 - Dependency tree
com.lightbend.akka akka-projection-core_2.13 1.0.0 com.typesafe.akka akka-actor-typed_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-slf4j_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.slf4j slf4j-api 1.7.30 org.scala-lang scala-library 2.13.3 org.slf4j slf4j-api 1.7.30 com.typesafe.akka akka-persistence-query_2.13 2.6.9 com.typesafe.akka akka-persistence_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.lightbend.akka akka-projection-jdbc_2.13 1.0.0 com.lightbend.akka akka-projection-core_2.13 1.0.0 com.typesafe.akka akka-actor-typed_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-slf4j_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.slf4j slf4j-api 1.7.30 org.scala-lang scala-library 2.13.3 org.slf4j slf4j-api 1.7.30 com.typesafe.akka akka-persistence-query_2.13 2.6.9 com.typesafe.akka akka-persistence_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-persistence-query_2.13 2.6.9 com.typesafe.akka akka-persistence_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-persistence-query_2.13 2.6.9 com.typesafe.akka akka-persistence_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-stream_2.13 2.6.9 com.typesafe.akka akka-actor_2.13 2.6.9 com.typesafe config 1.4.0 org.scala-lang.modules scala-java8-compat_2.13 0.9.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.akka akka-protobuf-v3_2.13 2.6.9 com.typesafe ssl-config-core_2.13 0.4.2 com.typesafe config 1.4.0 org.scala-lang.modules scala-parser-combinators_2.13 1.1.2 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.reactivestreams reactive-streams 1.0.3 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 com.typesafe.slick slick_2.13 3.3.3 com.typesafe config 1.4.0 org.reactivestreams reactive-streams 1.0.3 org.scala-lang.modules scala-collection-compat_2.13 2.0.0 org.scala-lang scala-library 2.13.3 org.scala-lang scala-library 2.13.3 org.slf4j slf4j-api 1.7.30 org.scala-lang scala-library 2.13.3
exactly-once
The offset is stored in the same transaction as the DBIO
returned from the handler
, which means exactly-once processing semantics if the projection is restarted from previously stored offset.
- Scala
-
import akka.projection.ProjectionId import akka.projection.slick.SlickProjection import slick.basic.DatabaseConfig import slick.dbio.DBIO import slick.jdbc.H2Profile implicit val ec = system.executionContext val projection = SlickProjection.exactlyOnce( projectionId = ProjectionId("ShoppingCarts", "carts-1"), sourceProvider, dbConfig, handler = () => new ShoppingCartHandler(repository))
The ShoppingCartHandler
is shown below.
at-least-once
The offset is stored after the envelope has been processed and giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset some elements may be processed more than once. Therefore, the Handler code must be idempotent.
- Scala
-
implicit val ec = system.executionContext val projection = SlickProjection .atLeastOnce( projectionId = ProjectionId("ShoppingCarts", "carts-1"), sourceProvider, dbConfig, handler = () => new ShoppingCartHandler(repository)) .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
The offset is stored after a time window, or limited by a number of envelopes, whatever happens first. This window can be defined with withSaveOffset
of the returned AtLeastOnceProjection
. The default settings for the window is defined in configuration section akka.projection.at-least-once
. There is a performance benefit of not storing the offset too often, but the drawback is that there can be more duplicates when the projection that will be processed again when the projection is restarted.
The ShoppingCartHandler
is shown below.
groupedWithin
The envelopes can be grouped before processing, which can be useful for batch updates.
- Scala
-
implicit val ec = system.executionContext val projection = SlickProjection .groupedWithin( projectionId = ProjectionId("ShoppingCarts", "carts-1"), sourceProvider, dbConfig, handler = () => new GroupedShoppingCartHandler(repository)) .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
The envelopes are grouped within a time window, or limited by a number of envelopes, whatever happens first. This window can be defined with withGroup
of the returned GroupedProjection
. The default settings for the window is defined in configuration section akka.projection.grouped
.
When using groupedWithin
the handler is a SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]]
. The GroupedShoppingCartHandler
is shown below.
The offset is stored in the same transaction as the DBIO
returned from the handler
, which means exactly-once processing semantics if the projection is restarted from previously stored offset.
Handler
It’s in the SlickHandler
SlickHandler
that you implement the processing of each envelope. It’s essentially a function from Envelope
to DBIO[Done]
. The returned DBIO
is run by the projection.
A handler that is consuming ShoppingCart.Event
from eventsByTag
can look like this:
- Scala
-
import scala.concurrent.Future import akka.Done import akka.projection.slick.SlickHandler import org.slf4j.LoggerFactory class ShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext) extends SlickHandler[EventEnvelope[ShoppingCart.Event]] { private val logger = LoggerFactory.getLogger(getClass) override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = { envelope.event match { case ShoppingCart.CheckedOut(cartId, time) => logger.info(s"Shopping cart $cartId was checked out at $time") repository.save(Order(cartId, time)) case otherEvent => logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent") DBIO.successful(Done) } } }
Such simple handlers can also be defined as plain functions via the helper SlickHandler.apply
factory method.
where the OrderRepository
is:
- Scala
-
case class Order(id: String, time: Instant) class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) { import dbConfig.profile.api._ private class OrdersTable(tag: Tag) extends Table[Order](tag, "ORDERS") { def id = column[String]("CART_ID", O.PrimaryKey) def time = column[Instant]("TIME") def * = (id, time).mapTo[Order] } private val ordersTable = TableQuery[OrdersTable] def save(order: Order)(implicit ec: ExecutionContext) = { ordersTable.insertOrUpdate(order).map(_ => Done) } def createTable(): Future[Unit] = dbConfig.db.run(ordersTable.schema.createIfNotExists) }
with the Slick DatabaseConfig
:
- Scala
-
val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig("akka.projection.slick", system.settings.config) val repository = new OrderRepository(dbConfig)
Grouped handler
When using SlickProjection.groupedWithin
the handler is processing a Seq
of envelopes.
- Scala
-
import scala.collection.immutable class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext) extends SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] { private val logger = LoggerFactory.getLogger(getClass) override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): DBIO[Done] = { val dbios = envelopes.map(_.event).map { case ShoppingCart.CheckedOut(cartId, time) => logger.info(s"Shopping cart $cartId was checked out at $time") repository.save(Order(cartId, time)) case otherEvent => logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent") DBIO.successful(Done) } DBIO.sequence(dbios).map(_ => Done) } }
Stateful handler
The SlickHandler
can be stateful, with variables and mutable data structures. It is invoked by the Projection
machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state as long as it’s not accessed by other threads than the one that called process
.
It is important that the Handler
instance is not shared between several Projection
instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection
instance should use a new Handler
instance.
Async handler
The Handler
Handler
can be used with SlickProjection.atLeastOnceAsync
and SlickProjection.groupedWithinAsync
if the handler is not storing the projection result in the database. The handler could send to a Kafka topic or integrate with something else.
There are several examples of such Handler
in the documentation for Cassandra Projections. Same type of handlers can be used with SlickProjection
instead of CassandraProjection
.
Actor handler
A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.
Flow handler
An Akka Streams FlowWithContext
can be used instead of a handler for processing the envelopes, which is described in Processing with Akka Streams.
Handler lifecycle
You can override the start
and stop
methods of the SlickHandler
SlickHandler
to implement initialization before first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection
is restarted after failure.
See also error handling.
Schema
The database schema for the offset storage table:
- PostgreSQL
-
create table if not exists "AKKA_PROJECTION_OFFSET_STORE" ( "PROJECTION_NAME" VARCHAR(255) NOT NULL, "PROJECTION_KEY" VARCHAR(255) NOT NULL, "CURRENT_OFFSET" VARCHAR(255) NOT NULL, "MANIFEST" VARCHAR(4) NOT NULL, "MERGEABLE" BOOLEAN NOT NULL, "LAST_UPDATED" BIGINT NOT NULL ); create index "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME"); alter table "AKKA_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY");
- MySQL
-
create table if not exists AKKA_PROJECTION_OFFSET_STORE ( PROJECTION_NAME VARCHAR(255) NOT NULL, PROJECTION_KEY VARCHAR(255) NOT NULL, CURRENT_OFFSET VARCHAR(255) NOT NULL, MANIFEST VARCHAR(4) NOT NULL, MERGEABLE BOOLEAN NOT NULL, LAST_UPDATED BIGINT NOT NULL ); create index PROJECTION_NAME_INDEX on AKKA_PROJECTION_OFFSET_STORE (PROJECTION_NAME); alter table AKKA_PROJECTION_OFFSET_STORE add constraint PK_PROJECTION_ID primary key(PROJECTION_NAME,PROJECTION_KEY);
- Microsoft SQL Server
-
IF NOT EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'"AKKA_PROJECTION_OFFSET_STORE"') AND type in (N'U')) begin create table "AKKA_PROJECTION_OFFSET_STORE" ( "PROJECTION_NAME" VARCHAR(255) NOT NULL, "PROJECTION_KEY" VARCHAR(255) NOT NULL, "CURRENT_OFFSET" VARCHAR(255) NOT NULL, "MANIFEST" VARCHAR(4) NOT NULL, "MERGEABLE" BIT NOT NULL, "LAST_UPDATED" BIGINT NOT NULL ) alter table "AKKA_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") create index "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") end
- Oracle
-
BEGIN execute immediate 'create table "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) '; execute immediate 'alter table "AKKA_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") '; execute immediate 'create index "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") '; EXCEPTION WHEN OTHERS THEN IF SQLCODE = -955 THEN NULL; -- suppresses ORA-00955 exception ELSE RAISE; END IF; END;
- H2
-
create table if not exists "AKKA_PROJECTION_OFFSET_STORE" ( "PROJECTION_NAME" VARCHAR(255) NOT NULL, "PROJECTION_KEY" VARCHAR(255) NOT NULL, "CURRENT_OFFSET" VARCHAR(255) NOT NULL, "MANIFEST" VARCHAR(4) NOT NULL, "MERGEABLE" BOOLEAN NOT NULL, "LAST_UPDATED" BIGINT NOT NULL ); create index "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME"); alter table "AKKA_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY");
The schema can be created using the method SlickProjection.createOffsetTableIfNotExists
. This is particularly useful when writting tests. For production enviornments, we recommend creating the schema before deploying the application.
Offset types
The supported offset types of the SlickProjection
are:
Offset
Offset
types from events from Akka PersistenceMergeableOffset
MergeableOffset
that is used for messages from KafkaString
Int
Long
- Any other type that has a configured Akka Serializer is stored with base64 encoding of the serialized bytes. For example the Akka Persistence Spanner offset is supported in this way.
Configuration
Make your edits/overrides in your application.conf.
The reference configuration file with the default values:
akka.projection.slick = {
# The Slick profile to use
# set to one of: slick.jdbc.DerbyProfile$, slick.jdbc.H2Profile$, slick.jdbc.HsqldbProfile$, slick.jdbc.MySQLProfile$,
# slick.jdbc.PostgresProfile$, slick.jdbc.SQLiteProfile$, slick.jdbc.OracleProfile$
#profile = <fill this with your profile of choice>
# add here your Slick db settings
db {
# url = "jdbc:h2:mem:test1"
# driver = org.h2.Driver
# connectionPool = disabled
# keepAliveConnection = true
}
offset-store {
# set this to your database schema if applicable, empty by default
schema = ""
# the database table name for the offset store
table = "AKKA_PROJECTION_OFFSET_STORE"
}
}