Producer

A producer publishes messages to Kafka topics. Each message carries the topic and partition it should be published to, so a single producer can publish to different topics.

The underlying implementation uses the KafkaProducer; see the Kafka API for details.

Settings

When creating a producer stream you need to pass in ProducerSettings (API) that define things like:

  • bootstrap servers of the Kafka cluster
  • serializers for the keys and values
  • tuning parameters
Scala
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
Java
final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings
        .create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");

Besides being constructed programmatically, ProducerSettings (API) can also be created from configuration (application.conf).

When ProducerSettings is created from the ActorSystem (API) settings, it uses the config section akka.kafka.producer. The format of these settings files is described in the Typesafe Config Documentation.

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100
  
  # How long to wait for `KafkaProducer.close`
  close-timeout = 60s
  
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}

ProducerSettings (API) can also be created from any other Config section with the same layout as above.
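
As a sketch of such a section, the fragment below defines a path my-app.kafka.producer in application.conf that inherits the defaults from akka.kafka.producer and overrides only what differs. The path name and the server address are assumptions made for illustration.

```hocon
# application.conf
# Hypothetical section name; it reuses the layout of akka.kafka.producer.
my-app.kafka.producer = ${akka.kafka.producer} {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
  }
}
```

A section like this can be read with getConfig("my-app.kafka.producer") and passed to ProducerSettings in the same way as the akka.kafka.producer section in the examples above.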

See KafkaProducer API and ProducerConfig API for more details regarding settings.
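
For instance, Kafka client properties could be set in application.conf as sketched below. The property names are standard ProducerConfig keys; the values are illustrative assumptions, not recommendations.

```hocon
akka.kafka.producer {
  kafka-clients {
    # Wait for all in-sync replicas to acknowledge each record.
    acks = "all"
    # Let the client batch records for up to 5 ms before sending.
    linger.ms = 5
    # Compress record batches on the wire.
    compression.type = "lz4"
  }
}
```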

Producer as a Sink

Producer.plainSink (Producer API) is the easiest way to publish messages. The sink consumes the Kafka type ProducerRecord (Kafka API), which contains:

  1. a topic name to which the record is being sent,
  2. an optional partition number,
  3. an optional key, and
  4. a value.
Scala
val done: Future[Done] =
  Source(1 to 100)
    .map(_.toString)
    .map(value => new ProducerRecord[String, String]("topic1", value))
    .runWith(Producer.plainSink(producerSettings))
The materialized value of the sink is a Future[Done], which is completed with Done when the stream completes, or with an exception if an error occurs.
Java
CompletionStage<Done> done =
  Source.range(1, 100)
    .map(number -> number.toString())
    .map(value -> new ProducerRecord<String, String>("topic1", value))
    .runWith(Producer.plainSink(producerSettings), materializer);
The materialized value of the sink is a CompletionStage<Done>, which is completed with Done when the stream completes, or with an exception if an error occurs.
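
One way to react to that materialized value is sketched below using only the JDK. To stay free of Akka dependencies, the sketch uses a generic CompletionStage<T> as a stand-in for CompletionStage<Done>; the class name, the method describe, and the messages are hypothetical. In a real stream the failure may also arrive wrapped in another exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class MaterializedValueSketch {

  // Maps the outcome of a completion stage to a short description.
  // T stands in for akka.Done so that the sketch needs no Akka dependency.
  static <T> CompletionStage<String> describe(CompletionStage<T> done) {
    return done.handle((value, failure) ->
        failure == null
            ? "stream completed"
            : "stream failed: " + failure.getMessage());
  }

  public static void main(String[] args) {
    // Stand-in for the materialized value of a stream that completed normally.
    CompletableFuture<String> ok = CompletableFuture.completedFuture("Done");

    // Stand-in for the materialized value of a stream that failed.
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("broker unavailable"));

    System.out.println(describe(ok).toCompletableFuture().join());
    System.out.println(describe(failed).toCompletableFuture().join());
  }
}
```

Using handle rather than thenApply lets a single callback observe both the successful and the failed outcome.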

Producing messages

Sinks and flows accept implementations of ProducerMessage.Envelope (API) as input. Each envelope carries an extra field, the so-called passThrough, whose value is passed through the flow and becomes available from the passThrough() of the resulting ProducerMessage.Results. It can, for example, hold a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch (from a Consumer.committableSource) that can be committed after publishing to Kafka.

Produce a single message to Kafka

To produce one message to a Kafka topic, use the ProducerMessage.Message type, as in:

Scala
new ProducerMessage.Message[KeyType, ValueType, PassThroughType](
  new ProducerRecord("topicName", key, value),
  passThrough
)
Java
new ProducerMessage.Message<KeyType, ValueType, PassThroughType>(
        new ProducerRecord<>("topicName", key, value),
        passThrough
);

For flows the ProducerMessage.Messages continue as ProducerMessage.Result elements containing:

  1. the original input message,
  2. the record metadata (Kafka RecordMetadata API), and
  3. access to the passThrough within the message.

Let one stream element produce multiple messages to Kafka

The ProducerMessage.MultiMessage contains a list of ProducerRecords to produce multiple messages to Kafka topics.

Scala
new ProducerMessage.MultiMessage[KeyType, ValueType, PassThroughType](
  immutable.Seq(
    new ProducerRecord("topicName", key, value),
    new ProducerRecord("anotherTopic", key, value)
  ),
  passThrough
)
Java
new ProducerMessage.MultiMessage<KeyType, ValueType, PassThroughType>(
        Arrays.asList(
                new ProducerRecord<>("topicName", key, value),
                new ProducerRecord<>("anotherTopic", key, value)
        ),
        passThrough
);

For flows the ProducerMessage.MultiMessages continue as ProducerMessage.MultiResult elements containing:

  1. a list of MultiResultPart with
    1. the original input message,
    2. the record metadata (Kafka RecordMetadata API), and
  2. the passThrough data.

Let a stream element pass through, without producing a message to Kafka

The ProducerMessage.PassThroughMessage lets an element pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka offset commits and transactions, so that these can be committed without producing new messages.

Scala
new ProducerMessage.PassThroughMessage(
  passThrough
)
Java
new ProducerMessage.PassThroughMessage<>(
        passThrough
);

For flows the ProducerMessage.PassThroughMessages continue as ProducerMessage.PassThroughResult elements containing the passThrough data.
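
The three envelope types differ only in how many records they produce; in every case the passThrough value comes out of the flow unchanged. The JDK-only sketch below models that idea with simplified stand-in classes. None of the names (PassThroughSketch, Envelope, Result, produce) are the Alpakka Kafka classes, records are plain strings, and the "topic" is an in-memory list; it is meant as an illustration of the data flow, not of the real implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-ins, NOT the Alpakka Kafka classes. They only model how a
// passThrough value travels from an input envelope to the emitted result.
final class PassThroughSketch {

  /** Input element: zero or more records (plain strings here) plus a passThrough. */
  static final class Envelope<P> {
    final List<String> records;
    final P passThrough;

    Envelope(List<String> records, P passThrough) {
      this.records = records;
      this.passThrough = passThrough;
    }
  }

  /** Output element: the records that were published plus the untouched passThrough. */
  static final class Result<P> {
    final List<String> published;
    final P passThrough;

    Result(List<String> published, P passThrough) {
      this.published = published;
      this.passThrough = passThrough;
    }
  }

  // One record, in the role of ProducerMessage.Message.
  static <P> Envelope<P> message(String record, P passThrough) {
    return new Envelope<>(Collections.singletonList(record), passThrough);
  }

  // Several records, in the role of ProducerMessage.MultiMessage.
  static <P> Envelope<P> multiMessage(List<String> records, P passThrough) {
    return new Envelope<>(records, passThrough);
  }

  // No record at all, in the role of ProducerMessage.PassThroughMessage.
  static <P> Envelope<P> passThroughMessage(P passThrough) {
    return new Envelope<>(Collections.<String>emptyList(), passThrough);
  }

  // Hypothetical producer stage: appends every record to an in-memory "topic",
  // then emits a result that carries the same passThrough instance.
  static <P> Result<P> produce(Envelope<P> envelope, List<String> topic) {
    topic.addAll(envelope.records);
    return new Result<>(new ArrayList<>(envelope.records), envelope.passThrough);
  }

  public static void main(String[] args) {
    List<String> topic = new ArrayList<>();
    // The passThrough values play the role of consumer offsets to commit later.
    Result<Long> single = produce(message("a", 1L), topic);
    Result<Long> multi = produce(multiMessage(Arrays.asList("b", "c"), 2L), topic);
    Result<Long> none = produce(passThroughMessage(3L), topic);

    System.out.println("published: " + topic);
    System.out.println("offsets to commit: "
        + single.passThrough + ", " + multi.passThrough + ", " + none.passThrough);
  }
}
```

In the sketch the third element publishes nothing, yet its passThrough still reaches the downstream stage, which is what allows an offset to be committed without producing a new message.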

Producer as a Flow

Producer.flexiFlow allows the stream to continue after publishing messages to Kafka. It accepts implementations of ProducerMessage.Envelope (API) as input, which continue in the flow as implementations of ProducerMessage.Results (API).

Scala
val done = Source(1 to 100)
  .map { number =>
    val partition = 0
    val value = number.toString
    ProducerMessage.Message(
      new ProducerRecord("topic1", partition, "key", value),
      number
    )
  }
  .via(Producer.flexiFlow(producerSettings))
  .map {
    case ProducerMessage.Result(metadata, message) =>
      val record = message.record
      s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"

    case ProducerMessage.MultiResult(parts, passThrough) =>
      parts
        .map {
          case MultiResultPart(metadata, record) =>
            s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
        }
        .mkString(", ")

    case ProducerMessage.PassThroughResult(passThrough) =>
      "passed through"
  }
  .runWith(Sink.foreach(println(_)))
Java
CompletionStage<Done> done =
        Source.range(1, 100)
                .map(number -> {
                    int partition = 0;
                    String value = String.valueOf(number);
                    ProducerMessage.Envelope<String, String, Integer> msg =
                            new ProducerMessage.Message<String, String, Integer>(
                                    new ProducerRecord<>("topic1", partition, "key", value),
                                    number
                            );
                    return msg;
                })

                .via(Producer.flexiFlow(producerSettings))

                .map(result -> {
                    if (result instanceof ProducerMessage.Result) {
                        ProducerMessage.Result<String, String, Integer> res = (ProducerMessage.Result<String, String, Integer>) result;
                        ProducerRecord<String, String> record = res.message().record();
                        RecordMetadata meta = res.metadata();
                        return meta.topic() + "/" + meta.partition() + " " + res.offset() + ": " + record.value();
                    } else if (result instanceof ProducerMessage.MultiResult) {
                        ProducerMessage.MultiResult<String, String, Integer> res = (ProducerMessage.MultiResult<String, String, Integer>) result;
                        // Join the parts into a single String (a bare reduce would yield Optional<String>).
                        return res.getParts().stream().map(part -> {
                            RecordMetadata meta = part.metadata();
                            return meta.topic() + "/" + meta.partition() + " " + meta.offset() + ": " + part.record().value();
                        }).collect(java.util.stream.Collectors.joining(", "));
                    } else {
                        return "passed through";
                    }
                })
                .runWith(Sink.foreach(System.out::println), materializer);

The passThrough can for example hold a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch that can be committed after publishing to Kafka.

Scala
val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map { msg =>
    ProducerMessage.Message[String, Array[Byte], ConsumerMessage.CommittableOffset](
      new ProducerRecord("topic2", msg.record.value),
      passThrough = msg.committableOffset
    )
  }
  .via(Producer.flexiFlow(producerSettings))
  .mapAsync(producerSettings.parallelism) { result =>
    val committable = result.passThrough
    committable.commitScaladsl()
  }
  .toMat(Sink.ignore)(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()
Java
Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .map(msg -> {
          ProducerMessage.Envelope<String, byte[], ConsumerMessage.Committable> prodMsg =
              new ProducerMessage.Message<>(
                  new ProducerRecord<>("topic2", msg.record().value()),
                  msg.committableOffset() // the passThrough
              );
          return prodMsg;
        })

        .via(Producer.flexiFlow(producerSettings))

        .mapAsync(producerSettings.parallelism(), result -> {
            ConsumerMessage.Committable committable = result.passThrough();
            return committable.commitJavadsl();
        })
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

Connecting a Producer to a Consumer

See the Consumer page.

Sharing the KafkaProducer instance

The underlying KafkaProducer (Kafka API) is thread safe, and sharing a single producer instance across streams will generally be faster than having multiple instances.

To create a KafkaProducer from the Kafka connector settings described above, ProducerSettings provides the factory method createKafkaProducer.

Scala
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
val kafkaProducer = producerSettings.createKafkaProducer()
Java
final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings<String, String> producerSettings =
    ProducerSettings
        .create(config, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092");
final KafkaProducer<String, String> kafkaProducer =
    producerSettings.createKafkaProducer();

The KafkaProducer instance is passed as a parameter to the Producer factory methods.

Scala
val done = Source(1 to 100)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String]("topic1", value))
  .runWith(Producer.plainSink(producerSettings, kafkaProducer))
Java
CompletionStage<Done> done =
    Source.range(1, 100)
        .map(number -> number.toString())
        .map(value -> new ProducerRecord<String, String>("topic1", value))
        .runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);

Accessing KafkaProducer metrics

By passing an explicit reference to a KafkaProducer (as shown in the previous section) its metrics become accessible. Refer to the Kafka MetricName API for more details.

Scala
val metrics: util.Map[org.apache.kafka.common.MetricName, _ <: org.apache.kafka.common.Metric] =
  kafkaProducer.metrics() // observe metrics
Java
Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics =
        kafkaProducer.metrics(); // observe metrics