Consumer Metadata

To access the Kafka consumer metadata you need to create the KafkaConsumerActor as described in the Consumer documentation and send messages from Metadata (API) to it.

The metadata the Kafka Consumer provides is documented in the Kafka Consumer API.

The supported metadata are

Request Reply
ListTopics Topics
GetPartitionsFor PartitionsFor
GetBeginningOffsets BeginningOffsets
GetEndOffsets EndOffsets
GetOffsetsForTimes OffsetsForTimes
GetCommittedOffset CommittedOffset

Processing of these requests blocks the actor loop. The KafkaConsumerActor is configured to run on its own dispatcher, so just as the other remote calls to Kafka, the blocking happens within a designated thread pool.

However, calling these during consuming might affect performance and even cause timeouts in extreme cases.

import akka.kafka.KafkaConsumerActor
import akka.kafka.Metadata
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val timeout = Timeout(5.seconds)

val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

// ... create source ...

val topicsFuture: Future[Metadata.Topics] = (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics] { map =>
  map.foreach {
    case (topic, partitionInfo) =>
      partitionInfo.foreach { info =>
        println(s"$topic: $info")
Full source at GitHub
import akka.kafka.KafkaConsumerActor;
import akka.kafka.Metadata;
import akka.pattern.PatternsCS;
import akka.util.Timeout;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Create kafka consumer actor to be used with Consumer.plainExternalSource or committableExternalSource
ActorRef consumer = system.actorOf((KafkaConsumerActor.props(consumerSettings)));

// ... create source ...
Timeout timeout = new Timeout(2, TimeUnit.SECONDS);

CompletionStage<Metadata.Topics> topicsStage = PatternsCS.ask(consumer, Metadata.createListTopics(), timeout)
        .thenApply(reply -> ((Metadata.Topics) reply));

// print response
        .thenAccept(responseOption -> responseOption.ifPresent(map -> map.forEach((topic, partitionInfo) ->
                partitionInfo.forEach(info ->
                        System.out.println(topic + ": " + info.toString())
Full source at GitHub
The source code for this page can be found here.