Sink.takeLast

Collect the last n values emitted from the stream into a collection.

Sink operators

Signature

Sink.takeLastSink.takeLast

Description

Materializes into a Future CompletionStage of immutable.Seq[T] List<In> containing the last n collected elements when the stream completes. If the stream completes before signaling at least n elements, the Future CompletionStage will complete with the number of elements taken at that point. If the stream never completes, the Future CompletionStage will never complete. If there is a failure signaled in the stream the Future CompletionStage will be completed with failure.

Example

Scala
sourcecase class Student(name: String, gpa: Double)

val students = List(
  Student("Alison", 4.7),
  Student("Adrian", 3.1),
  Student("Alexis", 4),
  Student("Benita", 2.1),
  Student("Kendra", 4.2),
  Student("Jerrie", 4.3)).sortBy(_.gpa)

val sourceOfStudents = Source(students)

val result: Future[Seq[Student]] = sourceOfStudents.runWith(Sink.takeLast(3))

result.foreach { topThree =>
  println("#### Top students ####")
  topThree.reverse.foreach { s =>
    println(s"Name: ${s.name}, GPA: ${s.gpa}")
  }
}
/*
    #### Top students ####
    Name: Alison, GPA: 4.7
    Name: Jerrie, GPA: 4.3
    Name: Kendra, GPA: 4.2
 */
Java
sourceimport akka.japi.Pair;
import org.reactivestreams.Publisher;
// pair of (Name, GPA)
List<Pair<String, Double>> sortedStudents =
    Arrays.asList(
        new Pair<>("Benita", 2.1),
        new Pair<>("Adrian", 3.1),
        new Pair<>("Alexis", 4.0),
        new Pair<>("Kendra", 4.2),
        new Pair<>("Jerrie", 4.3),
        new Pair<>("Alison", 4.7));

Source<Pair<String, Double>, NotUsed> studentSource = Source.from(sortedStudents);

CompletionStage<List<Pair<String, Double>>> topThree =
    studentSource.runWith(Sink.takeLast(3), system);

topThree.thenAccept(
    result -> {
      System.out.println("#### Top students ####");
      for (int i = result.size() - 1; i >= 0; i--) {
        Pair<String, Double> s = result.get(i);
        System.out.println("Name: " + s.first() + ", " + "GPA: " + s.second());
      }
    });
/*
  #### Top students ####
  Name: Alison, GPA: 4.7
  Name: Jerrie, GPA: 4.3
  Name: Kendra, GPA: 4.2
*/

Reactive Streams semantics

cancels never

backpressures never

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.