class ConsumerSettings[K, V] extends AnyRef
Settings for consumers. See the akka.kafka.consumer section in reference.conf. Note that the companion object provides apply and create functions for convenient construction of the settings, together with the with methods.
The constructor is Internal API.
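As a minimal sketch of the construction path described above, the following builds settings through the companion object's apply and then refines them with the with methods. It assumes the Alpakka Kafka and kafka-clients libraries are on the classpath; the actor system name, broker address, and group id are placeholder values chosen for illustration, not defaults of the library.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._

object ConsumerSettingsExample extends App {
  // Hypothetical system name; defaults come from the akka.kafka.consumer
  // section of reference.conf.
  val system: ActorSystem = ActorSystem("example")

  // Each with method returns a new ConsumerSettings instance.
  val settings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder broker address
      .withGroupId("example-group")           // placeholder group id
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withStopTimeout(5.seconds)

  // The raw kafka-clients properties are exposed as a Map[String, String].
  println(settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG))

  system.terminate()
}
```

No broker is contacted here; the settings are only a description that a consumer source or createKafkaConsumerAsync later acts on.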
- Source
- ConsumerSettings.scala
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- val closeTimeout: FiniteDuration
- val commitRefreshInterval: Duration
- val commitTimeWarning: FiniteDuration
- val commitTimeout: FiniteDuration
- val connectionCheckerSettings: ConnectionCheckerSettings
- val consumerFactory: (ConsumerSettings[K, V]) => Consumer[K, V]
- val consumerGroupUpdateInterval: FiniteDuration
- def createKafkaConsumer(): Consumer[K, V]
Create a Kafka Consumer instance from these settings.
This will fail with IllegalStateException if asynchronous enrichment is set up; always prefer createKafkaConsumerAsync() or createKafkaConsumerCompletionStage().
- Exceptions thrown
IllegalStateException if asynchronous enrichment is set via withEnrichAsync or withEnrichCompletionStage; you must use createKafkaConsumerAsync or createKafkaConsumerCompletionStage to apply it
- def createKafkaConsumerAsync()(implicit executionContext: ExecutionContext): Future[Consumer[K, V]]
Scala API. Create a Kafka Consumer instance from these settings (without blocking for enriched).
- def createKafkaConsumerCompletionStage(executor: Executor): CompletionStage[Consumer[K, V]]
Java API. Create a Kafka Consumer instance from these settings (without blocking for enriched).
- val dispatcher: String
- val drainingCheckInterval: FiniteDuration
- val enrichAsync: Option[(ConsumerSettings[K, V]) => Future[ConsumerSettings[K, V]]]
- def enriched: Future[ConsumerSettings[K, V]]
Applies enrichAsync to complement these settings from asynchronous sources.
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def getCloseTimeout: Duration
- def getMetadataRequestTimeout: Duration
- def getOffsetForTimesTimeout: Duration
- def getPositionTimeout: Duration
- def getProperties: Map[String, AnyRef]
Get the Kafka consumer settings as a map.
- def getProperty(key: String): String
Java API: Get a raw property. Returns null if it is not defined.
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- val keyDeserializerOpt: Option[Deserializer[K]]
- val metadataRequestTimeout: FiniteDuration
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- val offsetForTimesTimeout: FiniteDuration
- val partitionHandlerWarning: FiniteDuration
- val pollInterval: FiniteDuration
- val pollTimeout: FiniteDuration
- val positionTimeout: FiniteDuration
- val properties: Map[String, String]
- val resetProtectionSettings: OffsetResetProtectionSettings
- val stopTimeout: FiniteDuration
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- ConsumerSettings → AnyRef → Any
- val valueDeserializerOpt: Option[Deserializer[V]]
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- val waitClosePartition: FiniteDuration
- def withBootstrapServers(bootstrapServers: String): ConsumerSettings[K, V]
A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
- def withClientId(clientId: String): ConsumerSettings[K, V]
An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
- def withCloseTimeout(closeTimeout: Duration): ConsumerSettings[K, V]
Java API: Set duration to wait for KafkaConsumer.close to finish.
- def withCloseTimeout(closeTimeout: FiniteDuration): ConsumerSettings[K, V]
Set duration to wait for KafkaConsumer.close to finish.
- def withCommitRefreshInterval(commitRefreshInterval: Duration): ConsumerSettings[K, V]
Java API: If set to a finite duration, the consumer will re-send the last committed offsets periodically for all assigned partitions. Set to java.time.Duration.ZERO to switch it off.
- See also
https://issues.apache.org/jira/browse/KAFKA-4682
- def withCommitRefreshInterval(commitRefreshInterval: Duration): ConsumerSettings[K, V]
If set to a finite duration, the consumer will re-send the last committed offsets periodically for all assigned partitions.
- See also
https://issues.apache.org/jira/browse/KAFKA-4682
- def withCommitTimeout(commitTimeout: Duration): ConsumerSettings[K, V]
Java API: If offset commit requests are not completed within this timeout the returned Future is completed with akka.kafka.CommitTimeoutException.
- def withCommitTimeout(commitTimeout: FiniteDuration): ConsumerSettings[K, V]
If offset commit requests are not completed within this timeout the returned Future is completed with akka.kafka.CommitTimeoutException.
- def withCommitWarning(commitTimeWarning: Duration): ConsumerSettings[K, V]
Java API: If commits take longer than this time a warning is logged
- def withCommitWarning(commitTimeWarning: FiniteDuration): ConsumerSettings[K, V]
If commits take longer than this time a warning is logged
- def withConnectionChecker(kafkaConnectionCheckerConfig: ConnectionCheckerSettings): ConsumerSettings[K, V]
Enable the Kafka connection checker with the provided settings.
- def withConsumerFactory(factory: (ConsumerSettings[K, V]) => Consumer[K, V]): ConsumerSettings[K, V]
Replaces the default Kafka consumer creation logic.
- def withConsumerGroupUpdateInterval(interval: Duration): ConsumerSettings[K, V]
Java API: For transactional flows only, how often to push consumer group metadata to the producers. A shorter interval reduces the risk of dropping batched elements, at the cost of more work sending those updates.
- def withConsumerGroupUpdateInterval(interval: FiniteDuration): ConsumerSettings[K, V]
Scala API: For transactional flows only, how often to push consumer group metadata to the producers. A shorter interval reduces the risk of dropping batched elements, at the cost of more work sending those updates.
- def withDispatcher(dispatcher: String): ConsumerSettings[K, V]
Fully qualified config path which holds the dispatcher configuration to be used by the akka.kafka.KafkaConsumerActor. Some blocking may occur.
- def withDrainingCheckInterval(drainingCheckInterval: Duration): ConsumerSettings[K, V]
Java API: Check interval for TransactionalProducer when finishing transaction before shutting down consumer
- def withDrainingCheckInterval(drainingCheckInterval: FiniteDuration): ConsumerSettings[K, V]
Scala API: Check interval for TransactionalProducer when finishing transaction before shutting down consumer
- def withEnrichAsync(value: (ConsumerSettings[K, V]) => Future[ConsumerSettings[K, V]]): ConsumerSettings[K, V]
Scala API. A hook to allow for resolving some settings asynchronously.
- Since
2.0.0
- def withEnrichCompletionStage(value: Function[ConsumerSettings[K, V], CompletionStage[ConsumerSettings[K, V]]]): ConsumerSettings[K, V]
Java API. A hook to allow for resolving some settings asynchronously.
- Since
2.0.0
- def withGroupId(groupId: String): ConsumerSettings[K, V]
A unique string that identifies the consumer group this consumer belongs to.
- def withGroupInstanceId(groupInstanceId: String): ConsumerSettings[K, V]
An id string that marks the consumer as a unique static member of the consumer group.
- def withMetadataRequestTimeout(metadataRequestTimeout: Duration): ConsumerSettings[K, V]
Java API
- def withMetadataRequestTimeout(metadataRequestTimeout: FiniteDuration): ConsumerSettings[K, V]
Scala API
- def withOffsetForTimesTimeout(offsetForTimesTimeout: Duration): ConsumerSettings[K, V]
Java API: Limits the blocking on Kafka consumer offsetForTimes calls.
- def withOffsetForTimesTimeout(offsetForTimesTimeout: FiniteDuration): ConsumerSettings[K, V]
Scala API: Limits the blocking on Kafka consumer offsetForTimes calls.
- def withPartitionAssignmentStrategies(strategies: Array[String]): ConsumerSettings[K, V]
A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used.
See https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
- def withPartitionAssignmentStrategyCooperativeStickyAssignor(): ConsumerSettings[K, V]
Sets the CooperativeStickyAssignor assignment strategy.
- See also
https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
https://kafka.apache.org/33/documentation.html#upgrade_300_notable
- def withPartitionHandlerWarning(partitionHandlerWarning: Duration): ConsumerSettings[K, V]
Java API
- def withPartitionHandlerWarning(partitionHandlerWarning: FiniteDuration): ConsumerSettings[K, V]
Scala API
- def withPollInterval(pollInterval: Duration): ConsumerSettings[K, V]
Java API: Set the interval from one scheduled poll to the next.
- def withPollInterval(pollInterval: FiniteDuration): ConsumerSettings[K, V]
Set the interval from one scheduled poll to the next.
- def withPollTimeout(pollTimeout: Duration): ConsumerSettings[K, V]
Java API: Set the maximum duration a poll to the Kafka broker is allowed to take.
- def withPollTimeout(pollTimeout: FiniteDuration): ConsumerSettings[K, V]
Set the maximum duration a poll to the Kafka broker is allowed to take.
- def withPositionTimeout(positionTimeout: Duration): ConsumerSettings[K, V]
Java API: Limits the blocking on Kafka consumer position calls.
- def withPositionTimeout(positionTimeout: FiniteDuration): ConsumerSettings[K, V]
Scala API: Limits the blocking on Kafka consumer position calls.
- def withProperties(properties: Map[String, String]): ConsumerSettings[K, V]
Java API: The raw properties of the kafka-clients driver, see constants in org.apache.kafka.clients.consumer.ConsumerConfig.
- def withProperties(properties: (String, String)*): ConsumerSettings[K, V]
Scala API: The raw properties of the kafka-clients driver, see constants in org.apache.kafka.clients.consumer.ConsumerConfig.
- def withProperties(properties: Map[String, String]): ConsumerSettings[K, V]
Scala API: The raw properties of the kafka-clients driver, see constants in org.apache.kafka.clients.consumer.ConsumerConfig.
- def withProperty(key: String, value: String): ConsumerSettings[K, V]
The raw properties of the kafka-clients driver, see constants in org.apache.kafka.clients.consumer.ConsumerConfig.
- def withResetProtectionSettings(resetProtection: OffsetResetProtectionSettings): ConsumerSettings[K, V]
Set the protection for unintentional offset reset.
- def withStopTimeout(stopTimeout: Duration): ConsumerSettings[K, V]
Java API: The stage will await outstanding offset commit requests before shutting down, but if that takes longer than this timeout it will stop forcefully.
- def withStopTimeout(stopTimeout: FiniteDuration): ConsumerSettings[K, V]
The stage will await outstanding offset commit requests before shutting down, but if that takes longer than this timeout it will stop forcefully.
- def withWaitClosePartition(waitClosePartition: Duration): ConsumerSettings[K, V]
Java API: Time to wait for pending requests when a partition is closed.
- def withWaitClosePartition(waitClosePartition: FiniteDuration): ConsumerSettings[K, V]
Time to wait for pending requests when a partition is closed.
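
The interaction between withEnrichAsync, enriched, and the createKafkaConsumer variants documented above can be sketched as follows. This is an illustration under assumptions rather than a reference implementation: lookupBootstrapServers is a hypothetical stand-in for an asynchronous source such as service discovery, the broker address and group id are placeholders, and Await is used only to keep the example short.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

object EnrichAsyncExample extends App {
  val system: ActorSystem = ActorSystem("example") // hypothetical name
  implicit val ec: ExecutionContext = system.dispatcher

  // Hypothetical asynchronous lookup; a real one might query a discovery service.
  def lookupBootstrapServers(): Future[String] =
    Future.successful("localhost:9092")

  val settings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId("example-group")
      .withEnrichAsync { s =>
        lookupBootstrapServers().map(servers => s.withBootstrapServers(servers))
      }

  // enriched applies the hook and yields the completed settings.
  val resolved = Await.result(settings.enriched, 5.seconds)
  println(resolved.properties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))

  // Per the documentation above, the synchronous variant fails with
  // IllegalStateException while an enrichment hook is set.
  println(Try(settings.createKafkaConsumer()).isFailure)

  // The asynchronous variant applies the hook before creating the consumer.
  val consumer = Await.result(settings.createKafkaConsumerAsync(), 5.seconds)
  consumer.close()

  system.terminate()
}
```

In application code the Future returned by createKafkaConsumerAsync would normally be composed rather than awaited; blocking appears here only so the example runs top to bottom.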