Producer

A producer publishes messages to Kafka topics. Each message carries the topic and, optionally, the partition to publish to, so a single producer can publish to different topics.

The underlying implementation uses the KafkaProducer; see the Javadoc for details.

Settings

When creating a producer stream you need to pass in ProducerSettings that define things like:

  • bootstrap servers of the Kafka cluster
  • serializers for the keys and values
  • tuning parameters
Scala
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")
Java
protected final ProducerSettings<byte[], String> producerSettings = ProducerSettings
  .create(system, new ByteArraySerializer(), new StringSerializer())
  .withBootstrapServers("localhost:9092");

In addition to programmatic construction, ProducerSettings can also be created from configuration (application.conf). By default, when ProducerSettings is created with the ActorSystem parameter, it uses the config section akka.kafka.producer.

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout. 
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100
  
  # How long to wait for `KafkaProducer.close`
  close-timeout = 60s
  
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}

ProducerSettings can also be created from any other Config section with the same layout as above.
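
As a sketch of what such a section could look like: the path my-app.kafka-producer and the values below are assumptions chosen for illustration, not defaults. Only the layout is taken from akka.kafka.producer.

```
# Hypothetical section name; any path should work as long as the layout matches.
my-app.kafka-producer {
  parallelism = 50
  close-timeout = 30s
  use-dispatcher = "akka.kafka.default-dispatcher"

  kafka-clients {
    # A property defined by ProducerConfig, handed on to the KafkaProducer.
    acks = "all"
  }
}
```

The section would then be looked up from the loaded configuration and passed to ProducerSettings in place of the ActorSystem, assuming the factory variant that accepts a Config.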

See KafkaProducer Javadoc and ProducerConfig Javadoc for details.

Producer as a Sink

Producer.plainSink is the easiest way to publish messages. The sink consumes ProducerRecord elements, each of which contains the name of the topic the record is sent to, an optional partition number, and an optional key and value.

Scala
val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))
The materialized value of the sink is a Future[Done], which is completed with Done when the stream completes, or with an exception if an error occurs.
Java
CompletionStage<Done> done =
  Source.range(1, 100)
    .map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
    .runWith(Producer.plainSink(producerSettings), materializer);
The materialized value of the sink is a CompletionStage<Done>, which is completed with Done when the stream completes, or with an exception if an error occurs.

There is another variant of the producer sink, Producer.commitableSink, which is useful when connecting a consumer to a producer: the sink commits the offset back to the consumer once it has successfully published the message.

Scala
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map { msg =>
    println(s"topic1 -> topic2: $msg")
    ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
      "topic2",
      msg.record.value
    ), msg.committableOffset)
  }
  .runWith(Producer.commitableSink(producerSettings))
Java
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
    new ProducerMessage.Message<byte[], String, ConsumerMessage.Committable>(
        new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset()))
  .runWith(Producer.commitableSink(producerSettings), materializer);

Note that there is a risk that something fails after publishing but before committing, so commitableSink has “at-least-once delivery” semantics.
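
The failure window can be illustrated with a small in-memory model. This is only a sketch of the "publish, then commit" ordering: it does not use Kafka or the Producer API, and the class AtLeastOnceSketch and its run method are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy, in-memory model of "publish, then commit". Not the real Producer API;
// every name in this class is hypothetical.
final class AtLeastOnceSketch {

  // Copies input elements to output, starting at the committed offset and
  // committing after each publish. If crashAfterPublishOf equals the index
  // just published, the method returns before the commit, as if the process
  // had died at that point. Returns the last committed offset, which is
  // where a restart resumes reading.
  static int run(List<String> input, List<String> output,
                 int committed, int crashAfterPublishOf) {
    for (int i = committed; i < input.size(); i++) {
      output.add(input.get(i));        // publish succeeded
      if (i == crashAfterPublishOf) {
        return committed;              // failure before the offset commit
      }
      committed = i + 1;               // offset committed
    }
    return committed;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("a", "b", "c");
    List<String> output = new ArrayList<>();

    // First run fails after publishing "b" but before committing it.
    int committed = run(input, output, 0, 1);
    // Restart from the last committed offset, this time without a failure.
    committed = run(input, output, committed, -1);

    System.out.println(output);    // [a, b, b, c]
    System.out.println(committed); // 3
  }
}
```

Because the restart resumes from the last committed offset, "b" is published twice: nothing is lost, but downstream consumers of the target topic need to tolerate duplicates.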

Producer as a Flow

Sometimes messages need to be published in the middle of stream processing rather than as the last step. For that you can use Producer.flow.

Scala
val done = Source(1 to 100)
  .map { n =>
    // val partition = math.abs(n) % 2
    val partition = 0
    ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
      "topic1", partition, null, n.toString
    ), n)
  }
  .via(Producer.flow(producerSettings))
  .map { result =>
    val record = result.message.record
    println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
      s"(${result.message.passThrough})")
    result
  }
  .runWith(Sink.ignore)
Java
CompletionStage<Done> done =
  Source.range(1, 100)
    .map(n -> {
      //int partition = Math.abs(n) % 2;
      int partition = 0;
      String elem = String.valueOf(n);
      return new ProducerMessage.Message<byte[], String, Integer>(
        new ProducerRecord<>("topic1", partition, null, elem), n);
    })
    .via(Producer.flow(producerSettings))
    .map(result -> {
      ProducerRecord<byte[], String> record = result.message().record();
      System.out.println(record);
      return result;
    })
    .runWith(Sink.ignore(), materializer);

Each message can carry a pass-through value alongside the record, for example a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch, which can then be committed later in the flow. Here is an example illustrating that:

Scala
val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
        "topic2",
        msg.record.value
      ), msg.committableOffset)
    }
    .via(Producer.flow(producerSettings))
    .mapAsync(producerSettings.parallelism) { result =>
      result.message.passThrough.commitScaladsl()
    }
    .runWith(Sink.ignore)
Java
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .map(msg ->
    new ProducerMessage.Message<byte[], String, ConsumerMessage.Committable>(
      new ProducerRecord<>("topic2", msg.record().value()), msg.committableOffset()))
  .via(Producer.flow(producerSettings))
  .mapAsync(producerSettings.parallelism(), result ->
    result.message().passThrough().commitJavadsl())
  .runWith(Sink.ignore(), materializer);
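
Stripped of Kafka, the pass-through idea amounts to a value that rides along with each message and comes out unchanged next to the publish result. The following is a minimal sketch under that reading; Message, Result and publish are hypothetical stand-ins and not the real ProducerMessage types.

```java
// Toy model of a pass-through value. These classes are hypothetical
// stand-ins, not the real ProducerMessage types.
final class PassThroughSketch {

  // A value to publish to a topic, plus an arbitrary value to carry along.
  static final class Message<V, P> {
    final String topic;
    final V value;
    final P passThrough;

    Message(String topic, V value, P passThrough) {
      this.topic = topic;
      this.value = value;
      this.passThrough = passThrough;
    }
  }

  // What the publish step emits: an offset and the original message.
  static final class Result<V, P> {
    final long offset;
    final Message<V, P> message;

    Result(long offset, Message<V, P> message) {
      this.offset = offset;
      this.message = message;
    }
  }

  // Stand-in for publishing. The offset is supplied by the caller here;
  // with Kafka it would be assigned by the broker.
  static <V, P> Result<V, P> publish(Message<V, P> message, long offset) {
    return new Result<>(offset, message);
  }

  public static void main(String[] args) {
    Message<String, Integer> message = new Message<>("topic2", "hello", 42);
    Result<String, Integer> result = publish(message, 7L);
    // The pass-through value is available downstream, untouched.
    System.out.println(result.offset + " " + result.message.passThrough); // prints "7 42"
  }
}
```

In the examples above the pass-through slot holds a committable offset, which is why the stage after Producer.flow can commit it once the publish has completed.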

Sharing KafkaProducer

If you have many streams it can be more efficient to share the underlying KafkaProducer.

You can create a KafkaProducer instance from ProducerSettings.

Scala
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")
val kafkaProducer = producerSettings.createKafkaProducer()
Java
protected final ProducerSettings<byte[], String> producerSettings = ProducerSettings
  .create(system, new ByteArraySerializer(), new StringSerializer())
  .withBootstrapServers("localhost:9092");
protected final KafkaProducer<byte[], String> kafkaProducer = producerSettings.createKafkaProducer();

The KafkaProducer is passed as a parameter to the Producer factory methods.

Scala
val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings, kafkaProducer))
Java
CompletionStage<Done> done =
  Source.range(1, 100)
    .map(n -> n.toString()).map(elem -> new ProducerRecord<byte[], String>("topic1", elem))
    .runWith(Producer.plainSink(producerSettings, kafkaProducer), materializer);