RetryFlow.withBackoff

Wrap the given Flow and retry individual elements in that stream with an exponential backoff. A decider function tests every emitted element and can return a new element to be sent to the wrapped flow for another try.

Error handling

Signature

Scala
def withBackoff[In, Out, Mat](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRetries: Int, flow: Flow[In, Out, Mat])(decideRetry: (In, Out) => Option[In]): Flow[In, Out, Mat]
Java
<In, Out, Mat> Flow<In, Out, Mat> withBackoff(
    Duration minBackoff,
    Duration maxBackoff,
    double randomFactor,
    int maxRetries,
    Flow<In, Out, Mat> flow,
    akka.japi.function.Function2<In, Out, Optional<In>> decideRetry)

API documentation

RetryFlow

Description

When an element is emitted by the wrapped flow it is passed to the decideRetry function, which may return an element to retry in the flow.

The retry backoff is controlled by the minBackoff, maxBackoff and randomFactor parameters. At most maxRetries retries will be made after the initial try.
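As a rough illustration of how these three parameters can interact, the following Java sketch computes a delay for each retry. The formula used here (doubling from minBackoff, random jitter of up to randomFactor, capping at maxBackoff) is an assumption about a typical exponential backoff and is not taken from Akka's implementation. BackoffSketch and delay are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {

  // Delay before retry number `retry` (1 = first retry).
  // Assumed formula: minBackoff * 2^(retry - 1), scaled by a jitter factor in
  // [1, 1 + randomFactor), and capped at maxBackoff.
  // `sample` is the random draw, expected in [0, 1); passing it in keeps this testable.
  static Duration delay(
      Duration minBackoff, Duration maxBackoff, double randomFactor, int retry, double sample) {
    double jitter = 1.0 + sample * randomFactor;
    double millis = minBackoff.toMillis() * Math.pow(2, retry - 1) * jitter;
    // Capping also covers very large retry counts, where the product overflows to infinity.
    if (millis >= maxBackoff.toMillis()) return maxBackoff;
    return Duration.ofMillis((long) millis);
  }

  // Convenience overload that draws the random sample itself.
  static Duration delay(Duration minBackoff, Duration maxBackoff, double randomFactor, int retry) {
    return delay(
        minBackoff, maxBackoff, randomFactor, retry, ThreadLocalRandom.current().nextDouble());
  }

  public static void main(String[] args) {
    Duration min = Duration.ofMillis(10);
    Duration max = Duration.ofSeconds(5);
    // With randomFactor = 0 there is no jitter: prints 10 ms, 20 ms, 40 ms.
    for (int retry = 1; retry <= 3; retry++) {
      System.out.println("retry " + retry + ": " + delay(min, max, 0.0, retry).toMillis() + " ms");
    }
  }
}
```

Under these assumptions, a randomFactor of 0.2 would add up to 20% to each delay, which helps spread out retries that would otherwise fire at the same instant.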

The wrapped flow must have one-in one-out semantics: it may neither filter nor duplicate elements. The RetryFlow will fail if the flow emits two elements for a single input, and it will be stuck “forever” if nothing is emitted. Only one element is sent into the flow at any time; the flow needs to emit an element before the next one is sent to it.

Elements are retried as long as maxRetries is not reached and the decideRetry function returns a new element to be sent to flow. The decideRetry function is passed the original element sent to the flow and the element emitted by it. When decideRetry returns None (Scala) or Optional.empty (Java), no retries will be issued, and the response will be emitted downstream.
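The retry rules above can be modelled for a single element with plain Java functions. This is only a sequential sketch of the described semantics, not Akka's implementation: the wrapped flow is reduced to a Function, backoff delays are left out, and it assumes that each retry passes the most recently sent element to decideRetry. RetrySemanticsSketch, process, decrement and retryWhilePositive are hypothetical names.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

final class RetrySemanticsSketch {

  // Models what happens to ONE element passing through the retry wrapper.
  static <In, Out> Out process(
      In element,
      int maxRetries,
      Function<In, Out> wrappedFlow,
      BiFunction<In, Out, Optional<In>> decideRetry) {
    In current = element;
    Out result = wrappedFlow.apply(current); // initial try
    for (int retries = 0; retries < maxRetries; retries++) {
      Optional<In> next = decideRetry.apply(current, result);
      if (!next.isPresent()) break; // no retry requested: emit the result downstream
      current = next.get();
      result = wrappedFlow.apply(current); // retry with the new element
    }
    return result; // emitted downstream (also when maxRetries is reached)
  }

  public static void main(String[] args) {
    // Hypothetical wrapped flow: subtract one from every element.
    Function<Integer, Integer> decrement = x -> x - 1;
    // Retry with the result as the new element while the result is positive.
    BiFunction<Integer, Integer, Optional<Integer>> retryWhilePositive =
        (in, out) -> out > 0 ? Optional.of(out) : Optional.empty();

    System.out.println(process(2, 3, decrement, retryWhilePositive));  // prints 0
    System.out.println(process(10, 3, decrement, retryWhilePositive)); // prints 6
  }
}
```

In this model, element 2 stops after one retry because the result reaches 0, while element 10 uses all three retries and the last result (6) is emitted even though decideRetry would have asked for another try.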

Note

This API was added in Akka 2.6.0 and may be changed in further patch releases.

This example wraps a flow handling Ints (Scala) or Integers (Java), and retries each element with the previous result as the new input until the result is 0 or negative, or maxRetries is reached.

Scala
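import akka.NotUsed
import akka.stream.scaladsl.{ Flow, RetryFlow }
import scala.concurrent.duration._

val flow: Flow[Int, Int, NotUsed] = ??? // placeholder for the wrapped flow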

val retryFlow: Flow[Int, Int, NotUsed] =
  RetryFlow.withBackoff(minBackoff = 10.millis, maxBackoff = 5.seconds, randomFactor = 0d, maxRetries = 3, flow)(
    decideRetry = {
      case (_, result) if result > 0 => Some(result)
      case _                         => None
    })
Java
Flow<Integer, Integer, NotUsed> flow = // ...
    // the wrapped flow

Flow<Integer, Integer, NotUsed> retryFlow =
    RetryFlow.withBackoff(
        minBackoff,
        maxBackoff,
        randomFactor,
        maxRetries,
        flow,
        (in, out) -> {
          if (out > 0) return Optional.of(out);
          else return Optional.empty();
        });

Reactive Streams semantics

emits when the wrapped flow emits, and either maxRetries is reached or decideRetry returns None (Scala) or Optional.empty (Java)

backpressures during backoff, when the wrapped flow backpressures, or when downstream backpressures

completes when upstream or the wrapped flow completes

cancels when downstream or the wrapped flow cancels
