Subscription

Consumer Sources are created with different types of subscriptions, which control the topics, partitions and offsets from which data is consumed.

Subscriptions fall into two categories: those with automatic partition assignment and those with manual control of partition assignment.

Factory methods for all subscriptions can be found in the Subscriptions factory.

Automatic Partition Assignment

Topic

Subscribes to one or more topics. Partitions will be assigned automatically by the Kafka Client.

Scala
val subscription = Subscriptions.topics(topic)
val consumer = Consumer.plainSource(consumerDefaults.withGroupId(group), subscription)
Java
final AutoSubscription subscription = Subscriptions.topics(topic);
final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
    Consumer.plainSource(consumerDefaults().withGroupId(group), subscription);

Topic Pattern

Subscribes to all topics whose names match the given pattern. See the documentation of the subscribe(java.util.regex.Pattern pattern, ...) method for more information on topic pattern matching.

Scala
val pattern = s"topic-$suffix-[0-9]+"
val subscription = Subscriptions.topicPattern(pattern)
val consumer = Consumer.plainSource(consumerDefaults.withGroupId(group), subscription)
Java
final String pattern = "topic-900[1|2]-[0-9]+";
final AutoSubscription subscription = Subscriptions.topicPattern(pattern);
final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
    Consumer.plainSource(consumerDefaults().withGroupId(group), subscription);
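
To preview which topic names a pattern would select before using it in a subscription, the regular expression can be exercised on its own with java.util.regex. The following is a minimal sketch, not part of the library: the class TopicPatternCheck, the method matching and the sample topic names are hypothetical, and it assumes that a topic is selected only when its whole name matches the pattern.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only; it does not talk to Kafka.
class TopicPatternCheck {

    // Returns the topic names that the regex accepts as a whole-name match
    // (assumption: the pattern has to cover the complete topic name).
    static List<String> matching(String regex, List<String> topics) {
        Pattern compiled = Pattern.compile(regex);
        return topics.stream()
            .filter(topic -> compiled.matcher(topic).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up topic names for the demonstration.
        List<String> topics = List.of(
            "topic-9001-0", "topic-9002-17", "topic-9003-1", "topic-9001-", "other-9001-0");
        // prints [topic-9001-0, topic-9002-17]
        System.out.println(matching("topic-900[1|2]-[0-9]+", topics));
    }
}
```

Note that inside a character class the | is a literal character, so [1|2] accepts 1, 2 or |. Writing [12] expresses the same intent more precisely.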

Manual Partition Assignment

Partition Assignment

Subscribes to the given partitions of the given topics.

Scala
val partition = 0
val subscription = Subscriptions.assignment(new TopicPartition(topic, partition))
val consumer = Consumer.plainSource(consumerDefaults, subscription)
Java
final Integer partition = 0;
final ManualSubscription subscription =
    Subscriptions.assignment(new TopicPartition(topic, partition));
final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
    Consumer.plainSource(consumerDefaults(), subscription);

Partition Assignment with Offset

Subscribes to the given topic partitions and additionally sets the offset from which messages will be read.

Scala
val partition = 0
val offset: Long = totalMessages.toLong / 2
val subscription = Subscriptions.assignmentWithOffset(new TopicPartition(topic, partition) -> offset)
val consumer = Consumer.plainSource(consumerDefaults, subscription)
Java
final Integer partition = 0;
final long offset = totalMessages / 2;
final ManualSubscription subscription =
    Subscriptions.assignmentWithOffset(new TopicPartition(topic, partition), offset);
final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
    Consumer.plainSource(consumerDefaults(), subscription);

This subscription can be used when offsets are stored in Kafka or on external storage. For more information, take a look at the Offset Storage external to Kafka documentation page.

Partition Assignment with Timestamp

Subscribes to the given topic partitions and additionally sets a timestamp, which is used to look up the offset from which messages will be read.

Scala
val partition = 0
val now = System.currentTimeMillis
val messagesSince: Long = now - 5000
val subscription = Subscriptions.assignmentOffsetsForTimes(new TopicPartition(topic, partition) -> messagesSince)
val consumer = Consumer.plainSource(consumerDefaults, subscription)
Java
final Integer partition = 0;
final Long now = System.currentTimeMillis();
final Long messagesSince = now - 5000;
final ManualSubscription subscription =
    Subscriptions.assignmentOffsetsForTimes(
        new TopicPartition(topic, partition), messagesSince);
final Source<ConsumerRecord<String, String>, Consumer.Control> consumer =
    Consumer.plainSource(consumerDefaults(), subscription);
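
The timestamp in the snippets above is a point in time expressed as milliseconds since the epoch, computed by subtracting 5000 from the current time. As a minimal sketch, the same value can be derived with java.time so that the lookback window is stated explicitly. The class LookbackTimestamp and the method since are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, for illustration only.
class LookbackTimestamp {

    // Epoch milliseconds of the instant that lies `lookback` before `now`.
    static long since(Instant now, Duration lookback) {
        return now.minus(lookback).toEpochMilli();
    }

    public static void main(String[] args) {
        // Same lookback as `now - 5000` in the snippets above: five seconds.
        long messagesSince = since(Instant.now(), Duration.ofSeconds(5));
        System.out.println(messagesSince);
    }
}
```

Passing the clock reading in as a parameter keeps the calculation deterministic, which makes it easy to check with a fixed Instant.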