Messages from and to Kafka

A typical source for Projections is messages from Kafka. Akka Projections supports integration with Kafka using Alpakka Kafka.

The KafkaSourceProviderKafkaSourceProvider uses consumer group assignments from Kafka and can resume from offsets stored in a database.

Akka Projections can store the offsets from Kafka in a relational DB with JDBC or in relational DB with Slick.

The JdbcProjection or SlickProjection envelope handler will be run by the projection. This means that the target database operations can be run in the same transaction as the storage of the offset, which means when used with exactly-once the offsets will be persisted on the same transaction as the projected model (see Committing offset outside Kafka). It also offers at-least-once semantics.

Note

Offset storage of Kafka offsets are not implemented for Cassandra yet, see issue #97.

A Projection can also send messages to Kafka.

Dependencies

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

To use the Kafka module of Akka Projections add the following dependency in your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-kafka" % "1.6.5"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-kafka_${scala.binary.version}</artifactId>
    <version>1.6.5</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-kafka_${versions.ScalaBinary}:1.6.5"
}

Akka Projections require Akka 2.10.0 or later, see Akka version.

Project Info: Akka Projections Kafka
Artifact
com.lightbend.akka
akka-projection-kafka
1.6.5
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Eclipse Temurin JDK 21
Scala versions2.13.15, 3.3.4
JPMS module nameakka.projection.kafka
License
Readiness level
Supported, support is available from Lightbend
Since 1.0.0, 2020-09-10
Home pagehttps://akka.io
API documentation
Forums
Release notesGitHub releases
IssuesGitHub issues
Sourceshttps://github.com/akka/akka-projection

Transitive dependencies

The table below shows akka-projection-kafka’s direct dependencies and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.fasterxml.jackson.corejackson-databind2.17.2
com.lightbend.akkaakka-projection-core_2.131.6.5
com.typesafe.akkaakka-stream-kafka_2.137.0.0
org.scala-langscala-library2.13.15
Dependency tree
com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
com.lightbend.akka    akka-projection-core_2.13    1.6.5
    com.typesafe.akka    akka-actor-typed_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.15    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.10.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.15    Apache-2.0
            org.scala-lang    scala-library    2.13.15    Apache-2.0
            org.slf4j    slf4j-api    2.0.16    MIT License
        org.scala-lang    scala-library    2.13.15    Apache-2.0
        org.slf4j    slf4j-api    2.0.16    MIT License
    com.typesafe.akka    akka-persistence-query_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-persistence_2.13    2.10.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.15    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                    com.typesafe    config    1.4.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.15    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
                org.reactivestreams    reactive-streams    1.0.4    MIT-0
                org.scala-lang    scala-library    2.13.15    Apache-2.0
            org.scala-lang    scala-library    2.13.15    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.15    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.15    Apache-2.0
        org.scala-lang    scala-library    2.13.15    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.15    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.15    Apache-2.0
    org.scala-lang    scala-library    2.13.15    Apache-2.0
com.typesafe.akka    akka-stream-kafka_2.13    7.0.0    BUSL-1.1
    com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.15    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.15    Apache-2.0
    org.apache.kafka    kafka-clients    3.7.1    The Apache License, Version 2.0
        com.github.luben    zstd-jni    1.5.6-3    BSD 2-Clause License
        org.lz4    lz4-java    1.8.0    The Apache Software License, Version 2.0
        org.slf4j    slf4j-api    2.0.16    MIT License
        org.xerial.snappy    snappy-java    1.1.10.5    Apache-2.0
    org.scala-lang    scala-library    2.13.15    Apache-2.0
org.scala-lang    scala-library    2.13.15    Apache-2.0

KafkaSourceProvider

Important

Due to the mutable state inside KafkaSourceProviderKafkaSourceProvider, DO NOT share the instance of provider across projections.

For example, if you distribute projection via ShardedDaemonProcessShardedDaemonProcess, instantiate each provider inside the behavior factory.

A SourceProviderSourceProvider defines the source of the envelopes that the Projection will process. A SourceProvider for messages from Kafka can be defined with the KafkaSourceProviderKafkaSourceProvider like this:

Scala
sourceimport akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
val bootstrapServers = "localhost:9092"
val groupId = "group-wordcount"
val topicName = "words"
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val sourceProvider: SourceProvider[MergeableOffset[JLong], ConsumerRecord[String, String]] =
  KafkaSourceProvider(system, consumerSettings, Set(topicName))
Java
sourceimport akka.kafka.ConsumerSettings;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

