Source.unfold

Stream the result of a function as long as it returns a Some (Scala) or a non-empty Optional (Java).

Signature

Source.unfold

Description

Stream the result of a function as long as it returns a Some (Scala) or a non-empty Optional (Java). The value inside the option is a pair (a tuple in Scala, a Pair in Java): the first element is the state passed to the next invocation of the function, and the second element is the value emitted downstream. The first invocation of the provided function receives the zero state.
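The contract described above can be modeled in plain Java. This is only a sketch of the semantics, not the library's implementation: the class name UnfoldSketch is hypothetical, java.util.Map.Entry stands in for the library's Pair, and the elements are collected into a List instead of being streamed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java model of the unfold contract; not the library's implementation.
class UnfoldSketch {

  // Calls f repeatedly, threading the state through, until f returns an empty Optional.
  // Map.Entry stands in for Pair: key = next state, value = element to emit.
  static <S, E> List<E> unfold(S zero, Function<S, Optional<Map.Entry<S, E>>> f) {
    List<E> emitted = new ArrayList<>();
    S state = zero; // the first invocation receives the zero state
    while (true) {
      Optional<Map.Entry<S, E>> step = f.apply(state);
      if (!step.isPresent()) {
        return emitted; // an empty result ends the sequence
      }
      emitted.add(step.get().getValue()); // second element of the pair is emitted
      state = step.get().getKey();        // first element becomes the next state
    }
  }

  // Emits the current value and continues with current - 1 until the state is 0.
  static List<Integer> countDown(int from) {
    return unfold(from, current ->
        current == 0
            ? Optional.empty()
            : Optional.of(Map.entry(current - 1, current)));
  }

  public static void main(String[] args) {
    System.out.println(countDown(3)); // prints [3, 2, 1]
  }
}
```

Note that the state and the emitted element are independent: the function decides both on every step, which is what distinguishes unfold from a simple iteration over a seed value.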

Warning

The same zero state object is used for every materialization of the Source, so the state must be immutable. For example, a java.util.Iterator, an Array, or a Java standard library collection would not be safe, because the unfold function could mutate the value. If you must use a mutable value, one solution is to wrap the source in Source.lazySource so that a new mutable zero value is created for each materialization.
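To make the hazard concrete, here is a plain-Java model of two "materializations" that share one mutable zero state. It is a sketch under stated assumptions, not library code: SharedZeroStateSketch, runOnce and drain are hypothetical names, Map.Entry stands in for Pair, and a Supplier plays the role that Source.lazySource plays in the real API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of the shared zero state hazard; not the library's implementation.
class SharedZeroStateSketch {

  // Stand-in for one materialization: runs the unfold function from the given zero state.
  static <S, E> List<E> runOnce(S zero, Function<S, Optional<Map.Entry<S, E>>> f) {
    List<E> emitted = new ArrayList<>();
    Optional<Map.Entry<S, E>> step = f.apply(zero);
    while (step.isPresent()) {
      emitted.add(step.get().getValue());
      step = f.apply(step.get().getKey());
    }
    return emitted;
  }

  // Unfold function whose state is a mutable Iterator: next() advances it in place.
  static Optional<Map.Entry<Iterator<String>, String>> drain(Iterator<String> it) {
    return it.hasNext() ? Optional.of(Map.entry(it, it.next())) : Optional.empty();
  }

  // Unsafe: the same Iterator instance is the zero state of both runs.
  static List<List<String>> sharedZero() {
    Iterator<String> zero = List.of("a", "b").iterator();
    return List.of(
        runOnce(zero, SharedZeroStateSketch::drain),
        runOnce(zero, SharedZeroStateSketch::drain));
  }

  // Safer: a Supplier creates a fresh zero state for every run.
  static List<List<String>> freshZero() {
    Supplier<Iterator<String>> zero = () -> List.of("a", "b").iterator();
    return List.of(
        runOnce(zero.get(), SharedZeroStateSketch::drain),
        runOnce(zero.get(), SharedZeroStateSketch::drain));
  }

  public static void main(String[] args) {
    System.out.println(sharedZero()); // prints [[a, b], []]
    System.out.println(freshZero());  // prints [[a, b], [a, b]]
  }
}
```

In the shared case the second run sees an already exhausted iterator and produces nothing, which is the kind of silent misbehavior the warning is about.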

Note that for unfolding a source of elements through a blocking API, such as a network or filesystem resource, you should prefer unfoldResource.

Examples

This first sample starts at a user-provided integer and counts down using unfold, completing when the state reaches zero (zero itself is not emitted):

Scala
def countDown(from: Int): Source[Int, NotUsed] =
  Source.unfold(from) { current =>
    if (current == 0) None
    else Some((current - 1, current))
  }
Java
public static Source<Integer, NotUsed> countDown(Integer from) {
  return Source.unfold(
      from,
      current -> {
        if (current == 0) return Optional.empty();
        else return Optional.of(Pair.create(current - 1, current));
      });
}

It is also possible to express unfolds that have no end: the function never returns None (Scala) or Optional.empty (Java), so the source must be combined with, for example, .take(n) to avoid producing an infinite stream. Here we implement the Fibonacci numbers (0, 1, 1, 2, 3, 5, 8, 13, etc.) with unfold:

Scala
def fibonacci: Source[BigInt, NotUsed] =
  Source.unfold((BigInt(0), BigInt(1))) {
    case (a, b) =>
      Some(((b, a + b), a))
  }
Java
public static Source<BigInteger, NotUsed> fibonacci() {
  return Source.unfold(
      Pair.create(BigInteger.ZERO, BigInteger.ONE),
      current -> {
        BigInteger a = current.first();
        BigInteger b = current.second();
        Pair<BigInteger, BigInteger> next = Pair.create(b, a.add(b));
        return Optional.of(Pair.create(next, a));
      });
}
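As a stdlib-only analogy for bounding an endless unfold, the following sketch uses java.util.stream.Stream rather than the streaming library itself. The assumptions are explicit: Stream.iterate stands in for an unfold that never returns an empty value, limit(n) stands in for .take(n), and BoundedFibonacciSketch and firstFibonacci are hypothetical names.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy for bounding an endless unfold; not the library's API.
class BoundedFibonacciSketch {

  // The state is the pair {a, b}; each step moves to {b, a + b} and emits a.
  // Like the unfold above, this sequence never ends on its own.
  static Stream<BigInteger> fibonacci() {
    return Stream.iterate(
            new BigInteger[] {BigInteger.ZERO, BigInteger.ONE},
            state -> new BigInteger[] {state[1], state[0].add(state[1])})
        .map(state -> state[0]);
  }

  // limit(n) plays the role of take(n): without it, collecting would never finish.
  static List<BigInteger> firstFibonacci(int n) {
    return fibonacci().limit(n).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(firstFibonacci(8)); // prints [0, 1, 1, 2, 3, 5, 8, 13]
  }
}
```

The bound belongs to the consumer side, not to the unfold function, so the same endless definition can be reused with different limits.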

Reactive Streams semantics

emits when there is demand and the unfold function applied to the previous state returns a non-empty value

completes when the unfold function returns an empty value
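The demand-driven behavior can be approximated with a pull-based iterator in plain Java. This is a rough model only, since real Reactive Streams demand is signalled asynchronously; DemandSketch and callsAfterPulling are hypothetical names, and Map.Entry stands in for Pair.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Plain-Java model of demand-driven unfolding; not the library's implementation.
class DemandSketch {

  // Pull-based unfold: f is only invoked when the consumer asks for an element.
  static <S, E> Iterator<E> unfold(S zero, Function<S, Optional<Map.Entry<S, E>>> f) {
    return new Iterator<E>() {
      private S state = zero;
      private Optional<Map.Entry<S, E>> pending = null; // null = not computed yet

      @Override
      public boolean hasNext() {
        if (pending == null) {
          pending = f.apply(state); // compute the next step lazily
        }
        return pending.isPresent(); // an empty result means completion
      }

      @Override
      public E next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Map.Entry<S, E> step = pending.get();
        state = step.getKey();
        pending = null;
        return step.getValue();
      }
    };
  }

  // Returns how often the unfold function ran after pulling `demand` elements
  // from a sequence that never ends on its own.
  static int callsAfterPulling(int demand) {
    AtomicInteger calls = new AtomicInteger();
    Iterator<Integer> it = unfold(0, n -> {
      calls.incrementAndGet();
      return Optional.of(Map.entry(n + 1, n));
    });
    for (int i = 0; i < demand; i++) {
      it.next();
    }
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println(callsAfterPulling(3)); // prints 3
  }
}
```

In this model the function runs once per requested element and not at all when nothing is requested, which mirrors the "emits when there is demand" rule.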
