Consumer

A consumer is used for subscribing to Kafka topics.

The underlying implementation uses the KafkaConsumer; see the KafkaConsumer Javadoc for a description of consumer groups, offsets, and other details.

Example Code

For the examples in this section we use the following two dummy classes to illustrate how messages can be consumed.

Scala
class DB {

  private val offset = new AtomicLong

  def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
    println(s"DB.save: ${record.value}")
    offset.set(record.offset)
    Future.successful(Done)
  }

  def loadOffset(): Future[Long] =
    Future.successful(offset.get)

  def update(data: String): Future[Done] = {
    println(s"DB.update: $data")
    Future.successful(Done)
  }
}
Java
static class DB {
  private final AtomicLong offset = new AtomicLong();

  public CompletionStage<Done> save(ConsumerRecord<byte[], String> record) {
    System.out.println("DB.save: " + record.value());
    offset.set(record.offset());
    return CompletableFuture.completedFuture(Done.getInstance());
  }

  public CompletionStage<Long> loadOffset() {
    return CompletableFuture.completedFuture(offset.get());
  }

  public CompletionStage<Done> update(String data) {
    System.out.println("DB.update: " + data);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
Scala
class Rocket {
  def launch(destination: String): Future[Done] = {
    println(s"Rocket launched to $destination")
    Future.successful(Done)
  }
}
Java
static class Rocket {
  public CompletionStage<Done> launch(String destination) {
    System.out.println("Rocket launched to " + destination);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}

Settings

When creating a consumer stream you need to pass in ConsumerSettings that define things like:

  • bootstrap servers of the Kafka cluster
  • group id for the consumer; note that offsets are always committed for a given consumer group
  • deserializers for the keys and values
  • tuning parameters
Scala
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Java
protected final ConsumerSettings<byte[], String> consumerSettings =
    ConsumerSettings.create(system, new ByteArrayDeserializer(), new StringDeserializer())
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

In addition to programmatic construction, the ConsumerSettings can also be created from configuration (application.conf). By default, when ConsumerSettings are created with the ActorSystem parameter, the config section akka.kafka.consumer is used.

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  poll-interval = 50ms
  
  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that a non-zero value means that the thread that
  # is executing the stage will be blocked.
  poll-timeout = 50ms
  
  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  stop-timeout = 30s
  
  # How long to wait for `KafkaConsumer.close`
  close-timeout = 20s
  
  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `CommitTimeoutException`.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s
  
  # If for any reason KafkaConsumer.poll blocks for longer than the configured
  # poll-timeout, it is forcefully woken up with KafkaConsumer.wakeup.
  # The KafkaConsumerActor will throw
  # org.apache.kafka.common.errors.WakeupException which will be ignored
  # until max-wakeups limit gets exceeded.
  wakeup-timeout = 3s

  # After exceeding the maximum number of wakeups the consumer will stop and the stage will fail.
  max-wakeups = 10

  # If enabled, log stack traces before waking up the KafkaConsumer to give
  # some indication of why the KafkaConsumer is not honouring the poll-timeout
  wakeup-debug = true
  
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }
}

ConsumerSettings can also be created from any other Config section with the same layout as above.
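For example, a minimal sketch of creating ConsumerSettings from a custom section named our-kafka-consumer (the section name is an example, and the Config-accepting factory overload is assumed to be available in your version; check the API docs):

Scala
// Load a custom config section ("our-kafka-consumer" is an assumed name)
// that has the same layout as akka.kafka.consumer shown above.
val config = system.settings.config.getConfig("our-kafka-consumer")
val customConsumerSettings =
  ConsumerSettings(config, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")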

See KafkaConsumer Javadoc and ConsumerConfig Javadoc for details.

External Offset Storage

Consumer.plainSource and Consumer.plainPartitionedManualOffsetSource can be used to emit ConsumerRecord elements (as received from the underlying KafkaConsumer). They do not support committing offsets to Kafka. When using these Sources, either store offsets externally or use auto-commit (note that auto-commit is disabled by default).

The consumer application doesn’t need to use Kafka’s built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system, so that both the results and the offsets are stored atomically. This is not always possible, but when it is, it makes the consumption fully atomic and gives “exactly once” semantics that are stronger than the “at-least once” semantics you get with Kafka’s offset commit functionality.

Scala
val db = new DB

db.loadOffset().foreach { fromOffset =>
  val partition = 0
  val subscription = Subscriptions.assignmentWithOffset(
    new TopicPartition("topic1", partition) -> fromOffset
  )
  val done =
    Consumer.plainSource(consumerSettings, subscription)
      .mapAsync(1)(db.save)
      .runWith(Sink.ignore)
}

// Alternatively, start from the offsets that correspond to a timestamp
// (the Long is interpreted as epoch milliseconds here):
db.loadOffset().foreach { fromTimestamp =>
  val partition = 0
  val subscription = Subscriptions.assignmentOffsetsForTimes(
    new TopicPartition("topic1", partition) -> fromTimestamp
  )
  val done =
    Consumer.plainSource(consumerSettings, subscription)
      .mapAsync(1)(db.save)
      .runWith(Sink.ignore)
}
Java
final DB db = new DB();

db.loadOffset().thenAccept(fromOffset -> {
  Consumer.plainSource(
    consumerSettings,
    Subscriptions.assignmentWithOffset(new TopicPartition("topic1", 0), fromOffset)
  ).mapAsync(1, record -> db.save(record))
  .runWith(Sink.ignore(), materializer);
});

Note how with Consumer.plainSource, the starting point (offset) is assigned for a given consumer group id, topic and partition. The group id is defined in the ConsumerSettings.

With Consumer.plainPartitionedManualOffsetSource, only the consumer group id and the topic are required on creation. The starting point is fetched by calling the getOffsetsOnAssign function passed in by the user. This function should return a Map of TopicPartition to Long, with the Long representing the starting point. If a consumer is assigned a partition that is not included in the Map that results from getOffsetsOnAssign, the default starting position is used, according to the consumer configuration value auto.offset.reset. Also note that Consumer.plainPartitionedManualOffsetSource emits tuples of the assigned topic-partition and a corresponding source, as in Source per partition.
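A hedged sketch of this follows (the exact signature may differ between versions; loadOffsetsFor is a hypothetical stand-in for a real per-partition lookup in the external store):

Scala
// Resume each assigned partition from an externally stored offset.
// loadOffsetsFor is hypothetical; replace its body with a real lookup.
def loadOffsetsFor(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
  Future.successful(partitions.map(tp => tp -> 0L).toMap)

Consumer
  .plainPartitionedManualOffsetSource(
    consumerSettings,
    Subscriptions.topics("topic1"),
    loadOffsetsFor)
  .flatMapMerge(maxPartitions, _._2) // merge the per-partition sources
  .mapAsync(1)(db.save)
  .runWith(Sink.ignore)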

Offset Storage in Kafka

The Consumer.committableSource makes it possible to commit offset positions to Kafka.

Compared to auto-commit this gives exact control of when a message is considered consumed.

If you need to store offsets in anything other than Kafka, plainSource should be used instead of this API.

This is useful when “at-least once delivery” is desired, as each message will likely be delivered one time but in failure cases could be duplicated.

Scala
val db = new DB

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.record.value).map(_ => msg)
    }
    .mapAsync(1) { msg =>
      msg.committableOffset.commitScaladsl()
    }
    .runWith(Sink.ignore)
Java
final DB db = new DB();

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, msg -> db.update(msg.record().value())
    .thenApply(done -> msg))
  .mapAsync(1, msg -> msg.committableOffset().commitJavadsl())
  .runWith(Sink.ignore(), materializer);

The above example uses separate mapAsync stages for processing and committing. This guarantees that, even with parallelism higher than 1, messages are sent for commit in the correct order.
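For example, the processing stage can run with higher parallelism while commits still happen in order, because mapAsync (unlike mapAsyncUnordered) emits its results in upstream order. A minimal sketch, reusing the dummy DB class from above:

Scala
// Up to 3 db.update calls may run concurrently, but mapAsync emits the
// offsets downstream in the original order, so they are committed in order.
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(3) { msg =>
    db.update(msg.record.value).map(_ => msg.committableOffset)
  }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)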

Committing the offset for each message as illustrated above is rather slow. It is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.

You can use the Akka Stream batch combinator to perform the batching. Note that it will only aggregate elements into batches if the downstream consumer is slower than the upstream producer.

Scala
val db = new DB

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.record.value).map(_ => msg.committableOffset)
    }
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
Java
final DB db = new DB();

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, msg ->
    db.update(msg.record().value()).thenApply(done -> msg.committableOffset()))
  .batch(20,
    first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first),
    (batch, elem) -> batch.updated(elem))
    .mapAsync(3, c -> c.commitJavadsl())
  .runWith(Sink.ignore(), materializer);

groupedWithin is an alternative way of aggregating elements:

Scala
.groupedWithin(10, 5.seconds)
.map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
.mapAsync(3)(_.commitScaladsl())
Java
source
  .groupedWithin(20, Duration.create(5, TimeUnit.SECONDS))
  .map(group -> foldLeft(group))
  .mapAsync(3, c -> c.commitJavadsl())

private ConsumerMessage.CommittableOffsetBatch foldLeft(List<ConsumerMessage.CommittableOffset> group) {
  ConsumerMessage.CommittableOffsetBatch batch = ConsumerMessage.emptyCommittableOffsetBatch();
  for (ConsumerMessage.CommittableOffset elem: group) {
    batch = batch.updated(elem);
  }
  return batch;
}

If you commit the offset before processing the message you get “at-most once delivery” semantics, and for that there is a Consumer.atMostOnceSource. However, atMostOnceSource commits the offset for each message, and that is rather slow; batching of commits is recommended.

Scala
val rocket = new Rocket

val done = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { record =>
    rocket.launch(record.value)
  }
  .runWith(Sink.ignore)
Java
final Rocket rocket = new Rocket();

Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, record -> rocket.launch(record.value()))
  .runWith(Sink.ignore(), materializer);

Maintaining at-least-once delivery semantics requires care; the many risks and solutions are covered in At-Least-Once Delivery.

Connecting Producer and Consumer

For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic, you can use Consumer.committableSource and connect it to Producer.commitableSink. The commitableSink will commit the offset back to the consumer when it has successfully published the message.

Note that there is a risk that something fails after publishing but before committing, so commitableSink has “at-least once delivery” semantics.

Scala
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map { msg =>
    println(s"topic1 -> topic2: $msg")
    ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
      "topic2",
      msg.record.value
    ), msg.committableOffset)
  }
  .runWith(Producer.commitableSink(producerSettings))
Java
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
    new ProducerMessage.Message<byte[], String, ConsumerMessage.Committable>(
        new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset()))
  .runWith(Producer.commitableSink(producerSettings), materializer);

As mentioned earlier, committing each message is rather slow and we can batch the commits to get higher throughput.

Scala
val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(msg =>
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset))
    .via(Producer.flow(producerSettings))
    .map(_.message.passThrough)
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
Java
Source<ConsumerMessage.CommittableOffset, Consumer.Control> source =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
      new ProducerMessage.Message<byte[], String, ConsumerMessage.CommittableOffset>(
          new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset()))
  .via(Producer.flow(producerSettings))
  .map(result -> result.message().passThrough());

source
  .batch(20,
      first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first),
      (batch, elem) -> batch.updated(elem))
  .mapAsync(3, c -> c.commitJavadsl())
  .runWith(Sink.ignore(), materializer);

Source per partition

Consumer.plainPartitionedSource and Consumer.committablePartitionedSource support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, this source emits a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.

Backpressure per partition with batch commit:

Scala
val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
    batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)
Java
Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, Pair::second)
  .via(business())
  .batch(
      100,
      first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first.committableOffset()),
      (batch, elem) -> batch.updated(elem.committableOffset())
  )
  .mapAsync(3, x -> x.commitJavadsl())
  .runWith(Sink.ignore(), materializer);

Separate streams per partition:

Scala
//Consumer group represented as Source[(TopicPartition, Source[Messages])]
val consumerGroup =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))

//Process each assigned partition separately
consumerGroup
  .map {
    case (topicPartition, source) =>
      source
        .via(business)
        .toMat(Sink.ignore)(Keep.both)
        .run()
  }
  .mapAsyncUnordered(maxPartitions)(_._2)
  .runWith(Sink.ignore)
Java
Consumer.Control c =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(pair -> pair.second().via(business()).toMat(Sink.ignore(), Keep.both()).run(materializer))
    .mapAsyncUnordered(maxPartitions, (pair) -> pair.second()).to(Sink.ignore()).run(materializer);

Join flows based on automatically assigned partitions:

Scala
type Msg = CommittableMessage[Array[Byte], String]
def zipper(left: Source[Msg, _], right: Source[Msg, _]): Source[(Msg, Msg), NotUsed] = ???

Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .map {
    case (topicPartition, source) =>
      // get corresponding partition from other topic
      val otherSource = {
        val otherTopicPartition = new TopicPartition("otherTopic", topicPartition.partition())
        Consumer.committableSource(consumerSettings, Subscriptions.assignment(otherTopicPartition))
      }
      zipper(source, otherSource)
  }
  .flatMapMerge(maxPartitions, identity)
  .via(business)
  //build commit offsets
  .batch(max = 20, {
    case (l, r) => (
      CommittableOffsetBatch.empty.updated(l.committableOffset),
      CommittableOffsetBatch.empty.updated(r.committableOffset)
    )
  }) {
    case ((batchL, batchR), (l, r)) =>
      // updated returns a new CommittableOffsetBatch, so keep the returned values
      (batchL.updated(l.committableOffset), batchR.updated(r.committableOffset))
  }
  .mapAsync(1) { case (l, r) => l.commitScaladsl().map(_ => r) }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)

Sharing KafkaConsumer

If you have many streams it can be more efficient to share the underlying KafkaConsumer, by sharing the KafkaConsumerActor. You need to create the actor and stop it when it is no longer needed. You pass the ActorRef as a parameter to the Consumer factory methods.

Scala
//Consumer is represented by actor
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

//Manually assign topic partition to it
Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business)
  .runWith(Sink.ignore)

//Manually assign another topic partition
Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
  .via(business)
  .runWith(Sink.ignore)
Java
//Consumer is represented by actor
ActorRef consumer = system.actorOf(KafkaConsumerActor.props(consumerSettings));

//Manually assign topic partition to it
Consumer
  .plainExternalSource(consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business())
  .runWith(Sink.ignore(), materializer);

//Manually assign another topic partition
Consumer
  .plainExternalSource(consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
  .via(business())
  .runWith(Sink.ignore(), materializer);
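The examples above create the actor but do not show stopping it. One way to do so is plain actor lifecycle management, as in this sketch (done1 and done2 are hypothetical names for the Future[Done] values materialized by the runWith(Sink.ignore) calls above):

Scala
// Stop the shared KafkaConsumerActor once every stream using it has completed.
// Your version may also provide a dedicated stop message; see the API docs.
Future.sequence(List(done1, done2)).foreach { _ =>
  system.stop(consumer)
}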

Accessing KafkaConsumer metrics

You can access the underlying consumer metrics via the materialized Control instance:

Scala
// Consumer is represented by actor
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

// use the consumer actor manually in streams:
val control: Consumer.Control = Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business)
  .to(Sink.ignore)
  .run()

println(s"metrics: ${control.metrics}")
Java
// run the stream to obtain the materialized Control value
Consumer.Control control = Consumer
    .plainSource(consumerSettings, Subscriptions.assignment(new TopicPartition("topic1", 2)))
    .via(business())
    .to(Sink.ignore())
    .run(materializer);

CompletionStage<Map<MetricName, Metric>> metrics = control.getMetrics();
metrics.thenAccept(m -> System.out.println("Metrics: " + m));

Listening for rebalance events

You may set up a rebalance event listener actor that will be notified when your consumer is assigned to or revoked from specific topic partitions. Two kinds of messages will be sent to this listener actor, akka.kafka.TopicPartitionsAssigned and akka.kafka.TopicPartitionsRevoked, like this:

Scala
import akka.kafka.TopicPartitionsAssigned
import akka.kafka.TopicPartitionsRevoked

class RebalanceListener extends Actor with ActorLogging {
  def receive: Receive = {
    case TopicPartitionsAssigned(sub, topicPartitions) ⇒
      log.info("Assigned: {}", topicPartitions)

    case TopicPartitionsRevoked(sub, topicPartitions) ⇒
      log.info("Revoked: {}", topicPartitions)
  }
}

val listener = system.actorOf(Props[RebalanceListener])

val sub = Subscriptions.topics(Set("topic")) // create subscription
  // additionally, pass the rebalance callbacks:
  .withRebalanceListener(listener)

// use the subscription as usual:
Consumer.plainSource(consumerSettings, sub)
Java
class RebalanceListener extends AbstractActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(akka.kafka.TopicPartitionsAssigned.class, assigned -> {
          // handle the newly assigned topic partitions, e.g. log them
        })
        .match(akka.kafka.TopicPartitionsRevoked.class, revoked -> {
          // handle the revoked topic partitions
        })
        .build();
  }
}

ActorRef listener = this.system.actorOf(Props.create(RebalanceListener.class));

// pass in the listener callbacks into the subscription:
Subscription sub = Subscriptions.topics("topic")
    .withRebalanceListener(listener);

// use the subscription as usual:
Consumer.plainSource(consumerSettings, sub);