String bootstrapServers = "localhost:9092";
String groupId = "group-wordcount";
String topicName = "words";
ConsumerSettings<String, String> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
        .withBootstrapServers(bootstrapServers)
        .withGroupId(groupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

SourceProvider<MergeableOffset<Long>, ConsumerRecord<String, String>> sourceProvider =
    KafkaSourceProvider.create(system, consumerSettings, Collections.singleton(topicName));

Please consult the Alpakka Kafka documentation for specifics around the ConsumerSettings. The KafkaSourceProvider is using Consumer.plainPartitionedManualOffsetSource.

The Projection can then be defined as:

Scala
sourceval sessionProvider = new HibernateSessionFactory

val projectionId = ProjectionId("WordCount", "wordcount-1")
val projection =
  JdbcProjection.exactlyOnce(
    projectionId,
    sourceProvider,
    () => sessionProvider.newInstance(),
    handler = () => new WordCountJdbcHandler(wordRepository))
Java
sourcefinal HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

ProjectionId projectionId = ProjectionId.of("WordCount", "wordcount-1");
ExactlyOnceProjection<MergeableOffset<Long>, ConsumerRecord<String, String>> projection =
    JdbcProjection.exactlyOnce(
        projectionId,
        sourceProvider,
        sessionProvider::newInstance,
        () -> new WordCountJdbcHandler(wordRepository),
        system);

and the WordCountJdbcHandler:

Scala
sourceclass WordCountJdbcHandler(val wordRepository: WordRepository)
    extends JdbcHandler[ConsumerRecord[String, String], HibernateJdbcSession] {

  @throws[Exception]
  override def process(session: HibernateJdbcSession, envelope: ConsumerRecord[String, String]): Unit = {
    val word = envelope.value
    wordRepository.increment(session.entityManager, word)
  }
}
Java
sourcepublic class WordCountJdbcHandler
    extends JdbcHandler<ConsumerRecord<String, String>, HibernateJdbcSession> {
  private Logger logger = LoggerFactory.getLogger(getClass());
  private WordRepository wordRepository;

  public WordCountJdbcHandler(WordRepository wordRepository) {
    this.wordRepository = wordRepository;
  }

  @Override
  public void process(HibernateJdbcSession session, ConsumerRecord<String, String> envelope)
      throws Exception {
    String word = envelope.value();
    wordRepository.increment(session.entityManager, word);
  }
}

Where the WordRepository is an implementation of:

Scala
sourcetrait WordRepository {
  def increment(entityManager: EntityManager, word: String): Unit
}
Java
sourceinterface WordRepository {
  void increment(EntityManager entityManager, String word);
}

Committing offset outside Kafka

The KafkaSourceProvider described above stores the Kafka offsets in a database. The main advantage of storing the offsets in a database is that exactly-once processing semantics can be achieved if the target database operations of the projection can be run in the same transaction as the storage of the offset.

However, there is a caveat when choosing for exactly-once. When the Kafka Consumer Group rebalance occurs it’s possible that some messages from a revoked partitions are still in-flight and have not yet been committed to the offset store. Projections will attempt to filter out such messages, but it’s not possible to guarantee it all the time.

To mitigate that risk, you can increase the value of akka.projection.kafka.read-offset-delay (defaults to 500ms). This delay adds a buffer of time between when the Kafka Source Provider starts up, or when it’s assigned a new partition, to retrieve the map of partitions to offsets to give any projections running in parallel a chance to drain in-flight messages.

Committing offset in Kafka

When using the approach of committing the offsets back to Kafka the Alpakka Kafka comittableSource can be used, and Akka Projections is not needed for that usage.

Sending to Kafka

To send events to Kafka one can use SendProducerSendProducer or Producer.flowWithContextProducer.flowWithContext method in Alpakka Kafka.

Sending to Kafka using the SendProducer

An async Handler that is sending to Kafka may look like this:

Scala
sourceclass WordPublisher(topic: String, sendProducer: SendProducer[String, String])(implicit ec: ExecutionContext)
    extends Handler[WordEnvelope] {
  private val logger = LoggerFactory.getLogger(getClass)

  override def process(envelope: WordEnvelope): Future[Done] = {
    val word = envelope.word
    // using the word as the key and `DefaultPartitioner` will select partition based on the key
    // so that same word always ends up in same partition
    val key = word
    val producerRecord = new ProducerRecord(topic, key, word)
    val result = sendProducer.send(producerRecord).map { recordMetadata =>
      logger.info("Published word [{}] to topic/partition {}/{}", word, topic, recordMetadata.partition)
      Done
    }
    result
  }
}
Java
sourceclass WordPublisher extends Handler<WordEnvelope> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String topic;
  private final SendProducer<String, String> sendProducer;

  public WordPublisher(String topic, SendProducer<String, String> sendProducer) {
    this.topic = topic;
    this.sendProducer = sendProducer;
  }

  @Override
  public CompletionStage<Done> process(WordEnvelope envelope) {
    String word = envelope.word;
    // using the word as the key and `DefaultPartitioner` will select partition based on the key
    // so that same word always ends up in same partition
    String key = word;
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, word);
    CompletionStage<RecordMetadata> result = sendProducer.send(producerRecord);
    CompletionStage<Done> done =
        result.thenApply(
            recordMetadata -> {
              logger.info(
                  "Published word [{}] to topic/partition {}/{}",
                  word,
                  topic,
                  recordMetadata.partition());
              return Done.getInstance();
            });
    return done;
  }
}

