Consumer Metadata
To access Kafka consumer metadata, create a KafkaConsumerActor as described in the Consumer documentation and send it request messages from Metadata (API).
The metadata the Kafka Consumer provides is documented in the Kafka Consumer API.
Supported metadata
The following metadata requests and their replies are supported:
Request | Reply |
---|---|
ListTopics | Topics |
GetPartitionsFor | PartitionsFor |
GetBeginningOffsets | BeginningOffsets |
GetEndOffsets | EndOffsets |
GetOffsetsForTimes | OffsetsForTimes |
GetCommittedOffset | CommittedOffset |
These requests block within the Kafka client library for up to a timeout, which is configured by the metadata-request-timeout setting or by ConsumerSettings.withMetadataRequestTimeout.
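The timeout behaviour can be pictured with a plain-Java analogy. This is only a sketch: `BoundedMetadataCall`, `slowLookup`, and `lookupWithTimeout` are hypothetical names, the blocking call is a stand-in rather than the Kafka client, and it assumes (as the `Optional` returned by `getResponse` in the example on this page suggests) that a request that does not complete in time surfaces as an empty `Optional`.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedMetadataCall {

  // Hypothetical stand-in for a blocking client call that takes `latency` to answer.
  static String slowLookup(Duration latency) throws InterruptedException {
    Thread.sleep(latency.toMillis());
    return "topic-a";
  }

  // Waits at most `timeout` for the blocking call; an empty Optional signals failure.
  static Optional<String> lookupWithTimeout(Duration latency, Duration timeout) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<String> pending = pool.submit(() -> slowLookup(latency));
      return Optional.of(pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
    } catch (TimeoutException | ExecutionException e) {
      return Optional.empty();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return Optional.empty();
    } finally {
      pool.shutdownNow(); // interrupt the stand-in call if it is still running
    }
  }

  public static void main(String[] args) {
    // Fast answer, generous timeout: a value is returned.
    System.out.println(lookupWithTimeout(Duration.ofMillis(10), Duration.ofSeconds(2)));
    // Slow answer, short timeout: the caller gets an empty result instead of waiting.
    System.out.println(lookupWithTimeout(Duration.ofSeconds(2), Duration.ofMillis(50)));
  }
}
```

In this sketch the caller never waits longer than the timeout, which is the property the metadata request timeout is meant to provide.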
Processing these requests blocks the actor loop. The KafkaConsumerActor is configured to run on its own dispatcher, so, as with the other remote calls to Kafka, the blocking happens within a designated thread pool.
However, issuing these requests while consuming might affect performance and, in extreme cases, even cause timeouts.
Consider using a dedicated KafkaConsumerActor for metadata requests.
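Why a dedicated actor helps can be sketched without Akka or Kafka. The following is a simplified model under stated assumptions, not the library's implementation: each single-threaded executor stands in for one actor that handles its messages in order, and `DedicatedActorSketch`, `metadataRequest`, and `run` are hypothetical names. The 300 ms sleep is an arbitrary placeholder for a blocking metadata call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DedicatedActorSketch {

  // Hypothetical stand-in for a metadata request that blocks its thread for 300 ms.
  static Runnable metadataRequest(List<String> log) {
    return () -> {
      try {
        Thread.sleep(300);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      log.add("metadata");
    };
  }

  // Submits a metadata request followed by a poll and returns the order in which they finish.
  static List<String> run(boolean dedicated) {
    List<String> log = new CopyOnWriteArrayList<>();
    // Each single-threaded executor models one actor processing its mailbox in order.
    ExecutorService consumerActor = Executors.newSingleThreadExecutor();
    ExecutorService metadataActor =
        dedicated ? Executors.newSingleThreadExecutor() : consumerActor;
    metadataActor.execute(metadataRequest(log));
    consumerActor.execute(() -> log.add("poll"));
    consumerActor.shutdown();
    metadataActor.shutdown();
    try {
      consumerActor.awaitTermination(5, TimeUnit.SECONDS);
      metadataActor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println("shared:    " + run(false));
    System.out.println("dedicated: " + run(true));
  }
}
```

With a shared executor the poll has to wait behind the blocked metadata request, so it finishes second. With a dedicated executor the poll is not held up and, on an unloaded machine, finishes first.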
Example
    import akka.actor.ActorRef;
    import akka.kafka.ConsumerSettings;
    import akka.kafka.KafkaConsumerActor;
    import akka.kafka.Metadata;
    import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
    import akka.pattern.Patterns;
    import java.time.Duration;
    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.CompletionStage;
    import java.util.stream.Collectors;
    import org.apache.kafka.common.PartitionInfo;

    Duration timeout = Duration.ofSeconds(2);
    ConsumerSettings<String, String> settings =
        consumerSettings.withMetadataRequestTimeout(timeout);

    ActorRef consumer = system().actorOf((KafkaConsumerActor.props(settings)));

    CompletionStage<Metadata.Topics> topicsStage =
        Patterns.ask(consumer, Metadata.createListTopics(), timeout)
            .thenApply(reply -> ((Metadata.Topics) reply));

    // convert response
    CompletionStage<Optional<List<String>>> response =
        topicsStage
            .thenApply(Metadata.Topics::getResponse)
            .thenApply(
                responseOptional ->
                    responseOptional.map(
                        map ->
                            map.entrySet().stream()
                                .flatMap(
                                    entry -> {
                                      String topic = entry.getKey();
                                      List<PartitionInfo> partitionInfos = entry.getValue();
                                      return partitionInfos.stream()
                                          .map(info -> topic + ": " + info.toString());
                                    })
                                .collect(Collectors.toList())));