Error handling

Failing consumer

Errors from the Kafka consumer are forwarded to the Alpakka sources that use it, and the sources fail their streams.
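
One way to observe such a failure is through the stream's materialized completion value. A minimal sketch (assuming `consumerSettings`, `topic`, and `businessFlow` as in the restart example below, and an implicit materializer in scope):

Scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{Keep, Sink}

// keep both the consumer Control and the stream's completion Future
val (control, streamCompletion) = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(topic))
  .via(businessFlow)
  .toMat(Sink.ignore)(Keep.both)
  .run()

// react to a failed consumer stream, e.g. by logging the cause
import system.dispatcher
streamCompletion.failed.foreach(e => system.log.error(e, "Consumer stream failed"))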

Lost connection to the Kafka broker

To fail an Alpakka Kafka consumer in case the Kafka broker is not available, configure a Connection Checker via ConsumerSettings. If no Connection Checker is configured, Alpakka will continue to poll the broker indefinitely.
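
A minimal sketch of enabling the Connection Checker on the consumer settings (the number of retries, check interval, backoff factor, bootstrap servers, and group id are assumed example values):

Scala
import akka.kafka.{ConnectionCheckerSettings, ConsumerSettings}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

// assumed values: up to 3 checks, 15 second initial interval, exponential factor 2.0
val connectionChecker = ConnectionCheckerSettings(3, 15.seconds, 2d)

val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group")
  .withConnectionChecker(connectionChecker)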

Failing producer

Retry handling for producers is built into Kafka itself. In case of a failure when sending a message, an exception will be thrown, which should fail the stream.
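
The retry behaviour is governed by the underlying Kafka producer configuration, which can be passed through ProducerSettings. A sketch, with assumed example values for the bootstrap servers and the retry-related properties:

Scala
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")
  // Kafka producer retry settings; the values here are assumed examples
  .withProperty("retries", "5")
  .withProperty("delivery.timeout.ms", "30000")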

Restarting the stream with a backoff stage

Akka Streams provides graph stages to gracefully restart a stream on failure, with a configurable backoff. This can be used to restart a failing consumer with an exponential backoff by wrapping it in a RestartSource:

Scala
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val result = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      // this is a hack to get access to the Consumer.Control
      // instances of the latest Kafka Consumer source
      .mapMaterializedValue(c => control.set(c))
      .via(businessFlow)
  }
  .runWith(Sink.seq)

control.get().shutdown()
Java
AtomicReference<Consumer.Control> control = new AtomicReference<>(Consumer.createNoopControl());

RestartSource.onFailuresWithBackoff(
        java.time.Duration.ofSeconds(3),
        java.time.Duration.ofSeconds(30),
        0.2,
        () ->
            Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
                .mapMaterializedValue(
                    c -> {
                      control.set(c);
                      return c;
                    })
                .via(business()))
    .runWith(Sink.ignore(), materializer);

control.get().shutdown();

When a stream fails, library internals will handle all underlying resources.

(de)serialization

If a failure while reading from Kafka is caused by other reasons, such as deserialization problems, the stage will fail immediately. If you expect such cases, consider consuming raw byte arrays and deserializing in a subsequent map stage, where you can use supervision to skip failed elements. See also the Serialization and "At least once" pages for more suggestions.
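
A minimal sketch of this approach, assuming a hypothetical application-level `deserialize` function and a resuming supervision strategy that skips elements whose parsing throws:

Scala
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// consume raw byte arrays instead of deserializing in the Kafka consumer itself
val byteArraySettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group")

val done = Consumer
  .plainSource(byteArraySettings, Subscriptions.topics(topic))
  // `deserialize` is a hypothetical application-level parser that may throw
  .map(record => deserialize(record.value))
  // resume (skip the failed element) instead of failing the whole stream
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .runWith(Sink.ignore)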
