Offset in a relational DB with JDBC

The JdbcProjection has support for storing the offset in a relational database using JDBC.

The source of the envelopes can be events from Akka Persistence or any other SourceProvider with supported offset types.

A JdbcHandler receives a JdbcSession instance and an envelope. The JdbcSession provides the means to access an open JDBC connection that can be used to process the envelope. The target database operations can be run in the same transaction as the storage of the offset, which means that exactly-once processing semantics are supported. At-least-once semantics are also offered.

Dependencies

To use the JDBC module of Akka Projections, add the following dependency to your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-jdbc" % "1.2.4"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-jdbc_${scala.binary.version}</artifactId>
    <version>1.2.4</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-jdbc_${versions.ScalaBinary}:1.2.4"
}

Akka Projections requires Akka 2.6.18 or later, see Akka version.

Project Info: Akka Projections JDBC
Artifact: com.lightbend.akka, akka-projection-jdbc, 1.2.4
JDK versions: AdoptOpenJDK 8, AdoptOpenJDK 11
Scala versions: 2.13.3, 2.12.15
JPMS module name: akka.projection.jdbc
Readiness level: since 1.0.0, 2020-09-10
Home page: https://akka.io
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/akka-projection

Transitive dependencies

The table below shows akka-projection-jdbc’s direct dependencies, and the dependency tree that follows it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.lightbend.akka    akka-projection-core_2.13    1.2.4
com.typesafe.akka    akka-persistence-query_2.13    2.6.18
org.scala-lang    scala-library    2.13.3
Dependency tree
com.lightbend.akka    akka-projection-core_2.13    1.2.4
    com.typesafe.akka    akka-actor-typed_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.slf4j    slf4j-api    1.7.32
        org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.slf4j    slf4j-api    1.7.32
    com.typesafe.akka    akka-persistence-query_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-persistence_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
                com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
                com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.3    CC0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
com.typesafe.akka    akka-persistence-query_2.13    2.6.18    Apache-2.0
    com.typesafe.akka    akka-persistence_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
org.scala-lang    scala-library    2.13.3    Apache-2.0

Required configuration settings

There are two settings that need to be set beforehand in your application.conf file.

  • akka.projection.jdbc.dialect - The dialect type indicating your database of choice. Supported dialects are: mysql-dialect, postgres-dialect, mssql-dialect, oracle-dialect or h2-dialect (testing).
  • akka.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size - The size of the thread pool of the blocking JDBC dispatcher. See also Blocking JDBC Dispatcher.
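
As an illustration, the two required settings could be set as in the fragment below. The dialect and the pool size of 10 are assumed example values, not recommendations; pick the dialect of your database and the size of your own connection pool.

```
akka.projection.jdbc {
  # assumed example: a PostgreSQL database
  dialect = "postgres-dialect"
  blocking-jdbc-dispatcher.thread-pool-executor {
    # assumed example value; match it to the size of your JDBC connection pool
    fixed-pool-size = 10
  }
}
```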

Defining a JdbcSession

Before using Akka Projections JDBC you must implement the JdbcSession trait (Scala) or interface (Java). JdbcSession is used to open a connection and start a transaction. A new JdbcSession will be created for each call to the handler. At the end of the processing, the transaction will be committed (or rolled back).

When using JdbcProjection.exactlyOnce, the JdbcSession that is passed to the handler will be used to save the offset behind the scenes. Therefore, it’s extremely important to disable auto-commit (e.g. setAutoCommit(false)), otherwise the two operations won’t participate in the same transaction.

Scala
import java.sql.Connection
import java.sql.DriverManager
import akka.japi.function
import akka.projection.jdbc.JdbcSession

class PlainJdbcSession extends JdbcSession {

  lazy val conn = {
    Class.forName("org.h2.Driver")
    val c = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1")
    c.setAutoCommit(false)
    c
  }
  override def withConnection[Result](func: function.Function[Connection, Result]): Result =
    func(conn)
  override def commit(): Unit = conn.commit()
  override def rollback(): Unit = conn.rollback()
  override def close(): Unit = conn.close()
}
Java
import akka.japi.function.Function;
import akka.projection.jdbc.JdbcSession;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Connection;

class PlainJdbcSession implements JdbcSession {

  private final Connection connection;

  public PlainJdbcSession() {
    try {
      Class.forName("org.h2.Driver");
      this.connection = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
      connection.setAutoCommit(false);
    } catch (ClassNotFoundException | SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
    return func.apply(connection);
  }

  @Override
  public void commit() throws SQLException {
    connection.commit();
  }

  @Override
  public void rollback() throws SQLException {
    connection.rollback();
  }

  @Override
  public void close() throws SQLException {
    connection.close();
  }
}
Note

It is highly recommended to configure it with a connection pool, for example HikariCP.

When declaring a JdbcProjection you must provide a factory for the JdbcSession. The factory will be used to create new instances whenever needed.
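
The lifecycle described above (a new session per handler call, commit on success, rollback on failure) can be modelled in plain Java. The following is only a sketch of that idea, not the Akka Projections implementation: the `Session` interface, `runInTransaction` and `RecordingSession` are hypothetical names introduced for illustration, and the exact order in which the library closes the session is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical model of the session lifecycle; not the Akka Projections API. */
final class SessionLifecycleSketch {

  /** Minimal stand-in for a session: only the transaction-related methods. */
  interface Session {
    void commit();
    void rollback();
    void close();
  }

  /** Creates a fresh session per call, commits on success, rolls back on failure, always closes. */
  static <S extends Session> void runInTransaction(Supplier<S> factory, Consumer<S> handler) {
    S session = factory.get();
    try {
      handler.accept(session);
      session.commit();
    } catch (RuntimeException e) {
      session.rollback();
      throw e;
    } finally {
      session.close();
    }
  }

  /** Session that records which lifecycle methods were called, in order. */
  static final class RecordingSession implements Session {
    final List<String> calls = new ArrayList<>();
    @Override public void commit() { calls.add("commit"); }
    @Override public void rollback() { calls.add("rollback"); }
    @Override public void close() { calls.add("close"); }
  }
}
```

Because the factory is invoked for every call, a failed handler never leaves a half-finished transaction behind for the next envelope.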

An alternative Hibernate-based implementation would look like this:

Java
import akka.japi.function.Function;
import akka.projection.jdbc.JdbcSession;
import org.hibernate.Session;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import java.sql.Connection;
import java.sql.SQLException;

public class HibernateJdbcSession implements JdbcSession {

  public final EntityManager entityManager;
  private final EntityTransaction transaction;

  public HibernateJdbcSession(EntityManager entityManager) {
    this.entityManager = entityManager;
    this.transaction = this.entityManager.getTransaction();
    this.transaction.begin();
  }

  @Override
  public <Result> Result withConnection(Function<Connection, Result> func) {
    Session hibernateSession = entityManager.unwrap(Session.class);
    return hibernateSession.doReturningWork(
        connection -> {
          try {
            return func.apply(connection);
          } catch (SQLException e) {
            throw e;
          } catch (Exception e) {
            throw new SQLException(e);
          }
        });
  }

  @Override
  public void commit() {
    transaction.commit();
  }

  @Override
  public void rollback() {
    // propagates rollback call if transaction is active
    if (transaction.isActive()) transaction.rollback();
  }

  @Override
  public void close() {
    this.entityManager.close();
  }
}

And a special factory that initializes the EntityManagerFactory and builds the JdbcSession instance:

Java
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public class HibernateSessionFactory {
  private final EntityManagerFactory entityManagerFactory;

  public HibernateSessionFactory() {
    this.entityManagerFactory = Persistence.createEntityManagerFactory("akka-projection-hibernate");
  }

  public HibernateJdbcSession newInstance() {
    return new HibernateJdbcSession(entityManagerFactory.createEntityManager());
  }
}

Blocking JDBC Dispatcher

JDBC APIs are blocking by design, therefore Akka Projections JDBC will use a dedicated dispatcher to run all JDBC calls. It’s important to configure the dispatcher to have the same size as the connection pool.

Each time the projection handler is called one thread and one database connection will be used. If your connection pool is smaller than the number of threads, the thread can potentially block while waiting for the connection pool to provide a connection.

The dispatcher pool size can be configured through the akka.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size setting. See the Configuration section below.

Note

Most applications will use database connections to read data, for instance to read a projected model upon user request. This means that other parts of the application will be competing for a connection. It is recommended to configure a connection pool dedicated to the projections and use a different one in other parts of the application.

exactly-once

The offset is stored in the same transaction as the user-defined handler, which gives exactly-once processing semantics if the projection is restarted from a previously stored offset.

Scala
import akka.projection.ProjectionId
import akka.projection.jdbc.scaladsl.JdbcProjection

val projection =
  JdbcProjection
    .exactlyOnce(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      () => new PlainJdbcSession, // JdbcSession Factory
      handler = () => new ShoppingCartHandler(orderRepository))
Java
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    JdbcProjection.exactlyOnce(
        ProjectionId.of("shopping-carts", "carts-1"),
        sourceProvider,
        sessionProvider::newInstance,
        ShoppingCartHandler::new,
        system);

The ShoppingCartHandler is shown below.
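
To see why sharing one transaction gives exactly-once semantics, consider the following plain-Java sketch. It is a simplified in-memory model under stated assumptions, not the library's implementation: `Db`, `inTransaction` and `run` are hypothetical names, and the offset is modelled as the number of envelopes already processed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of storing rows and offset in one transaction. */
final class ExactlyOnceSketch {

  /** Stand-in for a database holding both the projected rows and the offset. */
  static final class Db {
    final List<String> rows = new ArrayList<>();
    int offset = 0; // number of envelopes already processed

    /** Applies the row and the new offset together, or neither if the handler fails. */
    void inTransaction(String row, int newOffset, boolean handlerFails) {
      List<String> stagedRows = new ArrayList<>(rows);
      stagedRows.add(row);
      if (handlerFails) {
        return; // rollback: the staged changes are discarded
      }
      rows.clear();
      rows.addAll(stagedRows);
      offset = newOffset;
    }
  }

  /** Processes envelopes from the stored offset; failAt simulates a failure at that index (-1 for none). */
  static void run(Db db, List<String> envelopes, int failAt) {
    for (int i = db.offset; i < envelopes.size(); i++) {
      boolean fails = (i == failAt);
      db.inTransaction(envelopes.get(i), i + 1, fails);
      if (fails) {
        return; // the projection stops and later resumes from db.offset
      }
    }
  }
}
```

In this model a failure at the second envelope leaves one row and an offset of 1, so a restart continues with the second envelope and no row is written twice.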

at-least-once

The offset is stored after the envelope has been processed, giving at-least-once processing semantics. This means that if the projection is restarted from a previously stored offset some elements may be processed more than once. Therefore, the Handler code must be idempotent.

Scala
import scala.concurrent.duration._
import akka.projection.ProjectionId
import akka.projection.jdbc.scaladsl.JdbcProjection

val projection =
  JdbcProjection
    .atLeastOnce(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      () => new PlainJdbcSession, // JdbcSession Factory
      handler = () => new ShoppingCartHandler(orderRepository))
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
Java
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    JdbcProjection.atLeastOnce(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            sessionProvider::newInstance,
            ShoppingCartHandler::new,
            system)
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);

The offset is stored after a time window, or after a number of envelopes, whichever happens first. This window can be defined with withSaveOffset of the returned AtLeastOnceProjection. The default settings for the window are defined in the configuration section akka.projection.at-least-once. Storing the offset less often improves performance, but the drawback is that more envelopes may be processed again as duplicates when the projection is restarted.

The ShoppingCartHandler is shown below.
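
The effect of the save-offset window on duplicates, and why an idempotent handler absorbs them, can be sketched in plain Java. This is a simplified model with hypothetical names (`OrderTable`, `storedOffset`, `process`); it only considers the envelope-count part of the window and ignores the time-based part.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of at-least-once replay with an idempotent write. */
final class AtLeastOnceReplaySketch {

  /** In-memory stand-in for an orders table keyed by cart id. */
  static final class OrderTable {
    private final Map<String, String> rows = new LinkedHashMap<>();
    private int writes = 0;

    /** Upsert: saving the same cart id twice overwrites the row instead of adding one. */
    void save(String cartId, String checkedOutAt) {
      rows.put(cartId, checkedOutAt);
      writes++;
    }

    int rowCount() { return rows.size(); }

    int writeCount() { return writes; }
  }

  /** Offset that would be stored if it is saved after every saveEvery envelopes. */
  static int storedOffset(int processed, int saveEvery) {
    return (processed / saveEvery) * saveEvery;
  }

  /** Handles the envelopes in [from, until), as a projection would between a start and a stop. */
  static void process(OrderTable table, List<String> cartIds, int from, int until) {
    for (int i = from; i < until; i++) {
      table.save(cartIds.get(i), "t" + i);
    }
  }
}
```

For example, with ten envelopes, an offset saved every 4 envelopes, and a stop after 6 envelopes, the projection resumes from offset 4 and handles two envelopes a second time. The table still ends up with one row per cart because the write is an upsert. In SQL the same effect is typically achieved with an upsert statement or a primary-key check, depending on the database.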

groupedWithin

The envelopes can be grouped before processing, which can be useful for batch updates.

Scala
import scala.concurrent.duration._
import akka.projection.ProjectionId
import akka.projection.jdbc.scaladsl.JdbcProjection

val projection =
  JdbcProjection
    .groupedWithin(
      projectionId = ProjectionId("ShoppingCarts", "carts-1"),
      sourceProvider,
      () => new PlainJdbcSession, // JdbcSession Factory
      handler = () => new GroupedShoppingCartHandler(orderRepository))
    .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
Java
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
int groupAfterEnvelopes = 100;
Duration groupAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection =
    JdbcProjection.groupedWithin(
            ProjectionId.of("shopping-carts", "carts-1"),
            sourceProvider,
            sessionProvider::newInstance,
            GroupedShoppingCartHandler::new,
            system)
        .withGroup(groupAfterEnvelopes, groupAfterDuration);

The envelopes are grouped within a time window, or limited by a number of envelopes, whichever happens first. This window can be defined with withGroup of the returned GroupedProjection. The default settings for the window are defined in the configuration section akka.projection.grouped.

When using groupedWithin the handler is a JdbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] (Scala) or JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>> (Java). The GroupedShoppingCartHandler is shown below.

The offset is stored in the same transaction as the user-defined handler, which gives exactly-once processing semantics if the projection is restarted from a previously stored offset.
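
The "size or time, whichever happens first" rule can be illustrated with a small plain-Java model that works on arrival timestamps. This is a simplified sketch, not how the library schedules its windows: `GroupedWithinSketch.group` is a hypothetical helper, and it assumes a window starts with the first envelope of each group.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of grouping by envelope count or elapsed time. */
final class GroupedWithinSketch {

  /** Splits arrival timestamps (millis, ascending) into groups closed by size or by elapsed time. */
  static List<List<Long>> group(List<Long> arrivals, int maxSize, long maxMillis) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long windowStart = 0L;
    for (long t : arrivals) {
      // Close the open group if its time window elapsed before this arrival.
      if (!current.isEmpty() && t - windowStart >= maxMillis) {
        groups.add(current);
        current = new ArrayList<>();
      }
      if (current.isEmpty()) {
        windowStart = t;
      }
      current.add(t);
      // Close the group as soon as it is full.
      if (current.size() == maxSize) {
        groups.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}
```

With a maximum of 3 envelopes and a 500 ms window, arrivals at 0, 10, 20, 30, 600 and 610 ms form three groups in this model: the first closes because it is full, the second because its window elapsed, and the third when the input ends.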

Handler

It’s in the JdbcHandler that you implement the processing of each envelope. It’s essentially a consumer function from (JdbcSession, Envelope) to Unit (Scala) or void (Java).

A handler that is consuming ShoppingCart.Event from eventsByTag can look like this:

Scala
import akka.projection.eventsourced.EventEnvelope
import akka.projection.jdbc.scaladsl.JdbcHandler
import org.slf4j.LoggerFactory

class ShoppingCartHandler(repository: OrderRepository)
    extends JdbcHandler[EventEnvelope[ShoppingCart.Event], PlainJdbcSession] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(session: PlainJdbcSession, envelope: EventEnvelope[ShoppingCart.Event]): Unit = {
    envelope.event match {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        session.withConnection { conn =>
          repository.save(conn, Order(cartId, time))
        }

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
    }
  }
}
Java
public class ShoppingCartHandler
    extends JdbcHandler<EventEnvelope<ShoppingCart.Event>, HibernateJdbcSession> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public void process(HibernateJdbcSession session, EventEnvelope<ShoppingCart.Event> envelope)
      throws Exception {
    ShoppingCart.Event event = envelope.event();
    if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      logger.info(
          "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

      // pass the EntityManager created by the projection
      // to the repository in order to use the same transaction
      // (orderRepository is assumed to be an OrderRepository from the enclosing scope)
      orderRepository.save(
          session.entityManager, new Order(checkedOut.cartId, checkedOut.eventTime));
    } else {
      logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
    }
  }
}
Hint

Such simple handlers can also be defined as plain functions via the helper JdbcHandler.apply (Scala) or JdbcHandler.fromFunction (Java) factory method.

where the OrderRepository is an implementation of:

Scala
import java.sql.Connection
import java.time.Instant

case class Order(id: String, time: Instant)
trait OrderRepository {
  def save(connection: Connection, order: Order): Unit
}
Java
class Order {
  public final String id;
  public final Instant time;

  public Order(String id, Instant time) {
    this.id = id;
    this.time = time;
  }
}

interface OrderRepository {
  void save(EntityManager entityManager, Order order);
}

Grouped handler

When using JdbcProjection.groupedWithin the handler is processing a Seq (Scala) or List (Java) of envelopes.

Scala
import akka.projection.jdbc.scaladsl.JdbcHandler

import scala.collection.immutable

class GroupedShoppingCartHandler(repository: OrderRepository)
    extends JdbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]], PlainJdbcSession] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(
      session: PlainJdbcSession,
      envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Unit = {

    // save all events in DB
    envelopes.map(_.event).foreach {
      case ShoppingCart.CheckedOut(cartId, time) =>
        logger.info(s"Shopping cart $cartId was checked out at $time")
        session.withConnection { conn =>
          repository.save(conn, Order(cartId, time))
        }

      case otherEvent =>
        logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
    }
  }
}
Java
public class GroupedShoppingCartHandler
    extends JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>, HibernateJdbcSession> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public void process(
      HibernateJdbcSession session, List<EventEnvelope<ShoppingCart.Event>> envelopes)
      throws Exception {
    for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
      ShoppingCart.Event event = envelope.event();
      if (event instanceof ShoppingCart.CheckedOut) {
        ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
        logger.info(
            "Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);

        // pass the EntityManager created by the projection
        // to the repository in order to use the same transaction
        // (orderRepository is assumed to be an OrderRepository from the enclosing scope)
        orderRepository.save(
            session.entityManager, new Order(checkedOut.cartId, checkedOut.eventTime));

      } else {
        logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
      }
    }
  }
}

Stateful handler

The JdbcHandler can be stateful, with variables and mutable data structures. It is invoked by the Projection machinery one envelope at a time and visibility guarantees between the invocations are handled automatically, i.e. no volatile or other concurrency primitives are needed for managing the state as long as it’s not accessed by other threads than the one that called process.

Note

It is important that the Handler instance is not shared between several Projection instances, because then it would be invoked concurrently, which is not how it is intended to be used. Each Projection instance should use a new Handler instance.
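
A minimal plain-Java sketch of this rule follows. `CountingHandler` and `HANDLER_FACTORY` are hypothetical names, not part of Akka Projections; the point is only that the handler keeps ordinary mutable state and that a factory hands out a separate instance to each projection.

```java
import java.util.function.Supplier;

/** Hypothetical model of a stateful handler created through a factory. */
final class StatefulHandlerSketch {

  /** Handler keeping plain mutable state; no volatile or locks are used. */
  static final class CountingHandler {
    private int processed = 0;

    void process(String envelope) {
      processed++;
    }

    int processed() {
      return processed;
    }
  }

  /** Factory returning a fresh handler on every call, so that state is never shared. */
  static final Supplier<CountingHandler> HANDLER_FACTORY = CountingHandler::new;
}
```

Passing a factory such as `ShoppingCartHandler::new` to the projection, as in the examples above, follows the same pattern.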

Async handler

The Handler can be used with JdbcProjection.atLeastOnceAsync and JdbcProjection.groupedWithinAsync if the handler is not storing the projection result in the database. The handler could send to a Kafka topic or integrate with something else.

There are several examples of such a Handler in the documentation for Cassandra Projections. The same type of handlers can be used with JdbcProjection instead of CassandraProjection.

Actor handler

A good alternative for advanced state management is to implement the handler as an actor, which is described in Processing with Actor.

Flow handler

An Akka Streams FlowWithContext can be used instead of a handler for processing the envelopes, which is described in Processing with Akka Streams.

Handler lifecycle

You can override the start and stop methods of the JdbcHandler to implement initialization before the first envelope is processed and resource cleanup when the projection is stopped. Those methods are also called when the Projection is restarted after failure.

See also error handling.

Schema

The database schema for the offset storage table:

PostgreSQL
CREATE TABLE IF NOT EXISTS akka_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX IF NOT EXISTS projection_name_index ON akka_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS akka_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
MySQL
CREATE TABLE IF NOT EXISTS akka_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

CREATE INDEX projection_name_index ON akka_projection_offset_store (projection_name);

CREATE TABLE IF NOT EXISTS akka_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
Microsoft SQL Server
IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'akka_projection_offset_store') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE akka_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(4) NOT NULL,
  mergeable BIT NOT NULL,
  last_updated BIGINT NOT NULL
)

ALTER TABLE akka_projection_offset_store ADD CONSTRAINT pk_projection_id PRIMARY KEY(projection_name, projection_key)

CREATE INDEX projection_name_index ON akka_projection_offset_store (projection_name)
END

IF NOT EXISTS (SELECT 1 FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'akka_projection_management') AND TYPE IN (N'U'))
BEGIN
CREATE TABLE akka_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BIT NOT NULL,
  last_updated BIGINT NOT NULL
)

ALTER TABLE akka_projection_management ADD CONSTRAINT pk_projection_management_id PRIMARY KEY(projection_name, projection_key)
END
Oracle
BEGIN
execute immediate 'create table "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"CURRENT_OFFSET" VARCHAR2(255) NOT NULL,"MANIFEST" VARCHAR2(4) NOT NULL,"MERGEABLE" CHAR(1) NOT NULL check ("MERGEABLE" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "AKKA_PROJECTION_OFFSET_STORE" add constraint "PK_PROJECTION_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
execute immediate 'create index "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME") ';
EXCEPTION
    WHEN OTHERS THEN
      IF SQLCODE = -955 THEN
        NULL; -- suppresses ORA-00955 exception
      ELSE
         RAISE;
      END IF;
END;

BEGIN
execute immediate 'create table "AKKA_PROJECTION_MANAGEMENT" ("PROJECTION_NAME" VARCHAR2(255) NOT NULL,"PROJECTION_KEY" VARCHAR2(255) NOT NULL,"PAUSED" CHAR(1) NOT NULL check ("PAUSED" in (0, 1)),"LAST_UPDATED" NUMBER(19) NOT NULL) ';
execute immediate 'alter table "AKKA_PROJECTION_MANAGEMENT" add constraint "PK_PROJECTION_MANAGEMENT_ID" primary key("PROJECTION_NAME","PROJECTION_KEY") ';
EXCEPTION
    WHEN OTHERS THEN
      IF SQLCODE = -955 THEN
        NULL; -- suppresses ORA-00955 exception
      ELSE
         RAISE;
      END IF;
END;
H2
CREATE TABLE IF NOT EXISTS "akka_projection_offset_store" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "current_offset" VARCHAR(255) NOT NULL,
  "manifest" VARCHAR(4) NOT NULL,
  "mergeable" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);

CREATE INDEX IF NOT EXISTS "projection_name_index" ON "akka_projection_offset_store" ("projection_name");

CREATE TABLE IF NOT EXISTS "akka_projection_management" (
  "projection_name" VARCHAR(255) NOT NULL,
  "projection_key" VARCHAR(255) NOT NULL,
  "paused" BOOLEAN NOT NULL,
  "last_updated" BIGINT NOT NULL,
  PRIMARY KEY("projection_name", "projection_key")
);

The schema can be created and dropped using the methods JdbcProjection.createTablesIfNotExists and JdbcProjection.dropTablesIfExists. This is particularly useful when writing tests. For production environments, we recommend creating the schema before deploying the application.

Important

As of version 1.1.0, the schema for PostgreSQL and H2 databases has changed. It now defaults to lowercase table and column names. If you have a schema in production, we recommend applying an ALTER table script to change it accordingly.

Alternatively, you can fall back to the uppercase format. You will also need to set akka.projection.jdbc.offset-store.table to an uppercase value, as this setting now defaults to lowercase.

akka.projection.jdbc.offset-store {
  table = "AKKA_PROJECTION_OFFSET_STORE"
  use-lowercase-schema = false
}

Offset types

The supported offset types of the JdbcProjection are:

Configuration

Make your edits/overrides in your application.conf.

The reference configuration file with the default values:

akka.projection.jdbc {
  # choose one of: mysql-dialect, postgres-dialect, mssql-dialect, oracle-dialect or h2-dialect (testing)
  dialect = ""
  use-dispatcher = "akka.projection.jdbc.blocking-jdbc-dispatcher"
  blocking-jdbc-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      # Use same number of threads as connections in the JDBC connection pool.
      fixed-pool-size = ""
    }
    throughput = 1
  }

  offset-store {
    # set this to your database schema if applicable, empty by default
    schema = ""
    # the database table name for the offset store
    table = "akka_projection_offset_store"

    # the database table name for the projection management data
    management-table = "akka_projection_management"

    # Use lowercase table and column names.
    # This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
    # Oracle schema is case sensitive and is defined with uppercase, this property is therefore ignored when using Oracle
    use-lowercase-schema = true
  }

  debug.verbose-offset-store-logging = false
}
Note

The settings akka.projection.jdbc.dialect and akka.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size do not have valid default values. You must configure them in your application.conf file.

See the Required configuration settings and Blocking JDBC Dispatcher sections for details.
