Serialization

The general recommendation for de-/serialization of messages is to use byte arrays (or Strings) as the value and do the de-/serialization in a map operation in the Akka Stream instead of implementing it directly in Kafka de-/serializers. When deserialization is handled explicitly within the Akka Stream, it is easier to implement the desired error handling strategy, as the examples below show.
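As a minimal sketch of this pattern (not part of the examples below; bootstrapServers, group, topic, parseSampleData, system and mat are placeholders), the Kafka deserializer stays a plain StringDeserializer while the actual parsing runs in a map stage of the stream:

Java
import org.apache.kafka.common.serialization.StringDeserializer;

// Keep the Kafka deserializer simple and do the actual parsing in the stream,
// where error handling can be expressed with stream operators and supervision.
// bootstrapServers, group, topic and parseSampleData are placeholders for this sketch.
ConsumerSettings<String, String> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
        .withBootstrapServers(bootstrapServers)
        .withGroupId(group);

CompletionStage<Done> done =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(consumerRecord -> parseSampleData(consumerRecord.value())) // de-serialization happens here
        .runWith(Sink.ignore(), mat);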

Jackson JSON

Serializing data to JSON text with Jackson in a map operator turns the object instance into a String, which is then used as the value of the ProducerRecord.

Java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.core.JsonParseException;

final ObjectMapper mapper = new ObjectMapper();
final ObjectWriter sampleDataWriter = mapper.writerFor(SampleData.class);

CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(sampleDataWriter::writeValueAsString)
        .map(json -> new ProducerRecord<String, String>(topic, json))
        .runWith(Producer.plainSink(producerDefaults()), mat);

To de-serialize a JSON String with Jackson, extract the String from the ConsumerRecord and apply the Jackson object reader in a map operator. Annotate the map operator with the expected type (the .<SampleData>map call below), as the object reader is not generic.

This example uses a resuming supervision strategy to react to data that can't be parsed correctly, dropping the faulty elements.

Java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.core.JsonParseException;

final ObjectMapper mapper = new ObjectMapper();
final ObjectReader sampleDataReader = mapper.readerFor(SampleData.class);

final Attributes resumeOnParseException =
    ActorAttributes.withSupervisionStrategy(
        exception -> {
          if (exception instanceof JsonParseException) {
            return Supervision.resume();
          } else {
            return Supervision.stop();
          }
        });

Consumer.DrainingControl<List<SampleData>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .map(ConsumerRecord::value)
        .<SampleData>map(sampleDataReader::readValue)
        .withAttributes(resumeOnParseException) // drop faulty elements
        .toMat(Sink.seq(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(mat);

Spray JSON

To de-serialize a JSON String with Spray JSON, extract the String from the ConsumerRecord and use the Spray-provided implicit conversions parseJson and convertTo in a map operator.

This example uses a resuming supervision strategy to react to data that can't be parsed correctly, dropping the faulty elements.

Scala
import spray.json._

final case class SampleData(name: String, value: Int)

object SampleDataSprayProtocol extends DefaultJsonProtocol {
  implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}

import SampleDataSprayProtocol._

val resumeOnParsingException = ActorAttributes.supervisionStrategy {
  case _: spray.json.JsonParser.ParsingException => Supervision.Resume
  case _ => Supervision.Stop
}

val consumer = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .map { consumerRecord =>
    val value = consumerRecord.value()
    value.parseJson.convertTo[SampleData]
  }
  .withAttributes(resumeOnParsingException)
  .toMat(Sink.seq)(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()

Avro with Schema Registry

If you want to use Confluent’s Schema Registry, you need to include the dependency on kafka-avro-serializer as shown below. It is not available from Maven Central, so Confluent’s repository has to be specified. These examples use kafka-avro-serializer version 5.0.1.

Maven
<project>
...
  <dependencies>
    ...
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>${confluent.version}</version> <!-- e.g. 5.0.1 -->
    </dependency>
    ...
  </dependencies>
  ...
  <repositories>
    <repository>
      <id>confluent-maven-repo</id>
      <name>Confluent Maven Repository</name>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
...
</project>
sbt
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion // e.g. 5.0.1
resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/"
Gradle
dependencies {
  compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // e.g. 5.0.1
}
repositories {
  maven {
    url  "https://packages.confluent.io/maven/"
  }
}

Producer

To create serializers that use the Schema Registry, its URL needs to be provided as the AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG setting when configuring the serializer, and that serializer is then used in the ProducerSettings.

Scala
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization._

import scala.collection.JavaConverters._
import scala.collection.immutable

val kafkaAvroSerDeConfig = Map[String, Any](
  AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
)
val producerSettings: ProducerSettings[String, SpecificRecord] = {
  val kafkaAvroSerializer = new KafkaAvroSerializer()
  kafkaAvroSerializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val serializer = kafkaAvroSerializer.asInstanceOf[Serializer[SpecificRecord]]

  ProducerSettings(system, new StringSerializer, serializer)
    .withBootstrapServers(bootstrapServers)
}

val sample = new SampleAvroClass("key", "name")
val samples = immutable.Seq(sample, sample, sample)
val producerCompletion =
  Source(samples)
    .map(n => new ProducerRecord[String, SpecificRecord](topic, n.key, n))
    .runWith(Producer.plainSink(producerSettings))
Java
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
kafkaAvroSerializer.configure(kafkaAvroSerDeConfig, false);
Serializer<Object> serializer = kafkaAvroSerializer;

ProducerSettings<String, Object> producerSettings =
    ProducerSettings.create(sys, new StringSerializer(), serializer)
        .withBootstrapServers(bootstrapServers());

SampleAvroClass sample = new SampleAvroClass("key", "name");
List<SampleAvroClass> samples = Arrays.asList(sample, sample, sample);
CompletionStage<Done> producerCompletion =
    Source.from(samples)
        .map(n -> new ProducerRecord<String, Object>(topic, n.key(), n))
        .runWith(Producer.plainSink(producerSettings), mat);

Consumer

To create deserializers that use the Schema Registry, its URL needs to be provided as the AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG setting when configuring the deserializer, and that deserializer is then used in the ConsumerSettings.

Scala
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization._

import scala.collection.JavaConverters._

val kafkaAvroSerDeConfig = Map[String, Any](
  AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
)
val consumerSettings: ConsumerSettings[String, SpecificRecord] = {
  val kafkaAvroDeserializer = new KafkaAvroDeserializer()
  kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
  val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[SpecificRecord]]

  ConsumerSettings(system, new StringDeserializer, deserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId(group)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}

val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics(topic))
    .take(samples.size.toLong)
    .toMat(Sink.seq)(Keep.both)
    .run()
Java
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig, false);
Deserializer<Object> deserializer = kafkaAvroDeserializer;

ConsumerSettings<String, Object> consumerSettings =
    ConsumerSettings.create(sys, new StringDeserializer(), deserializer)
        .withBootstrapServers(bootstrapServers())
        .withGroupId(group)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.DrainingControl<List<ConsumerRecord<String, Object>>> control =
    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .take(samples.size())
        .toMat(Sink.seq(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(mat);
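
The materialized DrainingControl can be used to stop the consumer and wait for the stream to drain. A minimal sketch (assuming the actor system's dispatcher is used as the executor):

Java
// Stop fetching from Kafka, let already received messages be processed,
// and complete with the value materialized by the sink (here the collected records).
CompletionStage<List<ConsumerRecord<String, Object>>> records =
    control.drainAndShutdown(sys.dispatcher());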