The SendProducer is constructed with:

Scala
sourceimport org.apache.kafka.common.serialization.StringSerializer
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
val bootstrapServers = "localhost:9092"
val topicName = "words"
private val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
import akka.actor.typed.scaladsl.adapter._ // FIXME might not be needed in later Alpakka Kafka version?
private val sendProducer = SendProducer(producerSettings)(system.toClassic)
Java
sourceimport org.apache.kafka.common.serialization.StringSerializer;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.SendProducer;

String bootstrapServers = "localhost:9092";
String topicName = "words";
ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers(bootstrapServers);
SendProducer<String, String> sendProducer = new SendProducer<>(producerSettings, system);

Please consult the for specifics around the ProducerSettings and SendProducer.

The Projection is defined as:

Scala
sourceval sourceProvider = new WordSource
val sessionProvider = new HibernateSessionFactory

val projectionId = ProjectionId("PublishWords", "words")
val projection =
  JdbcProjection
    .atLeastOnceAsync(
      projectionId,
      sourceProvider,
      () => sessionProvider.newInstance(),
      handler = () => new WordPublisher(topicName, sendProducer))
Java
sourceWordSource sourceProvider = new WordSource();
HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

ProjectionId projectionId = ProjectionId.of("PublishWords", "words");
Projection<WordEnvelope> projection =
    JdbcProjection.atLeastOnceAsync(
        projectionId,
        sourceProvider,
        sessionProvider::newInstance,
        () -> new WordPublisher(topicName, sendProducer),
        system);

where the SourceProvider in this example is:

Scala
sourcetype Word = String
type Count = Int
final case class WordEnvelope(offset: Long, word: Word)

class WordSource(implicit ec: ExecutionContext) extends SourceProvider[Long, WordEnvelope] {

  private val src = Source(
    List(WordEnvelope(1L, "abc"), WordEnvelope(2L, "def"), WordEnvelope(3L, "ghi"), WordEnvelope(4L, "abc")))

  override def source(offset: () => Future[Option[Long]]): Future[Source[WordEnvelope, NotUsed]] = {
    offset()
      .map {
        case Some(o) => src.dropWhile(_.offset <= o)
        case _       => src
      }
      .map(_.throttle(1, 1.second))
  }

  override def extractOffset(env: WordEnvelope): Long = env.offset

  override def extractCreationTime(env: WordEnvelope): Long = 0L
}
Java
sourcepublic class WordEnvelope {
  public final Long offset;
  public final String word;

  public WordEnvelope(Long offset, String word) {
    this.offset = offset;
    this.word = word;
  }
}

class WordSource extends SourceProvider<Long, WordEnvelope> {

  private final Source<WordEnvelope, NotUsed> src =
      Source.from(
          Arrays.asList(
              new WordEnvelope(1L, "abc"),
              new WordEnvelope(2L, "def"),
              new WordEnvelope(3L, "ghi"),
              new WordEnvelope(4L, "abc")));

  @Override
  public CompletionStage<Source<WordEnvelope, NotUsed>> source(
      Supplier<CompletionStage<Optional<Long>>> offset) {
    return offset
        .get()
        .thenApply(
            o -> {
              if (o.isPresent())
                return src.dropWhile(envelope -> envelope.offset <= o.get())
                    .throttle(1, Duration.ofSeconds(1));
              else return src.throttle(1, Duration.ofSeconds(1));
            });
  }

  @Override
  public Long extractOffset(WordEnvelope envelope) {
    return envelope.offset;
  }

  @Override
  public long extractCreationTime(WordEnvelope envelope) {
    return 0L;
  }
}

Sending to Kafka using a Producer Flow

Alternatively, we can define the same projection using Producer.flowWithContextProducer.flowWithContext in combination with atLeastOnceFlow.

