Consumer

A consumer is used for subscribing to Kafka topics.

The underlying implementation uses the KafkaConsumer; see its Javadoc for a description of consumer groups, offsets, and other details.

Example Code

For the examples in this section we use the following two dummy classes to illustrate how messages can be consumed.

Scala
class DB {

  private val offset = new AtomicLong

  def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
    println(s"DB.save: ${record.value}")
    offset.set(record.offset)
    Future.successful(Done)
  }

  def loadOffset(): Future[Long] =
    Future.successful(offset.get)

  def update(data: String): Future[Done] = {
    println(s"DB.update: $data")
    Future.successful(Done)
  }
}
Java
static class DB {
  private final AtomicLong offset = new AtomicLong();

  public CompletionStage<Done> save(ConsumerRecord<byte[], String> record) {
    System.out.println("DB.save: " + record.value());
    offset.set(record.offset());
    return CompletableFuture.completedFuture(Done.getInstance());
  }

  public CompletionStage<Long> loadOffset() {
    return CompletableFuture.completedFuture(offset.get());
  }

  public CompletionStage<Done> update(String data) {
    System.out.println("DB.update: " + data);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
Scala
class Rocket {
  def launch(destination: String): Future[Done] = {
    println(s"Rocket launched to $destination")
    Future.successful(Done)
  }
}
Java
static class Rocket {
  public CompletionStage<Done> launch(String destination) {
    System.out.println("Rocket launched to " + destination);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}

Settings

When creating a consumer stream you need to pass in ConsumerSettings that define things like:

  • bootstrap servers of the Kafka cluster
  • group id for the consumer (note that offsets are always committed for a given consumer group)
  • deserializers for the keys and values
  • tuning parameters
Scala
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Java
protected final ConsumerSettings<byte[], String> consumerSettings =
    ConsumerSettings.create(system, new ByteArrayDeserializer(), new StringDeserializer())
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

In addition to programmatic construction, ConsumerSettings can also be created from configuration (application.conf). By default, when ConsumerSettings is created with the ActorSystem parameter, it uses the config section akka.kafka.consumer.

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  poll-interval = 50ms
  
  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that a non-zero value means that the thread that
  # is executing the stage will be blocked.
  poll-timeout = 50ms
  
  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  stop-timeout = 30s
  
  # How long to wait for `KafkaConsumer.close`
  close-timeout = 20s
  
  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `TimeoutException`.
  commit-timeout = 15s
  
  # If the KafkaConsumer can't connect to the broker the poll will be
  # aborted after this timeout. The KafkaConsumerActor will throw
  # org.apache.kafka.common.errors.WakeupException, which can be handled
  # with Actor supervision strategy.
  wakeup-timeout = 10s
  
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }
}

ConsumerSettings can also be created from any other Config section with the same layout as above.
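
As an illustration of such a section, the fragment below is a minimal sketch and not part of the library's reference configuration. The section name my-app.kafka-consumer is hypothetical, and the `${akka.kafka.consumer}` substitution assumes that the defaults shown above should be inherited and only selected values overridden.

```hocon
# Hypothetical section name; any path with the same layout can be used.
my-app.kafka-consumer = ${akka.kafka.consumer} {
  # Override a tuning parameter for this consumer only.
  poll-interval = 100ms

  kafka-clients {
    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    bootstrap.servers = "localhost:9092"
    group.id = "group1"
  }
}
```

The resolved section would then be handed over as a Config when the ConsumerSettings are created, instead of relying on the default akka.kafka.consumer section.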

See KafkaConsumer Javadoc and ConsumerConfig Javadoc for details.

External Offset Storage

The Consumer.plainSource emits ConsumerRecord elements (as received from the underlying KafkaConsumer).
It has no support for committing offsets to Kafka. It can be used when offsets are stored externally
or with auto-commit (note that auto-commit is disabled by default).

The consumer application doesn’t need to use Kafka’s built-in offset storage; it can store offsets in a store of its own
choosing. The primary use case for this is allowing the application to store both the offset and the results of the
consumption in the same system, so that results and offsets are stored atomically. This is not always
possible, but when it is, it makes the consumption fully atomic and gives “exactly once” semantics that are
stronger than the “at-least once” semantics you get with Kafka’s offset commit functionality.

Scala
val db = new DB
db.loadOffset().foreach { fromOffset =>
  val partition = 0
  val subscription = Subscriptions.assignmentWithOffset(
    new TopicPartition("topic1", partition) -> fromOffset
  )
  val done =
    Consumer.plainSource(consumerSettings, subscription)
      .mapAsync(1)(db.save)
      .runWith(Sink.ignore)
}
Java
final DB db = new DB();

db.loadOffset().thenAccept(fromOffset -> {
  Consumer.plainSource(
    consumerSettings,
    Subscriptions.assignmentWithOffset(new TopicPartition("topic1", 0), fromOffset)
  ).mapAsync(1, record -> db.save(record))
  .runWith(Sink.ignore(), materializer);
});

Note how the starting point (offset) is assigned for a given consumer group id,
topic and partition. The group id is defined in the ConsumerSettings.
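
The atomic storage of results and offsets described above can be sketched without Kafka at all. The following plain-JDK example is only an illustration under stated assumptions: AtomicOffsetStore is a hypothetical in-memory stand-in for a transactional database, and, unlike the dummy DB class, it stores the offset of the next record to read (last stored offset plus one), so that a restart does not reread the last record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a transactional database.
final class AtomicOffsetStore {
  private final List<String> results = new ArrayList<>();
  private long nextOffset = 0L; // offset of the next record to read

  // Stores the result and advances the offset in a single synchronized step,
  // so no reader can observe one without the other.
  synchronized boolean save(long offset, String value) {
    if (offset < nextOffset) {
      return false; // already stored: a redelivered record changes nothing
    }
    results.add(value);
    nextOffset = offset + 1;
    return true;
  }

  synchronized long loadOffset() {
    return nextOffset;
  }

  synchronized List<String> results() {
    return new ArrayList<>(results);
  }

  // Simulates a consumer that stops after two records and then restarts.
  static List<String> demo() {
    String[] topic = {"a", "b", "c"};
    AtomicOffsetStore store = new AtomicOffsetStore();
    for (int o = (int) store.loadOffset(); o < 2; o++) {
      store.save(o, topic[o]);
    }
    // Restart: resume from the stored offset, not from the beginning.
    for (int o = (int) store.loadOffset(); o < topic.length; o++) {
      store.save(o, topic[o]);
    }
    store.save(1, "b"); // a duplicate delivery of offset 1 is ignored
    return store.results();
  }
}
```

AtomicOffsetStore.demo() returns [a, b, c]: each record is stored once, even though the consumer restarted and offset 1 was delivered twice.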

Offset Storage in Kafka

The Consumer.committableSource makes it possible to commit offset positions to Kafka.

Compared to auto-commit this gives exact control of when a message is considered consumed.

If you need to store offsets in anything other than Kafka, plainSource should be used instead of this API.

The committableSource is useful when “at-least once delivery” is desired, as each message will likely be delivered one time but in failure cases could be duplicated.

Scala
val db = new DB

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.record.value).map(_ => msg)
    }
    .mapAsync(1) { msg =>
      msg.committableOffset.commitScaladsl()
    }
    .runWith(Sink.ignore)
Java
final DB db = new DB();

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, msg -> db.update(msg.record().value())
    .thenApply(done -> msg))
  .mapAsync(1, msg -> msg.committableOffset().commitJavadsl())
  .runWith(Sink.ignore(), materializer);

The above example uses separate mapAsync stages for processing and committing. Because mapAsync emits results in the same order as its input, this guarantees that offsets reach the commit stage in the correct order even when the processing stage uses parallelism higher than 1.
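
The ordering guarantee can be modelled with plain JDK futures. This is only a sketch of the idea, not Akka Streams code: OrderedAsyncSketch and processInOrder are hypothetical names, and the artificial delays exist solely to make tasks finish out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK model of an order-preserving asynchronous stage.
final class OrderedAsyncSketch {

  // Starts one task per offset on a pool of `parallelism` threads and
  // returns the results in input order, whatever order the tasks finish in.
  static List<Long> processInOrder(List<Long> offsets, int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<CompletableFuture<Long>> inFlight = new ArrayList<>();
      for (Long offset : offsets) {
        inFlight.add(CompletableFuture.supplyAsync(() -> {
          sleepMillis(offset % 2 == 0 ? 30 : 1); // even offsets are slower to process
          return offset;
        }, pool));
      }
      List<Long> readyToCommit = new ArrayList<>();
      for (CompletableFuture<Long> result : inFlight) {
        readyToCommit.add(result.join()); // wait for the oldest outstanding result first
      }
      return readyToCommit;
    } finally {
      pool.shutdown();
    }
  }

  private static void sleepMillis(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Even though offset 1 finishes processing before offset 0 here, the returned list keeps the input order, so a commit step that consumes it one element at a time never commits a later offset before an earlier one.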

Committing the offset for each message as illustrated above is rather slow. It is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.

You can use the Akka Stream batch combinator to perform the batching. Note that it will only aggregate elements into batches if the downstream consumer is slower than the upstream producer.

Scala
val db = new DB

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.record.value).map(_ => msg.committableOffset)
    }
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
Java
final DB db = new DB();

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, msg ->
    db.update(msg.record().value()).thenApply(done -> msg.committableOffset()))
  .batch(20,
    first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first),
    (batch, elem) -> batch.updated(elem))
  .mapAsync(3, c -> c.commitJavadsl())
  .runWith(Sink.ignore(), materializer);

groupedWithin is an alternative way of aggregating elements:

Scala
.groupedWithin(10, 5.seconds)
.map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
.mapAsync(3)(_.commitScaladsl())
Java
    source
      .groupedWithin(20, Duration.create(5, TimeUnit.SECONDS))
      .map(group -> foldLeft(group))
      .mapAsync(3, c -> c.commitJavadsl())

private ConsumerMessage.CommittableOffsetBatch foldLeft(List<ConsumerMessage.CommittableOffset> group) {
  ConsumerMessage.CommittableOffsetBatch batch = ConsumerMessage.emptyCommittableOffsetBatch();
  for (ConsumerMessage.CommittableOffset elem: group) {
    batch = batch.updated(elem);
  }
  return batch;
}
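
To see why a batch needs only one commit, it helps to look at what aggregating offsets amounts to. The following is a minimal sketch, not the library's implementation: OffsetBatchSketch is a hypothetical class, partitions are identified by a plain string, and keeping the highest offset per partition is an assumption made for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, immutable model of a batch of offsets.
final class OffsetBatchSketch {
  static final OffsetBatchSketch EMPTY = new OffsetBatchSketch(new HashMap<>());

  // topic-partition name -> highest offset added so far
  private final Map<String, Long> offsets;

  private OffsetBatchSketch(Map<String, Long> offsets) {
    this.offsets = offsets;
  }

  // Returns a new batch; the receiver is left unchanged, so the result must be used.
  OffsetBatchSketch updated(String topicPartition, long offset) {
    Map<String, Long> copy = new HashMap<>(offsets);
    copy.merge(topicPartition, offset, Long::max);
    return new OffsetBatchSketch(copy);
  }

  // One entry per partition, regardless of how many offsets were added.
  Map<String, Long> toCommit() {
    return Collections.unmodifiableMap(offsets);
  }
}
```

A usage example:

```java
OffsetBatchSketch batch = OffsetBatchSketch.EMPTY
    .updated("topic1-0", 5)
    .updated("topic1-0", 6)
    .updated("topic1-1", 2);
// batch.toCommit() now holds two entries: topic1-0 -> 6 and topic1-1 -> 2
```

Three processed messages result in two positions to commit. The flip side is the trade-off mentioned earlier: if the stream fails before the batch is committed, all messages in it are delivered again.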

If you commit the offset before processing the message you get “at-most once delivery” semantics, and for that there is a Consumer.atMostOnceSource. However, atMostOnceSource commits the offset for each message, which is rather slow, so batching of commits is recommended.

Scala
val rocket = new Rocket

val done = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { record =>
    rocket.launch(record.value)
  }
  .runWith(Sink.ignore)
Java
final Rocket rocket = new Rocket();

Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, record -> rocket.launch(record.value()))
  .runWith(Sink.ignore(), materializer);
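
The difference between committing before and after processing can be made concrete with a small simulation. This is a plain-JDK sketch with no Kafka or Akka involved: DeliverySemanticsSketch is a hypothetical name, and the failure is assumed to happen exactly once, between the two steps, followed by a restart from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK simulation of commit ordering around a single failure.
final class DeliverySemanticsSketch {

  // Consumes offsets 0 until `records`, failing once at `crashAt` between the
  // two steps (commit and process), then restarting from the committed offset.
  // Returns every offset whose processing ran to completion, in order.
  static List<Integer> run(int records, int crashAt, boolean commitBeforeProcessing) {
    List<Integer> processed = new ArrayList<>();
    int committed = 0; // offset to resume from after a restart
    boolean crashed = false;
    int offset = 0;
    while (offset < records) {
      boolean crashNow = !crashed && offset == crashAt;
      if (commitBeforeProcessing) {
        committed = offset + 1;                // step 1: commit
        if (!crashNow) processed.add(offset);  // step 2: process (lost in the crash)
      } else {
        processed.add(offset);                 // step 1: process
        if (!crashNow) committed = offset + 1; // step 2: commit (lost in the crash)
      }
      if (crashNow) {
        crashed = true;
        offset = committed; // restart from the last committed offset
      } else {
        offset++;
      }
    }
    return processed;
  }
}
```

With four records and a failure at offset 2, run(4, 2, true) returns [0, 1, 3]: the record at offset 2 is never processed (at-most once). run(4, 2, false) returns [0, 1, 2, 2, 3]: the record at offset 2 is processed twice (at-least once).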

Connecting Producer and Consumer

For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic you can use Consumer.committableSource and connect it to a Producer.commitableSink. The commitableSink will commit the offset back to the consumer when it has successfully published the message.

Note that there is a risk that something fails after publishing but before committing, so commitableSink has “at-least once delivery” semantics.

Scala
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map { msg =>
    println(s"topic1 -> topic2: $msg")
    ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
      "topic2",
      msg.record.value
    ), msg.committableOffset)
  }
  .runWith(Producer.commitableSink(producerSettings))
Java
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
    new ProducerMessage.Message<byte[], String, ConsumerMessage.Committable>(
        new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset()))
  .runWith(Producer.commitableSink(producerSettings), materializer);

As mentioned earlier, committing each message is rather slow and we can batch the commits to get higher throughput.

Scala
val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(msg =>
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset))
    .via(Producer.flow(producerSettings))
    .map(_.message.passThrough)
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
Java
Source<ConsumerMessage.CommittableOffset, Consumer.Control> source =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
      new ProducerMessage.Message<byte[], String, ConsumerMessage.CommittableOffset>(
          new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset()))
  .via(Producer.flow(producerSettings))
  .map(result -> result.message().passThrough());

source.batch(20,
    first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first),
    (batch, elem) -> batch.updated(elem))
  .mapAsync(3, c -> c.commitJavadsl())
  .runWith(Sink.ignore(), materializer);

Source per partition

Consumer.plainPartitionedSource and Consumer.committablePartitionedSource support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, the source emits a tuple of the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.

Backpressure per partition with batch commit:

Scala
val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
    batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)
Java
Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, Pair::second)
  .via(business())
  .batch(
      100,
      first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first.committableOffset()),
      (batch, elem) -> batch.updated(elem.committableOffset())
  )
  .mapAsync(3, x -> x.commitJavadsl())
  .runWith(Sink.ignore(), materializer);

Separate streams per partition:

Scala
//Consumer group represented as Source[(TopicPartition, Source[Messages])]
val consumerGroup =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
//Process each assigned partition separately
consumerGroup.map {
  case (topicPartition, source) =>
    source
      .via(business)
      .toMat(Sink.ignore)(Keep.both)
      .run()
}
  .mapAsyncUnordered(maxPartitions)(_._2)
  .runWith(Sink.ignore)
Java
Consumer.Control c =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(pair -> pair.second().via(business()).toMat(Sink.ignore(), Keep.both()).run(materializer))
    .mapAsyncUnordered(maxPartitions, (pair) -> pair.second()).to(Sink.ignore()).run(materializer);

Join flows based on automatically assigned partitions:

Scala
type Msg = CommittableMessage[Array[Byte], String]
def zipper(left: Source[Msg, _], right: Source[Msg, _]): Source[(Msg, Msg), NotUsed] = ???

Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .map {
    case (topicPartition, source) =>
      // get corresponding partition from other topic
      val otherSource = {
        val otherTopicPartition = new TopicPartition("otherTopic", topicPartition.partition())
        Consumer.committableSource(consumerSettings, Subscriptions.assignment(otherTopicPartition))
      }
      zipper(source, otherSource)
  }
  .flatMapMerge(maxPartitions, identity)
  .via(business)
  //build commit offsets
  .batch(max = 20, {
    case (l, r) => (
      CommittableOffsetBatch.empty.updated(l.committableOffset),
      CommittableOffsetBatch.empty.updated(r.committableOffset)
    )
  }) {
    case ((batchL, batchR), (l, r)) =>
      // updated returns a new batch, so the results must be kept
      (batchL.updated(l.committableOffset), batchR.updated(r.committableOffset))
  }
  .mapAsync(1) { case (l, r) => l.commitScaladsl().map(_ => r) }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)

Sharing KafkaConsumer

If you have many streams it can be more efficient to share the underlying KafkaConsumer. It can be shared via the KafkaConsumerActor. You need to create the actor yourself and stop it when it is no longer needed. You pass the ActorRef as a parameter to the Consumer factory methods.

Scala
//Consumer is represented by actor
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

//Manually assign topic partition to it
Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business)
  .runWith(Sink.ignore)

//Manually assign another topic partition
Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
  .via(business)
  .runWith(Sink.ignore)
Java
//Consumer is represented by actor
ActorRef consumer = system.actorOf(KafkaConsumerActor.props(consumerSettings));

//Manually assign topic partition to it
Consumer
  .plainExternalSource(consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business())
  .runWith(Sink.ignore(), materializer);

//Manually assign another topic partition
Consumer
  .plainExternalSource(consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
  .via(business())
  .runWith(Sink.ignore(), materializer);