Consumer

A consumer subscribes to Kafka topics and passes the messages into an Akka Stream.

The underlying implementation uses the KafkaConsumer; see the Kafka API documentation for a description of consumer groups, offsets, and other details.

Settings

When creating a consumer stream you need to pass in ConsumerSettings (API) that define things like:

  • de-serializers for the keys and values
  • bootstrap servers of the Kafka cluster
  • group id for the consumer, note that offsets are always committed for a given consumer group
  • Kafka consumer tuning parameters
Scala
val config = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings =
  ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Java
final Config config = system.settings().config().getConfig("akka.kafka.consumer");
final ConsumerSettings<String, byte[]> consumerSettings =
    ConsumerSettings.create(config, new StringDeserializer(), new ByteArrayDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

In addition to programmatic construction, the ConsumerSettings (API) can also be created from configuration (application.conf).

When ConsumerSettings are created with the ActorSystem (API), the config section akka.kafka.consumer is used. The format of these settings files is described in the Typesafe Config Documentation.

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 50ms
  
  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that a non-zero value means that the thread that
  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 50ms
  
  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  stop-timeout = 30s
  
  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s
  
  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with `CommitTimeoutException`.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s
  
  # If for any reason `KafkaConsumer.poll` blocks for longer than the configured
  # poll-timeout then it is forcefully woken up with `KafkaConsumer.wakeup`.
  # The KafkaConsumerActor will throw
  # `org.apache.kafka.common.errors.WakeupException` which will be ignored
  # until the `max-wakeups` limit is exceeded.
  wakeup-timeout = 3s

  # After exceeding maximum wakeups the consumer will stop and the stage will fail.
  # Setting it to 0 will let it ignore the wakeups and try to get the polling done forever.
  max-wakeups = 10

  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite

  # If enabled, log stack traces before waking up the KafkaConsumer to give
  # some indication why the KafkaConsumer is not honouring the `poll-timeout`
  wakeup-debug = true
  
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms

  # Limits the query to Kafka for a topic's position
  position-timeout = 5s

  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s

  # Timeout for akka.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s
}

ConsumerSettings (API) can also be created from any other Config section with the same layout as above.
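
For example, a minimal sketch of creating the settings from another config section, or directly from the ActorSystem, could look like this (the section name our-kafka-consumer is a hypothetical example):

Scala
// sketch only: "our-kafka-consumer" is a made-up section name; it must have
// the same layout as akka.kafka.consumer shown above
val customConfig = system.settings.config.getConfig("our-kafka-consumer")
val settingsFromCustomConfig =
  ConsumerSettings(customConfig, new StringDeserializer, new ByteArrayDeserializer)

// passing the ActorSystem directly picks up the akka.kafka.consumer section
val settingsFromSystem =
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)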

See KafkaConsumer API and ConsumerConfig API for more details regarding settings.

Offset Storage external to Kafka

The Kafka read offset can either be stored in Kafka (see below), or at a data store of your choice.

Consumer.plainSource (Consumer API) and Consumer.plainPartitionedManualOffsetSource can be used to emit ConsumerRecord (Kafka API) elements as received from the underlying KafkaConsumer. They do not support committing offsets to Kafka. When using these sources, either store an offset externally, or use auto-commit (note that auto-commit is disabled by default).

Scala
consumerSettings
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
Java
consumerSettings
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

The consumer application doesn’t need to use Kafka’s built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system, so that results and offsets are stored atomically. This is not always possible, but when it is, it makes the consumption fully atomic and gives “exactly once” semantics that are stronger than the “at-least-once” semantics you get with Kafka’s offset commit functionality.

Scala
  val db = new OffsetStore
  val control = db.loadOffset().map { fromOffset =>
    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.assignmentWithOffset(
          new TopicPartition(topic, /* partition = */ 0) -> fromOffset
        )
      )
      .mapAsync(1)(db.businessLogicAndStoreOffset)
      .toMat(Sink.seq)(Keep.both)
      .run()
  }

class OffsetStore {
  def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
  def loadOffset(): Future[Long] = // ...
}
Java
  final OffsetStorage db = new OffsetStorage();

  CompletionStage<Consumer.Control> controlCompletionStage =
      db.loadOffset()
          .thenApply(
              fromOffset -> {
                return Consumer.plainSource(
                        consumerSettings,
                        Subscriptions.assignmentWithOffset(
                            new TopicPartition("topic1", /* partition: */ 0), fromOffset))
                    .mapAsync(1, db::businessLogicAndStoreOffset)
                    .to(Sink.ignore())
                    .run(materializer);
              });

class OffsetStorage {
  public CompletionStage<Done> businessLogicAndStoreOffset(
      ConsumerRecord<String, byte[]> record) { // ... }
  public CompletionStage<Long> loadOffset() { // ... }
}

For Consumer.plainSource, Subscriptions.assignmentWithOffset specifies the starting point (offset) for a given topic and partition. The consumer group id is defined in the ConsumerSettings.

Alternatively, with Consumer.plainPartitionedManualOffsetSource (Consumer API), only the consumer group id and the topic are required on creation. The starting point is fetched by calling the getOffsetsOnAssign function passed in by the user. This function should return a Map of TopicPartition (API) to Long, with the Long representing the starting point. If a consumer is assigned a partition that is not included in the Map that results from getOffsetsOnAssign, the default starting position will be used, according to the consumer configuration value auto.offset.reset. Also note that Consumer.plainPartitionedManualOffsetSource emits tuples of assigned topic-partition and a corresponding source, as in Source per partition.
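
A minimal sketch of such a source could look like the following, where loadOffsetsOnAssign stands for a hypothetical user-supplied lookup into the external offset store, and db refers to the OffsetStore from the example above:

Scala
// sketch only: loadOffsetsOnAssign is a made-up name for your own lookup of the
// stored offsets for the partitions that have just been assigned
def loadOffsetsOnAssign(
    partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = ???

val control =
  Consumer
    .plainPartitionedManualOffsetSource(
      consumerSettings,
      Subscriptions.topics(topic),
      loadOffsetsOnAssign
    )
    // merge the per-partition sources; the breadth of 10 is an arbitrary example
    .flatMapMerge(10, _._2)
    .mapAsync(1)(db.businessLogicAndStoreOffset)
    .to(Sink.ignore)
    .run()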

Offset Storage in Kafka - committing

The Consumer.committableSource (Consumer API) makes it possible to commit offset positions to Kafka. Compared to auto-commit, this gives exact control over when a message is considered consumed.

This is useful when “at-least-once” delivery is desired, as each message will likely be delivered one time, but in failure cases could be received more than once.

Scala
  val control =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(10) { msg =>
        business(msg.record.key, msg.record.value).map(_ => msg.committableOffset)
      }
      .mapAsync(5)(offset => offset.commitScaladsl())
      .toMat(Sink.seq)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

def business(key: String, value: Array[Byte]): Future[Done] = // ???
Java
  Consumer.Control control =
      Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
          .mapAsync(
              1,
              msg ->
                  business(msg.record().key(), msg.record().value())
                      .thenApply(done -> msg.committableOffset()))
          .mapAsync(1, offset -> offset.commitJavadsl())
          .to(Sink.ignore())
          .run(materializer);

CompletionStage<String> business(String key, byte[] value) { // .... }

The above example uses separate mapAsync stages for processing and committing. This guarantees that, even with parallelism higher than 1, messages are sent for commit in the correct order.

Committing the offset for each message as illustrated above is rather slow. It is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.

You can use a pre-defined Committer.sink to perform commits in batches:

Scala
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
Java
Consumer.Control control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .<ConsumerMessage.Committable>thenApply(done -> msg.committableOffset()))
        .to(Committer.sink(committerSettings))
        .run(materializer);

When creating a Committer.sink you need to pass in CommitterSettings (API) that defines:

  • max-batch — maximum number of messages to commit at once,
  • max-interval — maximum interval between commits.

The bigger these values are, the less load you put on Kafka and the lower the chance that committing offsets becomes a bottleneck. However, increasing them also means that more messages will have to be re-processed in case of a failure.
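
A sketch of constructing CommitterSettings programmatically could look like this; the defaults are read from the akka.kafka.committer config section, and the values below are examples only:

Scala
import akka.kafka.CommitterSettings
import scala.concurrent.duration._

// example values only; anything not set here keeps the configured default
val committerSettings =
  CommitterSettings(system)
    .withMaxBatch(100)
    .withMaxInterval(10.seconds)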

You can also perform the batching manually, using the Akka Stream batch combinator. Note that it will only aggregate elements into batches if the downstream consumer is slower than the upstream producer.

Scala
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .batch(
      max = 20,
      CommittableOffsetBatch(_)
    )(_.updated(_))
    .mapAsync(3)(_.commitScaladsl())
    .toMat(Sink.seq)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
Java
Consumer.Control control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .batch(
            20,
            ConsumerMessage::createCommittableOffsetBatch,
            ConsumerMessage.CommittableOffsetBatch::updated)
        .mapAsync(3, c -> c.commitJavadsl())
        .to(Sink.ignore())
        .run(materializer);

If you consume from a topic with low activity, and it is possible that no messages arrive for more than 24 hours, consider enabling periodic commit refresh (the akka.kafka.consumer.commit-refresh-interval configuration parameter); otherwise offsets might expire in the Kafka offset storage.
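
For example, the refresh could be enabled in application.conf like this (the 12 hour interval is an example value only; choose it well below the offset retention time configured on the brokers):

akka.kafka.consumer {
  commit-refresh-interval = 12h
}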

For less active topics timing-based aggregation with groupedWithin might be a better choice than the batch operator.

Scala
source
  .groupedWithin(10, 5.seconds)
  .map(CommittableOffsetBatch(_))
  .mapAsync(3)(_.commitScaladsl())
Java
source
    .groupedWithin(20, java.time.Duration.of(5, ChronoUnit.SECONDS))
    .map(ConsumerMessage::createCommittableOffsetBatch)
    .mapAsync(3, c -> c.commitJavadsl())

The Consumer.commitWithMetadataSource allows you to add metadata to the committed offset based on the last consumed record.

Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (the akka.kafka.consumer.commit-refresh-interval configuration parameter) and that commit will not contain metadata.

Scala
def metadataFromRecord(record: ConsumerRecord[String, String]): String =
  record.timestamp().toString

val control =
  Consumer
    .commitWithMetadataSource(consumerSettings, Subscriptions.topics(topic), metadataFromRecord)
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .batch(
      max = 20,
      CommittableOffsetBatch(_)
    )(_.updated(_))
    .mapAsync(3)(_.commitScaladsl())
    .toMat(Sink.seq)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
Java
Consumer.Control control =
    Consumer.commitWithMetadataSource(
            consumerSettings,
            Subscriptions.topics("topic1"),
            (record) -> Long.toString(record.timestamp()))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .batch(
            20,
            ConsumerMessage::createCommittableOffsetBatch,
            ConsumerMessage.CommittableOffsetBatch::updated)
        .mapAsync(3, c -> c.commitJavadsl())
        .to(Sink.ignore())
        .run(materializer);

If you commit the offset before processing the message you get “at-most-once” delivery semantics; this is provided by Consumer.atMostOnceSource. However, atMostOnceSource commits the offset for each message, which is rather slow; batching of commits is recommended instead.

Scala
  val (control, result) =
    Consumer
      .atMostOnceSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(1)(record => business(record.key, record.value()))
      .map(it => {
        println(s"Done with $it")
        it
      })
      .toMat(Sink.seq)(Keep.both)
      .run()

def business(key: String, value: String): Future[Done] = // ???
Java
  Consumer.Control control =
      Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
          .mapAsync(10, record -> business(record.key(), record.value()))
          .to(Sink.foreach(it -> System.out.println("Done with " + it)))
          .run(materializer);

CompletionStage<String> business(String key, byte[] value) { // .... }

Maintaining at-least-once delivery semantics requires care; many risks and solutions are covered in At-Least-Once Delivery.

Connecting Producer and Consumer

For cases where you need to read messages from one topic, transform or enrich them, and then write to another topic, you can use Consumer.committableSource and connect it to a Producer.committableSink. The committableSink will commit the offset back to the consumer when it has successfully published the message.

The committableSink accepts implementations of ProducerMessage.Envelope (API) that carry the offset to commit for the originating message (of type ConsumerMessage.Committable (API)). See Producing messages for the different implementations of Envelope that are supported.

Note that there is a risk that something fails after publishing but before committing, so committableSink has “at-least-once” delivery semantics.

Scala
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
        msg.committableOffset
      )
    }
    .toMat(Producer.committableSink(producerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
Java
Consumer.Control control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1", "topic2"))
        .map(
            msg ->
                ProducerMessage.<String, byte[], ConsumerMessage.Committable>single(
                    new ProducerRecord<>(
                        "targetTopic", msg.record().key(), msg.record().value()),
                    msg.committableOffset()))
        .to(Producer.committableSink(producerSettings))
        .run(materializer);

Since Producer.committableSink commits messages one by one, which is rather slow, prefer a flow together with batching of commits.

Scala
val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic))
  .map(
    msg =>
      ProducerMessage.single(
        new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
        msg.committableOffset
    )
  )
  .via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .batch(max = 20, CommittableOffsetBatch.apply)(_.updated(_))
  .mapAsync(3)(_.commitScaladsl())
  .toMat(Sink.ignore)(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()
Java
Source<ConsumerMessage.CommittableOffset, Consumer.Control> source =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .map(
            msg ->
                ProducerMessage.single(
                    new ProducerRecord<>("topic2", msg.record().key(), msg.record().value()),
                    msg.committableOffset()))
        .via(Producer.flexiFlow(producerSettings))
        .map(result -> result.passThrough());

source
    .batch(
        20,
        ConsumerMessage::createCommittableOffsetBatch,
        ConsumerMessage.CommittableOffsetBatch::updated)
    .mapAsync(3, c -> c.commitJavadsl())
    .runWith(Sink.ignore(), materializer);
Note

There is a risk that something fails after publishing, but before committing, so committableSink has “at-least-once” delivery semantics.

To get delivery guarantees, please read about transactions.

Source per partition

Consumer.plainPartitionedSource (Consumer API), Consumer.committablePartitionedSource, and Consumer.commitWithMetadataPartitionedSource support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, this source will emit a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.

Backpressure per partition with batch commit:

Scala
val (control, result) = Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
  .flatMapMerge(maxPartitions, _._2)
  .via(businessFlow)
  .map(_.committableOffset)
  .batch(max = 100, CommittableOffsetBatch.apply)(_.updated(_))
  .mapAsync(3)(_.commitScaladsl())
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
        .flatMapMerge(maxPartitions, Pair::second)
        .via(business())
        .map(msg -> msg.committableOffset())
        .batch(
            100,
            ConsumerMessage::createCommittableOffsetBatch,
            ConsumerMessage.CommittableOffsetBatch::updated)
        .mapAsync(3, offsets -> offsets.commitJavadsl())
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

Separate streams per partition:

Scala
val (control, result) = Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
  .map {
    case (topicPartition, source) =>
      source
        .via(businessFlow)
        .mapAsync(1)(_.committableOffset.commitScaladsl())
        .runWith(Sink.ignore)
  }
  .mapAsyncUnordered(maxPartitions)(identity)
  .toMat(Sink.ignore)(Keep.both)
  .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
        .map(
            pair -> {
              Source<ConsumerMessage.CommittableMessage<String, byte[]>, NotUsed> source =
                  pair.second();
              return source
                  .via(business())
                  .mapAsync(1, message -> message.committableOffset().commitJavadsl())
                  .runWith(Sink.ignore(), materializer);
            })
        .mapAsyncUnordered(maxPartitions, completion -> completion)
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

Join flows based on automatically assigned partitions:

Scala
type Msg = CommittableMessage[String, Array[Byte]]

def zipper(left: Source[Msg, _], right: Source[Msg, _]): Source[(Msg, Msg), NotUsed] = ???

Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .map {
    case (topicPartition, source) =>
      // get corresponding partition from other topic
      val otherTopicPartition = new TopicPartition("otherTopic", topicPartition.partition())
      val otherSource = Consumer.committableSource(consumerSettings, Subscriptions.assignment(otherTopicPartition))
      zipper(source, otherSource)
  }
  .flatMapMerge(maxPartitions, identity)
  .via(businessFlow)
  //build commit offsets
  .batch(
    max = 20,
    seed = {
      case (left, right) =>
        (
          CommittableOffsetBatch(left.committableOffset),
          CommittableOffsetBatch(right.committableOffset)
        )
    }
  )(
    aggregate = {
      case ((batchL, batchR), (l, r)) =>
        // updated returns a new batch, so keep the returned values
        (batchL.updated(l.committableOffset), batchR.updated(r.committableOffset))
    }
  )
  .mapAsync(1) { case (l, r) => l.commitScaladsl().map(_ => r) }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)

Sharing the KafkaConsumer instance

If you have many streams it can be more efficient to share the underlying KafkaConsumer (Kafka API) instance. It is shared by creating a KafkaConsumerActor (API). You need to create the actor yourself and stop it by sending KafkaConsumerActor.Stop when it is no longer needed. You pass the ActorRef as a parameter to the Consumer (Consumer API) factory methods.

Scala
//Consumer is represented by actor
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

//Manually assign topic partition to it
val (controlPartition1, result1) = Consumer
  .plainExternalSource[String, Array[Byte]](
    consumer,
    Subscriptions.assignment(new TopicPartition(topic, partition1))
  )
  .via(businessFlow)
  .toMat(Sink.seq)(Keep.both)
  .run()

//Manually assign another topic partition
val (controlPartition2, result2) = Consumer
  .plainExternalSource[String, Array[Byte]](
    consumer,
    Subscriptions.assignment(new TopicPartition(topic, partition2))
  )
  .via(businessFlow)
  .toMat(Sink.seq)(Keep.both)
  .run()

// ....

controlPartition1.shutdown()
controlPartition2.shutdown()
consumer ! KafkaConsumerActor.Stop
Java
// Consumer is represented by actor
ActorRef consumer = system.actorOf(KafkaConsumerActor.props(consumerSettings));

// Manually assign topic partition to it
Consumer.Control controlPartition1 =
    Consumer.plainExternalSource(
            consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
        .via(business())
        .to(Sink.ignore())
        .run(materializer);

// Manually assign another topic partition
Consumer.Control controlPartition2 =
    Consumer.plainExternalSource(
            consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
        .via(business())
        .to(Sink.ignore())
        .run(materializer);

consumer.tell(KafkaConsumerActor.stop(), self);

Accessing KafkaConsumer metrics

You can access the underlying consumer metrics via the materialized Control instance:

Scala
val control: Consumer.Control = Consumer
  .plainSource(consumerSettings, Subscriptions.assignment(new TopicPartition(topic, partition)))
  .via(businessFlow)
  .to(Sink.ignore)
  .run()


val metrics: Future[Map[MetricName, Metric]] = control.metrics
metrics.foreach(map => println(s"metrics: ${map.mkString("\n")}"))
Java
// run the stream to obtain the materialized Control value
Consumer.Control control =
    Consumer.plainSource(
            consumerSettings, Subscriptions.assignment(new TopicPartition("topic1", 2)))
        .via(business())
        .to(Sink.ignore())
        .run(materializer);

CompletionStage<Map<MetricName, Metric>> metrics = control.getMetrics();
metrics.thenAccept(map -> System.out.println("Metrics: " + map));

Accessing KafkaConsumer metadata

Accessing Kafka consumer metadata is possible as described in Consumer Metadata.
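
As a minimal sketch, metadata can be requested from a KafkaConsumerActor with the ask pattern, assuming the request and response messages described on that page (for example Metadata.ListTopics answered by Metadata.Topics):

Scala
import akka.actor.ActorRef
import akka.kafka.{KafkaConsumerActor, Metadata}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// the ask timeout should cover akka.kafka.consumer.metadata-request-timeout
implicit val askTimeout: Timeout = Timeout(5.seconds)

val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

// Metadata.Topics wraps a Try with the topic and partition information
val topics: Future[Metadata.Topics] =
  (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics]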

Listening for rebalance events

You may set up a rebalance event listener actor that will be notified when your consumer is assigned to or revoked from consuming specific topic partitions. Two kinds of messages will be sent to this listener actor

  • akka.kafka.TopicPartitionsAssigned and
  • akka.kafka.TopicPartitionsRevoked, like this:
Scala
import akka.kafka.{TopicPartitionsAssigned, TopicPartitionsRevoked}

class RebalanceListener extends Actor with ActorLogging {
  def receive: Receive = {
    case TopicPartitionsAssigned(subscription, topicPartitions) =>
      log.info("Assigned: {}", topicPartitions)

    case TopicPartitionsRevoked(subscription, topicPartitions) =>
      log.info("Revoked: {}", topicPartitions)
  }
}

val rebalanceListener = system.actorOf(Props(new RebalanceListener))
val subscription = Subscriptions
  .topics(topic)
  // additionally, pass the actor reference:
  .withRebalanceListener(rebalanceListener)

// use the subscription as usual:
  Consumer
    .plainSource(consumerSettings, subscription)
Java
class RebalanceListener extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            TopicPartitionsAssigned.class,
            assigned -> {
              log().info("Assigned: {}", assigned);
            })
        .match(
            TopicPartitionsRevoked.class,
            revoked -> {
              log().info("Revoked: {}", revoked);
            })
        .build();
  }
}

  ActorRef rebalanceListener = this.system.actorOf(Props.create(RebalanceListener.class));

  Subscription subscription =
      Subscriptions.topics("topic")
          // additionally, pass the actor reference:
          .withRebalanceListener(rebalanceListener);

  // use the subscription as usual:
  Consumer.plainSource(consumerSettings, subscription);

Controlled shutdown

The Source created with Consumer.plainSource and similar methods materializes to a Consumer.Control (API) instance. This can be used to stop the stream in a controlled manner.

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.

Scala
val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()
Java
final OffsetStorage db = new OffsetStorage();

db.loadOffset()
    .thenAccept(
        fromOffset -> {
          Consumer.Control control =
              Consumer.plainSource(
                      consumerSettings,
                      Subscriptions.assignmentWithOffset(
                          new TopicPartition("topic1", 0), fromOffset))
                  .mapAsync(
                      10,
                      record -> {
                        return business(record.key(), record.value())
                            .thenApply(res -> db.storeProcessedOffset(record.offset()));
                      })
                  .toMat(Sink.ignore(), Keep.left())
                  .run(materializer);

          // Shutdown the consumer when desired
          control.shutdown();
        });

When you are using offset storage in Kafka, the shutdown process involves several steps:

  1. Consumer.Control.stop() to stop producing messages from the Source. This does not stop the underlying Kafka Consumer.
  2. Wait for the stream to complete, so that a commit request has been made for all offsets of all processed messages (via commitScaladsl() or commitJavadsl()).
  3. Consumer.Control.shutdown() to wait for all outstanding commit requests to finish and stop the Kafka Consumer.

To manage this shutdown process, use the Consumer.DrainingControl (API) by combining the Consumer.Control with the sink’s materialized completion future in mapMaterializedValue. That control offers the method drainAndShutdown which implements the process described above. It is recommended to use the same shutdown mechanism even when not using batching, to avoid potential race conditions, depending on the exact layout of the stream.

Scala
val drainingControl =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .batch(max = 20, first => CommittableOffsetBatch(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .toMat(Sink.ignore)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()
Java
final Executor ec = Executors.newCachedThreadPool();

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .mapAsync(
            1,
            msg ->
                business(msg.record().key(), msg.record().value())
                    .thenApply(done -> msg.committableOffset()))
        .batch(
            20,
            first -> ConsumerMessage.createCommittableOffsetBatch(first),
            (batch, elem) -> batch.updated(elem))
        .mapAsync(3, c -> c.commitJavadsl())
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

control.drainAndShutdown(ec);