monitor

Materializes to a FlowMonitor that monitors messages flowing through or completion of the operators.

Watching status operators

Signature

Source.monitor, Flow.monitor

Description

Materializes to a FlowMonitor that monitors messages flowing through or completion of the stream. Elements pass through unchanged. Note that the FlowMonitor inserts a memory barrier every time it processes an event, and may therefore affect performance. The provided FlowMonitor contains a state field you can use to peek and get information about the stream.
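To make the trade-off above more concrete, here is a minimal plain-Java sketch of the general idea: each event overwrites a single shared field, and the write to that field is what lets another thread peek at the latest state. It is a conceptual model under stated assumptions, not the library's FlowMonitor implementation. The names MonitorSketch, Phase, Snapshot and SimpleMonitor are hypothetical and exist only for this illustration.

```java
// Conceptual model only: this is NOT the library's FlowMonitor implementation.
final class MonitorSketch {

  /** Hypothetical stand-in for the four states a monitor can report. */
  enum Phase { INITIALIZED, RECEIVED, FAILED, FINISHED }

  /** Immutable snapshot of the most recent event. */
  static final class Snapshot<T> {
    final Phase phase;
    final T lastElement;   // meaningful only when phase == RECEIVED
    final Throwable cause; // meaningful only when phase == FAILED

    Snapshot(Phase phase, T lastElement, Throwable cause) {
      this.phase = phase;
      this.lastElement = lastElement;
      this.cause = cause;
    }
  }

  static final class SimpleMonitor<T> {
    // A volatile write acts as a memory barrier, so readers on other threads
    // see the latest snapshot. This is the per-event cost the text mentions.
    private volatile Snapshot<T> state = new Snapshot<>(Phase.INITIALIZED, null, null);

    /** Peek at the latest snapshot from any thread. */
    Snapshot<T> state() {
      return state;
    }

    /** Record the element and hand it on unchanged. */
    T onElement(T element) {
      state = new Snapshot<>(Phase.RECEIVED, element, null);
      return element;
    }

    void onFailure(Throwable cause) {
      state = new Snapshot<>(Phase.FAILED, null, cause);
    }

    void onComplete() {
      state = new Snapshot<>(Phase.FINISHED, null, null);
    }
  }

  public static void main(String[] args) {
    SimpleMonitor<Integer> monitor = new SimpleMonitor<>();
    System.out.println(monitor.state().phase); // INITIALIZED
    for (int i = 0; i < 3; i++) {
      monitor.onElement(i);
    }
    Snapshot<Integer> seen = monitor.state();
    System.out.println(seen.phase + " " + seen.lastElement); // RECEIVED 2
    monitor.onComplete();
    System.out.println(monitor.state().phase); // FINISHED
  }
}
```

Because only the most recent snapshot is kept, a reader that polls slowly sees the latest element rather than every element, which matches the "peek" wording above.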

Example

The example below uses the monitorMat variant of monitor. The only difference between the two operators is that monitorMat takes a combine argument, which lets us decide which materialized value to keep. The sample below uses Keep.right, so only the FlowMonitor[Int] is kept.

Scala
val source: Source[Int, NotUsed] =
  Source.fromIterator(() => Iterator.from(0))

def printMonitorState(flowMonitor: FlowMonitor[Int]) =
  flowMonitor.state match {
    case FlowMonitorState.Initialized =>
      println("Stream is initialized but hasn't processed any element")
    case FlowMonitorState.Received(msg) =>
      println(s"Last element processed: $msg")
    case FlowMonitorState.Failed(cause) =>
      println(s"Stream failed with cause $cause")
    case FlowMonitorState.Finished => println(s"Stream completed already")
  }

val monitoredSource: Source[Int, FlowMonitor[Int]] = source.take(6).throttle(5, 1.second).monitorMat(Keep.right)
val (flowMonitor, futureDone) =
  monitoredSource.toMat(Sink.foreach(println))(Keep.both).run()

// If we peek in the monitor too early, it's possible it was not initialized yet.
printMonitorState(flowMonitor)

// Periodically check the monitor
Source.tick(200.millis, 400.millis, "").runForeach(_ => printMonitorState(flowMonitor))
Java
private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
  if (state == FlowMonitorState.initialized()) {
    System.out.println("Stream is initialized but hasn't processed any element");
  } else if (state instanceof FlowMonitorState.Received) {
    FlowMonitorState.Received<T> msg = (FlowMonitorState.Received<T>) state;
    System.out.println("Last message received: " + msg.msg());
  } else if (state instanceof FlowMonitorState.Failed) {
    Throwable cause = ((FlowMonitorState.Failed) state).cause();
    System.out.println("Stream failed with cause: " + cause.getMessage());
  } else {
    System.out.println("Stream completed already");
  }
}
  Source<Integer, FlowMonitor<Integer>> monitoredSource =
      Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
          .throttle(5, Duration.ofSeconds(1))
          .monitorMat(Keep.right());

  Pair<FlowMonitor<Integer>, CompletionStage<Done>> run =
      monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);

  FlowMonitor<Integer> monitor = run.first();

  // If we peek in the monitor too early, it's possible it was not initialized yet.
  printMonitorState(monitor.state());

  // Periodically check the monitor
  Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
      .runForeach(__ -> printMonitorState(monitor.state()), actorSystem);

When run, the sample code will produce something similar to:

Stream is initialized but hasn't processed any element
0
1
2
Last element processed: 2
3
4
5
Stream completed already

Reactive Streams semantics

emits when upstream emits an element

backpressures when downstream backpressures

completes when upstream completes
