Error handling

Failing consumer

When a consumer fails to read from Kafka due to connection problems, it throws a WakeupException, which is handled internally with retries. See the consumer configuration settings for the wakeup-timeout and max-wakeups options if you want to tweak the retry behaviour. When the last retry fails, the source stage fails with an exception.
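As a sketch, these options can be overridden in your application configuration. The setting names below are assumed from the library's reference configuration, so verify them against the version you use:

```
akka.kafka.consumer {
  # How long to wait for a response from Kafka before issuing a wakeup
  wakeup-timeout = 3s
  # Maximum number of wakeups before the source stage is failed
  max-wakeups = 10
}
```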

Failing producer

Retry handling for the producer is built into Kafka itself. If sending a message still fails after the retries, an exception is thrown, which should fail the stream.
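For illustration, the retry count is a property of the underlying Kafka producer client and can be passed through the producer settings. The config path below is an assumption based on the library's kafka-clients pass-through section; the value is an example:

```
akka.kafka.producer {
  kafka-clients {
    # Number of send retries the Kafka producer performs before failing
    retries = 3
  }
}
```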

Restarting the stream

A typical approach is to run the stream inside an actor: when the stream fails with an exception, the actor is stopped and a new one is created. Stopping the actor on stream failure:

Scala:

val done =
  Consumer.plainSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1)(msg => processingActor ? ProcessMsg(msg))
    .runWith(Sink.ignore)

done.onComplete {
  case Failure(ex) =>
    log.error(ex, "Stream failed, stopping the actor.")
    self ! PoisonPill
  case Success(_) => // graceful stream shutdown handling
}

Java:

CompletionStage<Done> done = Consumer.plainSource(consumerSettings, Subscriptions.topics("topic1"))
        .mapAsync(1, msg -> ask(processingActor, msg, timeout)) // akka.pattern.PatternsCS.ask
        .map(elem -> (ConsumerRecord<byte[], String>) elem)
        .runWith(Sink.ignore(), materializer);

done.exceptionally(e -> {
    system.log().error(e, e.getMessage());
    getSelf().tell(PoisonPill.getInstance(), getSelf());
    return Done.getInstance();
});

To ensure that the stopped actor gets re-created, it can be wrapped in a BackoffSupervisor:

Scala:

import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

val childProps = Props(classOf[StreamWrapperActor])

val supervisorProps = BackoffSupervisor.props(
  Backoff.onStop(
    childProps,
    childName = "streamActor",
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ))

val supervisor = system.actorOf(supervisorProps, name = "streamActorSupervisor")

Java:

Props childProps = Props.create(StreamWrapperActor.class);

final Props supervisorProps = BackoffSupervisor.props(
        Backoff.onStop(
                childProps,
                "streamActor",
                Duration.create(3, TimeUnit.SECONDS),
                Duration.create(30, TimeUnit.SECONDS),
                0.2));

system.actorOf(supervisorProps, "streamActorSupervisor");
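For completeness, a hypothetical StreamWrapperActor could look like the following Scala sketch: it starts the stream in preStart and stops itself when the stream fails, so that the BackoffSupervisor re-creates it. The consumerSettings value and the topic name are assumed to be defined elsewhere:

```scala
import akka.actor.{Actor, ActorLogging, PoisonPill}
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.util.{Failure, Success}

class StreamWrapperActor extends Actor with ActorLogging {
  import context.dispatcher
  implicit val materializer = ActorMaterializer()(context)

  override def preStart(): Unit = {
    // Start the stream when the actor (re-)starts
    val done = Consumer.plainSource(consumerSettings, Subscriptions.topics("topic1"))
      .runWith(Sink.ignore)

    done.onComplete {
      case Failure(ex) =>
        log.error(ex, "Stream failed, stopping the actor.")
        self ! PoisonPill
      case Success(_) => // graceful stream shutdown handling
    }
  }

  def receive: Receive = Actor.emptyBehavior
}
```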

When a stream fails, the library internals take care of cleaning up all underlying resources.


If a failure while reading from Kafka is caused by something other than connection problems, such as a deserialization error, the stage fails immediately. If you expect such cases, consider consuming raw byte arrays and deserializing in a subsequent map stage, where you can use supervision to skip failed elements. See also the "At least once" page for more suggestions.
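A minimal Scala sketch of that approach, assuming illustrative settings values and a hypothetical deserialize helper that may throw on bad input:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.ByteArrayDeserializer

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// Consume raw byte arrays, so the source stage itself cannot fail on a bad record
val rawSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")

// Resume (skip the element) on deserialization errors, stop on anything else
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Resume
  case _                           => Supervision.Stop
}

Consumer.plainSource(rawSettings, Subscriptions.topics("topic1"))
  .map(record => deserialize(record.value())) // hypothetical, may throw
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.ignore)
```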
