Consumer Metadata

To access the Kafka consumer metadata you need to create the KafkaConsumerActor as described in the Consumer documentation and send messages from Metadata (API) to it.

The metadata the Kafka Consumer provides is documented in the Kafka Consumer API.

The supported metadata are

Request Reply
ListTopics Topics
GetPartitionsFor PartitionsFor
GetBeginningOffsets BeginningOffsets
GetEndOffsets EndOffsets
GetOffsetsForTimes OffsetsForTimes
GetCommittedOffset CommittedOffset
Warning

Processing of these requests blocks the actor loop. The KafkaConsumerActor is configured to run on its own dispatcher, so just as the other remote calls to Kafka, the blocking happens within a designated thread pool.

However, calling these during consuming might affect performance and even cause timeouts in extreme cases.

Scala
import akka.actor.ActorRef
import akka.kafka.KafkaConsumerActor
import akka.kafka.Metadata
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

    implicit val timeout = Timeout(5.seconds)

    val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

    // ... create source ...

    val topicsFuture: Future[Metadata.Topics] = (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics]

    topicsFuture.map(_.response.foreach { map =>
      map.foreach {
        case (topic, partitionInfo) =>
          partitionInfo.foreach { info =>
            println(s"$topic: $info")
          }
      }
    })
Java
import akka.actor.ActorRef;
import akka.kafka.KafkaConsumerActor;
import akka.kafka.Metadata;
import akka.pattern.PatternsCS;
import akka.util.Timeout;
import org.apache.kafka.common.TopicPartition;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Create kafka consumer actor to be used with Consumer.plainExternalSource or committableExternalSource
ActorRef consumer = system.actorOf((KafkaConsumerActor.props(consumerSettings)));

// ... create source ...
Timeout timeout = new Timeout(2, TimeUnit.SECONDS);

CompletionStage<Metadata.Topics> topicsStage = PatternsCS.ask(consumer, Metadata.createListTopics(), timeout)
        .thenApply(reply -> ((Metadata.Topics) reply));

// print response
topicsStage
        .thenApply(Metadata.Topics::getResponse)
        .thenAccept(responseOption -> responseOption.ifPresent(map -> map.forEach((topic, partitionInfo) ->
                partitionInfo.forEach(info ->
                        System.out.println(topic + ": " + info.toString())
                ))));
The source code for this page can be found here.