The WordSource emits WordEnvelopes, therefore we will build a flow that takes every single emitted WordEnvelope and map it into an Alpakka Kafka ProducerMessageProducerMessage. The ProducerMessage factory methods can be used to produce a single message, multiple messages, or pass through a message (skip a message from being produced). The ProducerMessageProducerMessage will pass through Producer.flowWithContextProducer.flowWithContext that will publish it to the Kafka Topic and finally we map the result to Done.

Scala
sourceimport org.apache.kafka.common.serialization.StringSerializer
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerRecord
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.FlowWithContext
import akka.projection.ProjectionContext

val bootstrapServers = "localhost:9092"
val topicName = "words"

private val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

val producerFlow =
  FlowWithContext[WordEnvelope, ProjectionContext]
    .map(wordEnv => ProducerMessage.single(new ProducerRecord(topicName, wordEnv.word, wordEnv.word)))
    .via(Producer.flowWithContext(producerSettings))
    .map(_ => Done)
Java
sourceimport org.apache.kafka.common.serialization.StringSerializer;
import akka.kafka.ProducerSettings;
import org.apache.kafka.clients.producer.ProducerRecord;
import akka.kafka.ProducerMessage;
import akka.kafka.javadsl.Producer;
import akka.stream.javadsl.FlowWithContext;
import akka.projection.ProjectionContext;

String bootstrapServers = "localhost:9092";
String topicName = "words";

ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers(bootstrapServers);

FlowWithContext<WordEnvelope, ProjectionContext, Done, ProjectionContext, NotUsed>
    producerFlow =
        FlowWithContext.<WordEnvelope, ProjectionContext>create()
            .map(
                wordEnv ->
                    ProducerMessage.single(
                        new ProducerRecord<String, String>(
                            topicName, wordEnv.word, wordEnv.word)))
            .via(Producer.flowWithContext(producerSettings))
            .map(__ -> Done.getInstance());

The resulting flow is then used in the atLeastOnceFlow factory method to build the Projection.

Scala
sourceval sourceProvider = new WordSource
val sessionProvider = new HibernateSessionFactory

val projectionId = ProjectionId("PublishWords", "words")
val projection =
  JdbcProjection
    .atLeastOnceFlow(projectionId, sourceProvider, () => sessionProvider.newInstance(), producerFlow)
Java
sourceWordSource sourceProvider = new WordSource();
HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

ProjectionId projectionId = ProjectionId.of("PublishWords", "words");
Projection<WordEnvelope> projection =
    JdbcProjection.atLeastOnceFlow(
        projectionId, sourceProvider, sessionProvider::newInstance, producerFlow, system);

Mergeable Offset

The offset type for a projection is determined by the SourceProviderSourceProvider that’s used. Akka Projections supports a variety of offset types. In most cases an event is associated with a single offset row in the projection implementation’s offset store, but the KafkaSourceProviderKafkaSourceProvider uses a special type of offset called a MergeableOffsetMergeableOffset.

MergeableOffsetMergeableOffset allows us to read and write a map of offsets to the projection offset store. This is required because a subscription to consume from Kafka normally spans more than 1 Kafka Partition (see the Apache Kafka documentation for more information on Kafka’s partitioning model). To begin consuming from Kafka using offsets stored in a projection’s offset store we must provide the Kafka Consumer with a map of topic partitions to offset map (a java.util.Map[org.apache.kafka.common.TopicPartition, java.lang.Long]java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>). The Kafka offset map is modelled as multiple rows in the projection offset table, where each row includes the projection name, a surrogate projection key that represents the Kafka topic partition, and the offset as a java.lang.Long. When a projection with KafkaSourceProviderKafkaSourceProvider is started, or when a Kafka consumer group rebalance occurs, we read all the rows from the offset table for a projection name. When an offset is committed we persist one or more rows of the Kafka offset map back to the projection offset table.

Configuration

Make your edits/overrides in your application.conf.

The reference configuration file with the default values:

sourceakka.projection.kafka {
  # The time to wait before retrieving the last saved offsets. Due to the asynchronous nature of Akka Streams, 
  # when a Kafka Consumer Group rebalance occurs it's possible that some messages from a revoked partitions 
  # are still in-flight and have not yet been committed to the offset store. Projections will attempt to 
  # filter out such messages, but it's not possible to guarantee it all the time. This delay adds a small 
  # buffer of time between when the Kafka Source Provider starts up, or when it's assigned a new partition, 
  # to retrieve the map of partitions to offsets to give any projections running in parallel a chance 
  # to drain in-flight messages.
  read-offset-delay = 500 ms
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.