RestartSource.onFailuresWithBackoff

Wrap the given SourceSource with a SourceSource that will restart it when it fails using an exponential backoff.

Error handling

Signature

RestartSource.onFailuresWithBackoffRestartSource.onFailuresWithBackoff

Description

Wraps the given SourceSource with a SourceSource that will restart it when it fails using an exponential backoff. The backoff resets back to minBackoff if there hasn’t been a failure within minBackoff.

This SourceSource will never emit a failure, since the failure of the wrapped SourceSource is always handled by restarting. The wrapped SourceSource can be completed by completing this SourceSource. When that happens, the wrapped SourceSource, if currently running will be cancelled, and it will not be restarted. This can be triggered by the downstream cancelling, or externally by introducing a KillSwitch right after this SourceSource in the graph.

See also:

Examples

This shows that a Source is not restarted if it completes, only if it fails. Tick is only printed three times as the take(3) means the inner source completes successfully after emitting the first 3 elements.

Scala
val finiteSource = Source.tick(1.second, 1.second, "tick").take(3)
val forever = RestartSource.onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => finiteSource)
forever.runWith(Sink.foreach(println))
// prints
// tick
// tick
// tick
Java
Source<String, Cancellable> finiteSource =
    Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick").take(3);
Source<String, NotUsed> forever =
    RestartSource.onFailuresWithBackoff(
        Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> finiteSource);
forever.runWith(Sink.foreach(System.out::println), system);
// prints
// tick
// tick
// tick

If the inner source instead fails, it will be restarted with an increasing backoff. The source emits 1, 2, 3, and then throws an exception. The first time the exception is thrown the source is restarted after 1s, then 2s etc, until the maxBackoff of 10s.

Scala
// could throw if for example it used a database connection to get rows
val flakySource: Source[() => Int, NotUsed] =
  Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val forever =
  RestartSource.onFailuresWithBackoff(minBackoff = 1.second, maxBackoff = 10.seconds, 0.1)(() => flakySource)
forever.runWith(Sink.foreach(nr => system.log.info("{}", nr())))
// logs
//[INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 1
//[INFO] [12/10/2019 13:51:58.301] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 2
//[INFO] [12/10/2019 13:51:58.302] [default-akka.test.stream-dispatcher-7] [akka.actor.ActorSystemImpl(default)] 3
//[WARN] [12/10/2019 13:51:58.310] [default-akka.test.stream-dispatcher-7] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:  (docs.stream.operators.source.Restart$CantConnectToDatabase: darn)
// --> 1 second gap
//[INFO] [12/10/2019 13:51:59.379] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 1
//[INFO] [12/10/2019 13:51:59.382] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 2
//[INFO] [12/10/2019 13:51:59.383] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 3
//[WARN] [12/10/2019 13:51:59.386] [default-akka.test.stream-dispatcher-8] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:  (docs.stream.operators.source.Restart$CantConnectToDatabase: darn)
//--> 2 second gap
//[INFO] [12/10/2019 13:52:01.594] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 1
//[INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 2
//[INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8] [akka.actor.ActorSystemImpl(default)] 3
//[WARN] [12/10/2019 13:52:01.596] [default-akka.test.stream-dispatcher-8] [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:  (docs.stream.operators.source.Restart$CantConnectToDatabase: darn)
Java
// could throw if for example it used a database connection to get rows
Source<Creator<Integer>, NotUsed> flakySource =
    Source.from(
        Arrays.<Creator<Integer>>asList(
            () -> 1,
            () -> 2,
            () -> 3,
            () -> {
              throw new RuntimeException("darn");
            }));
Source<Creator<Integer>, NotUsed> forever =
    RestartSource.onFailuresWithBackoff(
        Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource);
forever.runWith(
    Sink.foreach((Creator<Integer> nr) -> system.log().info("{}", nr.create())), system);
// logs
// [INFO] [12/10/2019 13:51:58.300] [default-akka.test.stream-dispatcher-7]
// [akka.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:51:58.301] [default-akka.test.stream-dispatcher-7]
// [akka.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:51:58.302] [default-akka.test.stream-dispatcher-7]
// [akka.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:51:58.310] [default-akka.test.stream-dispatcher-7]
// [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:
// (RuntimeException: darn)
// --> 1 second gap
// [INFO] [12/10/2019 13:51:59.379] [default-akka.test.stream-dispatcher-8]
// [akka.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:51:59.382] [default-akka.test.stream-dispatcher-8]
// [akka.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:51:59.383] [default-akka.test.stream-dispatcher-8]
// [akka.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:51:59.386] [default-akka.test.stream-dispatcher-8]
// [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:
// (RuntimeException: darn)
// --> 2 second gap
// [INFO] [12/10/2019 13:52:01.594] [default-akka.test.stream-dispatcher-8]
// [akka.actor.ActorSystemImpl(default)] 1
// [INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8]
// [akka.actor.ActorSystemImpl(default)] 2
// [INFO] [12/10/2019 13:52:01.595] [default-akka.test.stream-dispatcher-8]
// [akka.actor.ActorSystemImpl(default)] 3
// [WARN] [12/10/2019 13:52:01.596] [default-akka.test.stream-dispatcher-8]
// [RestartWithBackoffSource(akka://default)] Restarting graph due to failure. stack_trace:
// (RuntimeException: darn)

Finally, to be able to stop the restarting, a kill switch can be used. The kill switch is inserted right after the restart source. The inner source is the same as above so emits 3 elements and then fails. A killswitch is used to be able to stop the source being restarted:

Scala
val flakySource: Source[() => Int, NotUsed] =
  Source(List(() => 1, () => 2, () => 3, () => throw CantConnectToDatabase("darn")))
val stopRestarting: UniqueKillSwitch =
  RestartSource
    .onFailuresWithBackoff(1.second, 10.seconds, 0.1)(() => flakySource)
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.foreach(nr => println(s"Nr ${nr()}")))(Keep.left)
    .run()
//... from some where else
// stop the source from restarting
stopRestarting.shutdown()
Java
Source<Creator<Integer>, NotUsed> flakySource =
    Source.from(
        Arrays.<Creator<Integer>>asList(
            () -> 1,
            () -> 2,
            () -> 3,
            () -> {
              throw new RuntimeException("darn");
            }));
UniqueKillSwitch stopRestarting =
    RestartSource.onFailuresWithBackoff(
            Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1, () -> flakySource)
        .viaMat(KillSwitches.single(), Keep.right())
        .toMat(Sink.foreach(nr -> System.out.println("nr " + nr.create())), Keep.left())
        .run(system);
// ... from some where else
// stop the source from restarting
stopRestarting.shutdown();

Reactive Streams semantics

emits when the wrapped source emits

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.