Source.completionStageSource

Streams the elements of an asynchronous source once its given completion operator completes.

Source operators

Description

Streams the elements of an asynchronous source once its given completion operator completes. If the completion fails the stream is failed with that exception.

For the corresponding operator for the Scala standard library Future see futureSource.

Example

Suppose we are accessing a remote service that streams user data over HTTP/2 or a WebSocket. We can model that as a Source<User,NotUsed>Source[User,NotUsed] but that source will only be available once the connection has been established.

Java
import akka.NotUsed;
import akka.stream.javadsl.Source;

import java.util.concurrent.CompletionStage;

public class CompletionStageSource {

  public static void sourceCompletionStageSource() {
    UserRepository userRepository = null; // an abstraction over the remote service
    Source<User, CompletionStage<NotUsed>> userCompletionStageSource =
        Source.completionStageSource(userRepository.loadUsers());
    // ...
  }

  interface UserRepository {
    CompletionStage<Source<User, NotUsed>> loadUsers();
  }

  static class User {}
}

Reactive Streams semantics

emits the next value from the asynchronous source, once its completion operator has completed

completes after the asynchronous source completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.