Error handling
Failing consumer
When a consumer fails to read from Kafka due to connection problems, it throws a WakeupException, which is handled internally with retries. Refer to the consumer configuration settings wakeup-timeout and max-wakeups if you're interested in tweaking the retry-handling parameters. When the configured number of max-wakeups is reached, the source stage will fail with an exception and stop.
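As a hedged sketch, these settings can also be set programmatically on ConsumerSettings. This assumes the withWakeupTimeout and withMaxWakeups setters of this library version; the broker address, group id, and values are illustrative only:

import java.util.concurrent.TimeUnit;
import akka.kafka.ConsumerSettings;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.concurrent.duration.Duration;

// Assumes an ActorSystem named `system`, as in the other examples on this page.
ConsumerSettings<String, byte[]> consumerSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
        .withBootstrapServers("localhost:9092") // illustrative address
        .withGroupId("group1")                  // illustrative group id
        // how long a poll may block before the consumer is woken up
        .withWakeupTimeout(Duration.create(3, TimeUnit.SECONDS))
        // how many consecutive wakeups are tolerated before the stage fails
        .withMaxWakeups(10);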
Failing producer
Retry handling for producers is built into Kafka. In case of a failure when sending a message, an exception will be thrown, which should fail the stream.
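The number of client-side retries is a plain Kafka producer property, so it can be tuned through ProducerSettings. A minimal sketch, assuming an ActorSystem named system as elsewhere on this page; the property values and broker address are illustrative:

import akka.kafka.ProducerSettings;
import org.apache.kafka.common.serialization.StringSerializer;

ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092") // illustrative address
        // the Kafka client retries sends transparently before surfacing an exception
        .withProperty("retries", "5")
        // wait between retry attempts (milliseconds)
        .withProperty("retry.backoff.ms", "500");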
Restarting the stream with a backoff stage
Akka Streams provides graph stages to gracefully restart a stream on failure, with a configurable backoff. You can take advantage of this to restart a failing consumer with an exponential backoff, by wrapping it in a RestartSource:
Java
import java.util.concurrent.atomic.AtomicReference;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;

// Hold the materialized control so the consumer can be shut down from outside.
AtomicReference<Consumer.Control> control = new AtomicReference<>(Consumer.createNoopControl());

RestartSource.onFailuresWithBackoff(
        java.time.Duration.ofSeconds(3),  // minimum backoff
        java.time.Duration.ofSeconds(30), // maximum backoff
        0.2,                              // adds random jitter to the backoff
        () ->
            Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
                .mapMaterializedValue(
                    c -> {
                      // capture the control of the most recently started consumer
                      control.set(c);
                      return c;
                    })
                .via(business()))
    .runWith(Sink.ignore(), materializer);

control.get().shutdown();
When a stream fails, library internals will handle all underlying resources.
If a failure in reading from Kafka is caused by other reasons, like deserialization problems, then the stage will fail immediately. If you expect such cases, consider consuming raw byte arrays and deserializing in a subsequent map stage, where you can use supervision to skip failed elements. See also the Serialization and "At least once" pages for more suggestions.
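A minimal sketch of that pattern, assuming the system, topic, and materializer from the examples above, and a hypothetical parseRecord helper that may throw on bad input; the resuming supervision strategy drops the failed element instead of failing the stream:

import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorAttributes;
import akka.stream.Supervision;
import akka.stream.javadsl.Sink;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Consume raw byte arrays so the Kafka consumer itself never fails on bad payloads.
ConsumerSettings<String, byte[]> rawSettings =
    ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer());

Consumer.plainSource(rawSettings, Subscriptions.topics(topic))
    // parseRecord is a hypothetical application-level deserializer that may throw
    .map(record -> parseRecord(record.value()))
    // skip elements whose deserialization failed instead of failing the stream
    .withAttributes(
        ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider()))
    .runWith(Sink.ignore(), materializer);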