Error Handling - Version 2.5-SNAPSHOT

Error Handling

Strategies for how to handle exceptions from processing stream elements can be defined when materializing the stream. The error handling strategies are inspired by actor supervision strategies, but the semantics have been adapted to the domain of stream processing.

Warning

ZipWith, GraphStage junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet.

Supervision Strategies

There are three ways to handle exceptions from application code:

  • Stop - The stream is completed with failure.
  • Resume - The element is dropped and the stream continues.
  • Restart - The element is dropped and the stream continues after restarting the stage. Restarting a stage means that any accumulated state is cleared. This is typically performed by creating a new instance of the stage.

By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with failure when an exception is thrown.

final Materializer mat = ActorMaterializer.create(system);
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
  .map(elem -> 100 / elem);
final Sink<Integer, CompletionStage<Integer>> fold =
  Sink.<Integer, Integer> fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)

The default supervision strategy for a stream can be defined on the settings of the materializer.

final Function<Throwable, Supervision.Directive> decider = exc -> {
  if (exc instanceof ArithmeticException)
    return Supervision.resume();
  else
    return Supervision.stop();
};
final Materializer mat = ActorMaterializer.create(
  ActorMaterializerSettings.create(system).withSupervisionStrategy(decider),
  system);
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
  .map(elem -> 100 / elem);
final Sink<Integer, CompletionStage<Integer>> fold =
  Sink.fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

Here you can see that all ArithmeticException will resume the processing, i.e. the elements that cause the division by zero are effectively dropped.

Note

Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in Graph cycles, liveness and deadlocks.

The supervision strategy can also be defined for all operators of a flow.

final Materializer mat = ActorMaterializer.create(system);
final Function<Throwable, Supervision.Directive> decider = exc -> {
  if (exc instanceof ArithmeticException)
    return Supervision.resume();
  else
    return Supervision.stop();
};
final Flow<Integer, Integer, NotUsed> flow =
    Flow.of(Integer.class).filter(elem -> 100 / elem < 50).map(elem -> 100 / (5 - elem))
    .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
  .via(flow); 
final Sink<Integer, CompletionStage<Integer>> fold =
  Sink.<Integer, Integer> fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)

Restart works in a similar way as Resume with the addition that accumulated state, if any, of the failing processing stage will be reset.

final Materializer mat = ActorMaterializer.create(system);
final Function<Throwable, Supervision.Directive> decider = exc -> {
  if (exc instanceof IllegalArgumentException)
    return Supervision.restart();
  else
    return Supervision.stop();
};
final Flow<Integer, Integer, NotUsed> flow =
  Flow.of(Integer.class).scan(0, (acc, elem) -> {
    if (elem < 0) throw new IllegalArgumentException("negative not allowed");
    else return acc + elem;
  })
  .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 3, -1, 5, 7))
  .via(flow);
final CompletionStage<List<Integer>> result = source.grouped(1000)
  .runWith(Sink.<List<Integer>>head(), mat);
// the negative element cause the scan stage to be restarted,
// i.e. start from 0 again
// result here will be a Future completed with Success(List(0, 1, 4, 0, 5, 12))

Errors from mapAsync

Stream supervision can also be applied to the futures of mapAsync.

Let's say that we use an external service to lookup email addresses and we would like to discard those that cannot be found.

We start with the tweet stream of authors:

final Source<Author, NotUsed> authors = tweets
  .filter(t -> t.hashtags().contains(AKKA))
  .map(t -> t.author);

Assume that we can lookup their email address using:

public CompletionStage<String> lookupEmail(String handle)

The CompletionStage is completed normally if the email is not found.

Transforming the stream of authors to a stream of email addresses by using the lookupEmail service can be done with mapAsync and we use Supervision.getResumingDecider to drop unknown email addresses:

final Attributes resumeAttrib =
  ActorAttributes.withSupervisionStrategy(Supervision.getResumingDecider());
final Flow<Author, String, NotUsed> lookupEmail =
    Flow.of(Author.class)
    .mapAsync(4, author -> addressSystem.lookupEmail(author.handle))
    .withAttributes(resumeAttrib);
final Source<String, NotUsed> emailAddresses = authors.via(lookupEmail);

If we would not use Resume the default stopping strategy would complete the stream with failure on the first CompletionStage that was completed exceptionally.

Contents