Consumer Metadata

To access Kafka consumer metadata, create a KafkaConsumerActor as described in the Consumer documentation and send it request messages from Metadata (API).

The metadata the Kafka Consumer provides is documented in the Kafka Consumer API.

Supported metadata

The supported metadata are

Request               Reply
ListTopics            Topics
GetPartitionsFor      PartitionsFor
GetBeginningOffsets   BeginningOffsets
GetEndOffsets         EndOffsets
GetOffsetsForTimes    OffsetsForTimes
GetCommittedOffset    CommittedOffset

These requests block within the Kafka client library for up to a timeout, which is configured with the metadata-request-timeout setting or with ConsumerSettings.withMetadataRequestTimeout.
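As a minimal sketch of the configuration route, the fragment below sets the timeout in application.conf. It assumes the ConsumerSettings are created from the default akka.kafka.consumer config section; the 5s value is only an illustration, not a recommendation.

```hocon
# application.conf (assumes the default akka.kafka.consumer section is used)
akka.kafka.consumer {
  # Upper bound for how long a Metadata request may block in the Kafka client
  metadata-request-timeout = 5s
}
```

A value passed to ConsumerSettings.withMetadataRequestTimeout applies to that settings instance only, which can be convenient when a single actor should use a different timeout.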

Warning

Processing of these requests blocks the actor loop. The KafkaConsumerActor is configured to run on its own dispatcher, so, just like the other remote calls to Kafka, the blocking happens within a designated thread pool.

However, issuing these requests while consuming might affect performance and even cause timeouts in extreme cases.

Consider using a dedicated KafkaConsumerActor for metadata requests.
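The effect described in this warning can be illustrated with a small, library-free model. The sketch below does not use Alpakka Kafka at all: a single-threaded executor stands in for an actor's message loop, a task waiting on a latch stands in for a blocking metadata request, and an empty task stands in for a poll. All class and method names are hypothetical and chosen only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model, not part of Alpakka Kafka.
public class DedicatedMetadataModel {

  // Returns true if the "poll" task finished while the "metadata" task was still blocked.
  static boolean pollFinishesWhileMetadataBlocks(
      ExecutorService metadataLoop, ExecutorService pollLoop, long waitMillis)
      throws InterruptedException, ExecutionException {
    CountDownLatch brokerReplied = new CountDownLatch(1);
    CountDownLatch metadataStarted = new CountDownLatch(1);

    // Stand-in for a metadata request that blocks until the broker answers.
    Future<?> metadata = metadataLoop.submit(() -> {
      metadataStarted.countDown();
      try {
        brokerReplied.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    metadataStarted.await(); // make sure the metadata task occupies its loop

    // Stand-in for a poll that is submitted while the metadata request is in flight.
    Future<?> poll = pollLoop.submit(() -> { });

    boolean finished;
    try {
      poll.get(waitMillis, TimeUnit.MILLISECONDS);
      finished = true;
    } catch (TimeoutException e) {
      finished = false; // the poll is still queued behind the blocked request
    } finally {
      brokerReplied.countDown(); // the "broker" finally answers
    }
    metadata.get();
    poll.get();
    return finished;
  }

  // One loop handles both consuming and metadata requests.
  static boolean sharedLoop() throws Exception {
    ExecutorService loop = Executors.newSingleThreadExecutor();
    try {
      return pollFinishesWhileMetadataBlocks(loop, loop, 500);
    } finally {
      loop.shutdown();
    }
  }

  // Metadata requests run on their own loop, separate from consuming.
  static boolean dedicatedLoop() throws Exception {
    ExecutorService consuming = Executors.newSingleThreadExecutor();
    ExecutorService metadataOnly = Executors.newSingleThreadExecutor();
    try {
      return pollFinishesWhileMetadataBlocks(metadataOnly, consuming, 500);
    } finally {
      consuming.shutdown();
      metadataOnly.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("shared loop, poll finished in time: " + sharedLoop());
    System.out.println("dedicated loop, poll finished in time: " + dedicatedLoop());
  }
}
```

In the shared case the poll cannot run until the blocked request returns, so it misses the wait window. With a separate loop for metadata requests the poll is not queued behind them, which is the reasoning behind using a dedicated KafkaConsumerActor.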

Example

Scala
import akka.actor.ActorRef
import akka.kafka.{KafkaConsumerActor, Metadata}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.duration._

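// Assumes `system` (an ActorSystem), `consumerSettings` (a ConsumerSettings)
// and an implicit ExecutionContext (e.g. `system.dispatcher`) are in scope.
val timeout = 5.seconds
val settings = consumerSettings.withMetadataRequestTimeout(timeout)
implicit val askTimeout: Timeout = Timeout(timeout)

// Create the KafkaConsumerActor that will serve the metadata requests
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(settings))

// Ask for all topics and narrow the reply to its expected type
val topicsFuture: Future[Metadata.Topics] = (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics]

// Print every partition of every topic if the request succeeded
topicsFuture.map(_.response.foreach { map =>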
  println("Found topics:")
  map.foreach {
    case (topic, partitionInfo) =>
      partitionInfo.foreach { info =>
        println(s"  $topic: $info")
      }
  }
})
Java
import akka.actor.ActorRef;
import akka.kafka.ConsumerSettings;
import akka.kafka.KafkaConsumerActor;
import akka.kafka.Metadata;
import akka.pattern.Patterns;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.kafka.common.PartitionInfo;

// Assumes `consumerSettings` and `system()` are provided by the surrounding class.
Duration timeout = Duration.ofSeconds(2);
ConsumerSettings<String, String> settings =
    consumerSettings.withMetadataRequestTimeout(timeout);

ActorRef consumer = system().actorOf(KafkaConsumerActor.props(settings));

CompletionStage<Metadata.Topics> topicsStage =
    Patterns.ask(consumer, Metadata.createListTopics(), timeout)
        .thenApply(reply -> ((Metadata.Topics) reply));

// convert the reply into one "topic: partitionInfo" string per partition
CompletionStage<Optional<List<String>>> response =
    topicsStage
        .thenApply(Metadata.Topics::getResponse)
        .thenApply(
            responseOptional ->
                responseOptional.map(
                    map ->
                        map.entrySet()
                            .stream()
                            .flatMap(
                                entry -> {
                                  String topic = entry.getKey();
                                  List<PartitionInfo> partitionInfos = entry.getValue();
                                  return partitionInfos
                                      .stream()
                                      .map(info -> topic + ": " + info.toString());
                                })
                            .collect(Collectors.toList())));