Consumer
A consumer is used for subscribing to Kafka topics.
The underlying implementation uses the KafkaConsumer; see the Javadoc for a description of consumer groups, offsets, and other details.
Example Code
For the examples in this section we use the following two dummy classes to illustrate how messages can be consumed.
- Scala
-
class DB {

  private val offset = new AtomicLong

  def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
    println(s"DB.save: ${record.value}")
    offset.set(record.offset)
    Future.successful(Done)
  }

  def loadOffset(): Future[Long] =
    Future.successful(offset.get)

  def update(data: String): Future[Done] = {
    println(s"DB.update: $data")
    Future.successful(Done)
  }
}
- Java
-
static class DB {
  private final AtomicLong offset = new AtomicLong();

  public CompletionStage<Done> save(ConsumerRecord<byte[], String> record) {
    System.out.println("DB.save: " + record.value());
    offset.set(record.offset());
    return CompletableFuture.completedFuture(Done.getInstance());
  }

  public CompletionStage<Long> loadOffset() {
    return CompletableFuture.completedFuture(offset.get());
  }

  public CompletionStage<Done> update(String data) {
    System.out.println("DB.update: " + data);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
- Scala
-
class Rocket {
  def launch(destination: String): Future[Done] = {
    println(s"Rocket launched to $destination")
    Future.successful(Done)
  }
}
- Java
-
static class Rocket {
  public CompletionStage<Done> launch(String destination) {
    System.out.println("Rocket launched to " + destination);
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
Settings
When creating a consumer stream you need to pass in ConsumerSettings that define things like:
- bootstrap servers of the Kafka cluster
- group id for the consumer (note that offsets are always committed for a given consumer group)
- deserializers for the keys and values
- tuning parameters
- Scala
-
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- Java
-
protected final ConsumerSettings<byte[], String> consumerSettings =
  ConsumerSettings.create(system, new ByteArrayDeserializer(), new StringDeserializer())
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
In addition to programmatic construction, the ConsumerSettings can also be created from configuration (application.conf). By default, when ConsumerSettings is created with the ActorSystem parameter, it uses the config section akka.kafka.consumer.
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that a non-zero value means that the thread that
  # is executing the stage will be blocked.
  poll-timeout = 50ms

  # The stage will await outstanding offset commit requests before
  # shutting down, but if that takes longer than this timeout it will
  # stop forcefully.
  stop-timeout = 30s

  # How long to wait for `KafkaConsumer.close`
  close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `TimeoutException`.
  commit-timeout = 15s

  # If the KafkaConsumer can't connect to the broker the poll will be
  # aborted after this timeout. The KafkaConsumerActor will throw
  # org.apache.kafka.common.errors.WakeupException, which can be handled
  # with Actor supervision strategy.
  wakeup-timeout = 10s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }
}
ConsumerSettings can also be created from any other Config section with the same layout as above.
See KafkaConsumer Javadoc and ConsumerConfig Javadoc for details.
External Offset Storage
The Consumer.plainSource emits ConsumerRecord elements (as received from the underlying KafkaConsumer).
It has no support for committing offsets to Kafka. It can be used when offsets are stored externally or with auto-commit (note that auto-commit is disabled by default).
The consumer application doesn’t need to use Kafka’s built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system, so that results and offsets are stored atomically. This is not always possible, but when it is, it makes the consumption fully atomic and gives “exactly once” semantics that are stronger than the “at-least once” semantics you get with Kafka’s offset commit functionality.
- Scala
-
val db = new DB
db.loadOffset().foreach { fromOffset =>
  val partition = 0
  val subscription = Subscriptions.assignmentWithOffset(
    new TopicPartition("topic1", partition) -> fromOffset
  )
  val done =
    Consumer.plainSource(consumerSettings, subscription)
      .mapAsync(1)(db.save)
      .runWith(Sink.ignore)
}
- Java
-
final DB db = new DB();

db.loadOffset().thenAccept(fromOffset -> {
  Consumer.plainSource(
      consumerSettings,
      Subscriptions.assignmentWithOffset(new TopicPartition("topic1", 0), fromOffset)
    )
    .mapAsync(1, record -> db.save(record))
    .runWith(Sink.ignore(), materializer);
});
Note how the starting point (offset) is assigned for a given consumer group id, topic and partition. The group id is defined in the ConsumerSettings.
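The idea of storing results and offsets atomically can be illustrated with a small model in plain Java, without any Kafka or Akka dependency. This is only a sketch under stated assumptions: `AtomicOffsetStore` and `ExternalOffsetDemo` are hypothetical names, the "log" is an in-memory list, and the store keeps the offset of the next record to read (last offset plus one), which is a choice made for this sketch and differs from the dummy DB class above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory store; it stands in for a transactional database.
final class AtomicOffsetStore {
    private final List<String> results = new ArrayList<>();
    private long nextOffset = 0L; // offset of the next record to consume

    // Result and offset are updated under one lock, so they change together.
    synchronized void save(long offset, String value) {
        if (offset < nextOffset) {
            return; // already stored, so a replayed record is ignored
        }
        results.add(value);
        nextOffset = offset + 1;
    }

    synchronized long loadNextOffset() {
        return nextOffset;
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }
}

final class ExternalOffsetDemo {
    // Consumes the log from the stored offset and stops before `stopBefore`
    // to simulate a consumer that crashes part-way through.
    static void consume(List<String> log, AtomicOffsetStore store, int stopBefore) {
        int end = Math.min(stopBefore, log.size());
        for (long offset = store.loadNextOffset(); offset < end; offset++) {
            store.save(offset, log.get((int) offset));
        }
    }
}
```

Calling `consume` twice on the same store, first with an early stop and then to the end of the log, leaves every record stored once, because the second run resumes from the offset that was saved together with the last result.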
Offset Storage in Kafka
The Consumer.committableSource makes it possible to commit offset positions to Kafka.
Compared to auto-commit, this gives exact control over when a message is considered consumed.
If you need to store offsets in anything other than Kafka, plainSource should be used instead of this API.
committableSource is useful when “at-least once delivery” is desired: each message will usually be delivered once, but in failure cases it may be delivered more than once.
- Scala
-
val db = new DB

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.record.value).map(_ => msg)
    }
    .mapAsync(1) { msg =>
      msg.committableOffset.commitScaladsl()
    }
    .runWith(Sink.ignore)
- Java
-
final DB db = new DB();

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, msg -> db.update(msg.record().value())
    .thenApply(done -> msg))
  .mapAsync(1, msg -> msg.committableOffset().commitJavadsl())
  .runWith(Sink.ignore(), materializer);
The above example uses separate mapAsync stages for processing and committing. This guarantees that for parallelism higher than 1 we will keep correct ordering of messages sent for commit.
Committing the offset for each message as illustrated above is rather slow. It is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.
You can use the Akka Stream batch combinator to perform the batching. Note that it will only aggregate elements into batches if the downstream consumer is slower than the upstream producer.
- Scala
-
val db = new DB

val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      db.update(msg.record.value).map(_ => msg.committableOffset)
    }
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
- Java
-
final DB db = new DB();

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, msg ->
    db.update(msg.record().value()).thenApply(done -> msg.committableOffset()))
  .batch(
    20,
    first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first),
    (batch, elem) -> batch.updated(elem))
  .mapAsync(3, c -> c.commitJavadsl())
  .runWith(Sink.ignore(), materializer);
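The examples above fold offsets into a batch with `updated`. One way to picture why this reduces commit traffic is a small model in plain Java. It assumes that a batch only needs to remember the highest offset seen per partition, since committing an offset in Kafka covers all earlier records of that partition. `OffsetBatchModel` and its methods are hypothetical names used for illustration; this is not the library's CommittableOffsetBatch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of an offset batch, keyed by partition number.
final class OffsetBatchModel {
    private final Map<Integer, Long> highest = new HashMap<>();

    // Returns a new batch and leaves this one unchanged, so the result
    // of `updated` has to be kept by the caller.
    OffsetBatchModel updated(int partition, long offset) {
        OffsetBatchModel next = new OffsetBatchModel();
        next.highest.putAll(highest);
        next.highest.merge(partition, offset, Long::max);
        return next;
    }

    // One entry per partition, which is all a single commit needs to carry.
    Map<Integer, Long> offsets() {
        return new HashMap<>(highest);
    }
}
```

Folding many offsets of the same partition into such a batch still yields a single entry for that partition, so one commit request can stand in for many.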
groupedWithin is an alternative way of aggregating elements:
- Scala
-
.groupedWithin(10, 5.seconds)
.map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
.mapAsync(3)(_.commitScaladsl())
- Java
-
source
  .groupedWithin(20, Duration.create(5, TimeUnit.SECONDS))
  .map(group -> foldLeft(group))
  .mapAsync(3, c -> c.commitJavadsl())

private ConsumerMessage.CommittableOffsetBatch foldLeft(List<ConsumerMessage.CommittableOffset> group) {
  ConsumerMessage.CommittableOffsetBatch batch = ConsumerMessage.emptyCommittableOffsetBatch();
  for (ConsumerMessage.CommittableOffset elem: group) {
    batch = batch.updated(elem);
  }
  return batch;
}
If you commit the offset before processing the message you get “at-most once delivery” semantics, and for that there is Consumer.atMostOnceSource. However, atMostOnceSource commits the offset for each message, which is rather slow; batching of commits is recommended.
- Scala
-
val rocket = new Rocket

val done = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { record =>
    rocket.launch(record.value)
  }
  .runWith(Sink.ignore)
- Java
-
final Rocket rocket = new Rocket();

Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1, record -> rocket.launch(record.value()))
  .runWith(Sink.ignore(), materializer);
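The difference between “at-least once” and “at-most once” comes down to whether the offset is committed after or before processing. The following plain Java sketch simulates a consumer that fails once and is then restarted. It does not use Kafka or Akka; `DeliverySemanticsDemo`, the in-memory log, and the single-element `committed` array are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliverySemanticsDemo {
    // Runs one consumer "life" over the log and fails at index `failAt`
    // (use -1 for no failure). `committed[0]` holds the next offset to read.
    // commitFirst = true models at-most-once, false models at-least-once.
    static void run(List<String> log, long[] committed, List<String> processed,
                    boolean commitFirst, int failAt) {
        for (int offset = (int) committed[0]; offset < log.size(); offset++) {
            if (commitFirst) {
                committed[0] = offset + 1;       // commit before processing
                if (offset == failAt) return;    // crash: this record is lost
                processed.add(log.get(offset));
            } else {
                processed.add(log.get(offset));  // process before committing
                if (offset == failAt) return;    // crash: this record is replayed
                committed[0] = offset + 1;
            }
        }
    }

    // Simulates a failure at index 1 followed by a restart from the
    // committed position, and returns everything that was processed.
    static List<String> crashAndRestart(List<String> log, boolean commitFirst) {
        long[] committed = {0L};
        List<String> processed = new ArrayList<>();
        run(log, committed, processed, commitFirst, 1);
        run(log, committed, processed, commitFirst, -1);
        return processed;
    }
}
```

For a log of `a, b, c`, committing first skips `b` after the restart, while processing first handles `b` twice. This is the trade-off between the two sources described above.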
Connecting Producer and Consumer
For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic, you can use Consumer.committableSource and connect it to a Producer.commitableSink. The commitableSink will commit the offset back to the consumer when it has successfully published the message.
Note that there is a risk that something fails after publishing but before committing, so commitableSink has “at-least once delivery” semantics.
- Scala
-
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map { msg =>
    println(s"topic1 -> topic2: $msg")
    ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
      "topic2",
      msg.record.value
    ), msg.committableOffset)
  }
  .runWith(Producer.commitableSink(producerSettings))
- Java
-
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
    new ProducerMessage.Message<byte[], String, ConsumerMessage.Committable>(
      new ProducerRecord<>("topic2", msg.record().value()),
      msg.committableOffset()))
  .runWith(Producer.commitableSink(producerSettings), materializer);
As mentioned earlier, committing each message is rather slow and we can batch the commits to get higher throughput.
- Scala
-
val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(msg =>
      ProducerMessage.Message(
        new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
        msg.committableOffset))
    .via(Producer.flow(producerSettings))
    .map(_.message.passThrough)
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
- Java
-
Source<ConsumerMessage.CommittableOffset, Consumer.Control> source =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(msg ->
      new ProducerMessage.Message<byte[], String, ConsumerMessage.CommittableOffset>(
        new ProducerRecord<>("topic2", msg.record().value()),
        msg.committableOffset()))
    .via(Producer.flow(producerSettings))
    .map(result -> result.message().passThrough());

source
  .batch(
    20,
    first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first),
    (batch, elem) -> batch.updated(elem))
  .mapAsync(3, c -> c.commitJavadsl())
  .runWith(Sink.ignore(), materializer);
Source per partition
Consumer.plainPartitionedSource and Consumer.committablePartitionedSource support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, the source emits a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.
Backpressure per partition with batch commit:
- Scala
-
val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
    batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)
- Java
-
Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, Pair::second)
  .via(business())
  .batch(
    100,
    first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first.committableOffset()),
    (batch, elem) -> batch.updated(elem.committableOffset())
  )
  .mapAsync(3, x -> x.commitJavadsl())
  .runWith(Sink.ignore(), materializer);
Separate streams per partition:
- Scala
-
// Consumer group represented as Source[(TopicPartition, Source[Messages])]
val consumerGroup =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))

// Process each assigned partition separately
consumerGroup
  .map {
    case (topicPartition, source) =>
      source
        .via(business)
        .toMat(Sink.ignore)(Keep.both)
        .run()
  }
  .mapAsyncUnordered(maxPartitions)(_._2)
  .runWith(Sink.ignore)
- Java
-
Consumer.Control c =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(pair -> pair.second().via(business()).toMat(Sink.ignore(), Keep.both()).run(materializer))
    .mapAsyncUnordered(maxPartitions, (pair) -> pair.second())
    .to(Sink.ignore())
    .run(materializer);
Join flows based on automatically assigned partitions:
- Scala
-
type Msg = CommittableMessage[Array[Byte], String]

def zipper(left: Source[Msg, _], right: Source[Msg, _]): Source[(Msg, Msg), NotUsed] = ???

Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .map {
    case (topicPartition, source) =>
      // get corresponding partition from other topic
      val otherSource = {
        val otherTopicPartition = new TopicPartition("otherTopic", topicPartition.partition())
        Consumer.committableSource(consumerSettings, Subscriptions.assignment(otherTopicPartition))
      }
      zipper(source, otherSource)
  }
  .flatMapMerge(maxPartitions, identity)
  .via(business)
  // build commit offsets
  .batch(max = 20, {
    case (l, r) => (
      CommittableOffsetBatch.empty.updated(l.committableOffset),
      CommittableOffsetBatch.empty.updated(r.committableOffset)
    )
  }) {
    case ((batchL, batchR), (l, r)) =>
      // `updated` returns a new batch, so its result must be kept
      (batchL.updated(l.committableOffset), batchR.updated(r.committableOffset))
  }
  .mapAsync(1) { case (l, r) => l.commitScaladsl().map(_ => r) }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)
Sharing KafkaConsumer
If you have many streams it can be more efficient to share the underlying KafkaConsumer. It can be shared via the KafkaConsumerActor. You need to create the actor and stop it when it is no longer needed. You pass the ActorRef as a parameter to the Consumer factory methods.
- Scala
-
// Consumer is represented by actor
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

// Manually assign topic partition to it
Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business)
  .runWith(Sink.ignore)

// Manually assign another topic partition
Consumer
  .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
  .via(business)
  .runWith(Sink.ignore)
- Java
-
// Consumer is represented by actor
ActorRef consumer = system.actorOf((KafkaConsumerActor.props(consumerSettings)));

// Manually assign topic partition to it
Consumer
  .plainExternalSource(consumer, Subscriptions.assignment(new TopicPartition("topic1", 1)))
  .via(business())
  .runWith(Sink.ignore(), materializer);

// Manually assign another topic partition
Consumer
  .plainExternalSource(consumer, Subscriptions.assignment(new TopicPartition("topic1", 2)))
  .via(business())
  .runWith(Sink.ignore(), materializer);