Producer
A producer publishes messages to Kafka topics. The message itself contains information about what topic and partition to publish to so you can publish to different topics with the same producer.
The underlying implementation uses the KafkaProducer; see the Kafka API for details.
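Because each ProducerRecord names its target topic (and optionally a partition), one producer sink can write to several topics. A minimal Scala sketch of this, assuming the producerSettings defined in the Settings section below and purely illustrative topic names:

import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord

// Sketch: route even numbers to one topic and odd numbers to another,
// all through the same plainSink and thus the same underlying KafkaProducer.
// An implicit Materializer (or ActorSystem on newer Akka versions) is assumed in scope.
val done =
  Source(1 to 10)
    .map { n =>
      val topic = if (n % 2 == 0) "even-numbers" else "odd-numbers"
      new ProducerRecord[String, String](topic, n.toString)
    }
    .runWith(Producer.plainSink(producerSettings))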
Choosing a producer
Alpakka Kafka offers producer flows and sinks that connect to Kafka and write data. The tables below may help you find the producer best suited for your use case.
Producers
These factory methods are part of the Producer API.
Shared producer | Factory method | Stream element type | Pass-through |
---|---|---|---|
Available | plainSink | ProducerRecord | N/A |
Available | flexiFlow | Envelope | Any |
Available | flowWithContext | Envelope | No |
Transactional producers
These factory methods are part of the Transactional API. For details see Transactions.
Shared producer | Factory method | Stream element type | Pass-through |
---|---|---|---|
No | sink | Envelope | N/A |
No | flow | Envelope | No |
No | sinkWithOffsetContext | Envelope | N/A |
No | flowWithOffsetContext | Envelope | No |
Settings
When creating a producer stream you need to pass in ProducerSettings (API) that define things like:
- bootstrap servers of the Kafka cluster
- serializers for the keys and values
- tuning parameters
- Scala

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

- Java

final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
In addition to programmatic construction, the ProducerSettings (API) can also be created from configuration (application.conf).

When creating ProducerSettings with the ActorSystem (API) settings it uses the config section akka.kafka.producer. The format of these settings files is described in the Typesafe Config Documentation.
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 60s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # for exactly-once-semantics processing.
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}
ProducerSettings (API) can also be created from any other Config section with the same layout as above.
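For example, a custom section may inherit the defaults from akka.kafka.producer and be read explicitly. A minimal sketch (the section name our-kafka-producer is made up for illustration):

# application.conf: a section with the same layout, inheriting the defaults
our-kafka-producer: ${akka.kafka.producer} {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
  }
}

// Read the custom section instead of akka.kafka.producer
val customConfig = system.settings.config.getConfig("our-kafka-producer")
val customProducerSettings =
  ProducerSettings(customConfig, new StringSerializer, new StringSerializer)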
See KafkaProducer API and ProducerConfig API for more details regarding settings.
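Individual kafka-clients properties can also be overridden programmatically on the settings. A minimal sketch using ProducerSettings.withProperty with ProducerConfig constants (the chosen properties and values are only illustrative):

import org.apache.kafka.clients.producer.ProducerConfig

// Sketch: override single Kafka client properties on top of the config-based settings.
val tunedProducerSettings = producerSettings
  .withProperty(ProducerConfig.ACKS_CONFIG, "all")
  .withProperty(ProducerConfig.LINGER_MS_CONFIG, "5")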
Producer as a Sink
Producer.plainSink (Producer API) is the easiest way to publish messages. The sink consumes the Kafka type ProducerRecord (Kafka API), which contains
- a topic name to which the record is being sent,
- an optional partition number,
- an optional key, and
- a value.
- Scala

val done: Future[Done] =
  Source(1 to 100)
    .map(_.toString)
    .map(value => new ProducerRecord[String, String](topic, value))
    .runWith(Producer.plainSink(producerSettings))

The materialized value of the sink is a Future[Done] which is completed with Done when the stream completes, or with an exception in case an error occurs.

- Java

CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>(topic, value))
        .runWith(Producer.plainSink(producerSettings), materializer);

The materialized value of the sink is a CompletionStage<Done> which is completed with Done when the stream completes, or with an exception in case an error occurs.
Producing messages
Sinks and flows accept implementations of ProducerMessage.Envelope (API) as input. They contain an extra field to pass through data, the so-called passThrough. Its value is passed through the flow and becomes available in the ProducerMessage.Results' passThrough(). It can for example hold a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch (from a Consumer.committableSource) that can be committed after publishing to Kafka.
Produce a single message to Kafka
To create one message to a Kafka topic, use the ProducerMessage.Message type as in
- Scala

val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.single(
    new ProducerRecord("topicName", key, value),
    passThrough
  )

- Java

ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> message =
    ProducerMessage.single(new ProducerRecord<>("topicName", key, value), passThrough);
For flows the ProducerMessage.Messages continue as ProducerMessage.Result elements containing (see the sketch after this list):

- the original input message,
- the record metadata (Kafka RecordMetadata API), and
- access to the passThrough within the message.
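A minimal sketch of reading these fields from the flow's output (the topic name and the Int passThrough are illustrative; only the Result case is handled here, other Envelope shapes yield other Results subtypes):

import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord

// Sketch: produce numbered records and read topic/partition/offset plus the
// passThrough (here the number itself) back from each Result element.
val descriptions =
  Source(1 to 3)
    .map(n => ProducerMessage.single(new ProducerRecord[String, String]("topicName", n.toString), n))
    .via(Producer.flexiFlow(producerSettings))
    .collect { case r: ProducerMessage.Result[String, String, Int] =>
      s"${r.metadata.topic}/${r.metadata.partition} ${r.metadata.offset}: passThrough=${r.passThrough}"
    }
    .runWith(Sink.seq)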
Let one stream element produce multiple messages to Kafka
The ProducerMessage.MultiMessage contains a list of ProducerRecords to produce multiple messages to Kafka topics.
- Scala

val multi: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.multi(
    immutable.Seq(
      new ProducerRecord("topicName", key, value),
      new ProducerRecord("anotherTopic", key, value)
    ),
    passThrough
  )

- Java

ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> multiMessage =
    ProducerMessage.multi(
        Arrays.asList(
            new ProducerRecord<>("topicName", key, value),
            new ProducerRecord<>("anotherTopic", key, value)),
        passThrough);
For flows the ProducerMessage.MultiMessages continue as ProducerMessage.MultiResult elements containing (see the sketch after this list):

- a list of MultiResultPart with
  - the original input message,
  - the record metadata (Kafka RecordMetadata API), and
- the passThrough data.
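A rough sketch of unpacking such a MultiResult element (the type parameters are illustrative; the producing flow itself is elided):

import akka.kafka.ProducerMessage

// Sketch: one MultiMessage yields one MultiResult; its parts line up with the
// ProducerRecords that were sent, while the passThrough is carried once per result.
def describe(result: ProducerMessage.MultiResult[String, String, Long]): String = {
  val parts = result.parts.map { part =>
    s"${part.metadata.topic}/${part.metadata.partition} ${part.metadata.offset}: ${part.record.value}"
  }
  s"passThrough=${result.passThrough} -> ${parts.mkString(", ")}"
}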
Let a stream element pass through, without producing a message to Kafka
The ProducerMessage.PassThroughMessage allows an element to pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.
- Scala

val ptm: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.passThrough(
    passThrough
  )

- Java

ProducerMessage.Envelope<KeyType, ValueType, PassThroughType> ptm =
    ProducerMessage.passThrough(passThrough);
For flows the ProducerMessage.PassThroughMessages continue as ProducerMessage.PassThroughResult elements containing the passThrough data.
Producer as a Flow
Producer.flexiFlow allows the stream to continue after publishing messages to Kafka. It accepts implementations of ProducerMessage.Envelope (API) as input, which continue in the flow as implementations of ProducerMessage.Results (API).
- Scala

val done = Source(1 to 100)
  .map { number =>
    val partition = 0
    val value = number.toString
    ProducerMessage.single(
      new ProducerRecord(topic, partition, "key", value),
      number
    )
  }
  .via(Producer.flexiFlow(producerSettings))
  .map {
    case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>
      s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"

    case ProducerMessage.MultiResult(parts, passThrough) =>
      parts
        .map {
          case MultiResultPart(metadata, record) =>
            s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
        }
        .mkString(", ")

    case ProducerMessage.PassThroughResult(passThrough) =>
      s"passed through"
  }
  .runWith(Sink.foreach(println(_)))
- Java

CompletionStage<Done> done =
    Source.range(1, 100)
        .map(
            number -> {
              int partition = 0;
              String value = String.valueOf(number);
              ProducerMessage.Envelope<String, String, Integer> msg =
                  ProducerMessage.single(
                      new ProducerRecord<>(topic, partition, "key", value), number);
              return msg;
            })
        .via(Producer.flexiFlow(producerSettings))
        .map(
            result -> {
              if (result instanceof ProducerMessage.Result) {
                ProducerMessage.Result<String, String, Integer> res =
                    (ProducerMessage.Result<String, String, Integer>) result;
                ProducerRecord<String, String> record = res.message().record();
                RecordMetadata meta = res.metadata();
                return meta.topic() + "/" + meta.partition() + " " + res.offset() + ": " + record.value();
              } else if (result instanceof ProducerMessage.MultiResult) {
                ProducerMessage.MultiResult<String, String, Integer> res =
                    (ProducerMessage.MultiResult<String, String, Integer>) result;
                return res.getParts().stream()
                    .map(
                        part -> {
                          RecordMetadata meta = part.metadata();
                          return meta.topic()
                              + "/"
                              + meta.partition()
                              + " "
                              + part.metadata().offset()
                              + ": "
                              + part.record().value();
                        })
                    .reduce((acc, s) -> acc + ", " + s);
              } else {
                return "passed through";
              }
            })
        .runWith(Sink.foreach(System.out::println), materializer);
The passThrough can for example hold a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch that can be committed after publishing to Kafka.
- Scala

val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic))
  .map { msg =>
    ProducerMessage.single(
      new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
      passThrough = msg.committableOffset
    )
  }
  .via(Producer.flexiFlow(producerSettings))
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()
- Java

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .map(
            msg ->
                ProducerMessage.single(
                    new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                    msg.committableOffset() // the passThrough
                    ))
        .via(Producer.flexiFlow(producerSettings))
        .map(m -> m.passThrough())
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);
Connecting a Producer to a Consumer
See the Consumer page.
Sharing the KafkaProducer instance
The underlying KafkaProducer (Kafka API) is thread safe and sharing a single producer instance across streams will generally be faster than having multiple instances.
To create a KafkaProducer from the Kafka connector settings described above, the ProducerSettings contain a factory method createKafkaProducer.
- Scala

val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
val kafkaProducer = producerSettings.createKafkaProducer()

// using the kafkaProducer

kafkaProducer.close()
- Java

final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
final org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer =
    producerSettings.createKafkaProducer();
The KafkaProducer instance is passed as a parameter to the Producer factory methods.
- Scala

val done = Source(1 to 100)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String](topic, value))
  .runWith(Producer.plainSink(producerSettings, kafkaProducer))
- Java

CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>(topic, value))
        .runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);
Accessing KafkaProducer metrics
By passing an explicit reference to a KafkaProducer (as shown in the previous section) its metrics become accessible. Refer to the Kafka MetricName API for more details.
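A minimal sketch of reading those metrics from the shared kafkaProducer created above (metrics() comes from the Kafka client API; Scala 2.13 collection converters are assumed):

import scala.jdk.CollectionConverters._

// Sketch: KafkaProducer.metrics() returns a map keyed by MetricName.
// Print each metric's group, name and current value.
kafkaProducer.metrics().asScala.foreach { case (name, metric) =>
  println(s"${name.group}/${name.name} = ${metric.metricValue}")
}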