Source.tick
A periodical repetition of an arbitrary object.
Signature
Description
A periodical repetition of an arbitrary object. The delay before the first tick is specified separately from the interval between subsequent ticks.
If downstream is applying backpressure when the time period has passed, the tick is dropped.
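The dropping behaviour can be observed by placing a slow stage after the tick source. The following is a minimal sketch, not part of the original examples. It assumes Akka 2.6 or later with a typed ActorSystem; the class name `TickBackpressureExample`, the method name, and the chosen durations are illustrative only.

```java
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, assuming Akka 2.6+ on the classpath.
public class TickBackpressureExample {

  // Counts how many ticks reach the end of the stream within two seconds
  // when downstream only accepts one element every 500 ms.
  public static long countDeliveredTicks() throws Exception {
    ActorSystem<Void> system =
        ActorSystem.create(Behaviors.empty(), "tick-backpressure-example");
    try {
      return Source.tick(Duration.ofMillis(50), Duration.ofMillis(50), "tick")
          // slow downstream: backpressures the tick source between elements
          .throttle(1, Duration.ofMillis(500))
          // stop the stream after two seconds so the fold can complete
          .takeWithin(Duration.ofSeconds(2))
          // count the elements that actually got through
          .runFold(0L, (count, tick) -> count + 1, system)
          .toCompletableFuture()
          .get(5, TimeUnit.SECONDS);
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("ticks delivered: " + countDeliveredTicks());
  }
}
```

About 40 tick intervals elapse in two seconds, but the count should be far lower because ticks that fire while the throttle is not pulling are discarded rather than queued.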
The source materializes a Cancellable that can be used to complete the source.
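As a sketch of how the materialized Cancellable might be used, the following keeps both materialized values, lets the stream run briefly, and then cancels it. It assumes Akka 2.6 or later with a typed ActorSystem; the class name `TickCancelExample`, the method name, and the 100 ms interval are illustrative and not taken from the original examples.

```java
import akka.Done;
import akka.actor.Cancellable;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, assuming Akka 2.6+ on the classpath.
public class TickCancelExample {

  // Runs a tick source for roughly runForMillis, cancels it, and reports
  // whether the stream completed after the cancellation.
  public static boolean runAndCancel(long runForMillis) throws Exception {
    ActorSystem<Void> system =
        ActorSystem.create(Behaviors.empty(), "tick-cancel-example");
    try {
      // Keep.both() retains the Cancellable from the source and the
      // completion signal from the sink.
      Pair<Cancellable, CompletionStage<Done>> materialized =
          Source.tick(Duration.ofMillis(100), Duration.ofMillis(100), "tick")
              .toMat(Sink.<String>foreach(System.out::println), Keep.both())
              .run(system);

      Thread.sleep(runForMillis);

      // Cancelling completes the source, which in turn completes the stream.
      materialized.first().cancel();
      materialized.second().toCompletableFuture().get(3, TimeUnit.SECONDS);
      return materialized.second().toCompletableFuture().isDone();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("completed after cancel: " + runAndCancel(500));
  }
}
```

If the stream did not complete after `cancel()`, the `get` call would time out with an exception instead of returning.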
The element must be immutable, because the source can be materialized several times and may pass the element between threads. See the second example under Examples for a way to obtain a periodic value that changes over time.
Examples
This first example prints to standard out periodically:
- Scala
  Source
    .tick(
      1.second, // delay of first tick
      1.second, // delay of subsequent ticks
      "tick" // element emitted each tick
    )
    .runForeach(println)
- Java
  Source.tick(
          Duration.ofSeconds(1), // delay of first tick
          Duration.ofSeconds(1), // delay of subsequent ticks
          "tick" // element emitted each tick
          )
      .runForeach(System.out::println, system);
You can also use the tick to trigger periodic work. In this sample the tick triggers a query to an actor using ask, and the response is emitted downstream. For this usage, what matters is that a tick was emitted, not the actual tick value.
- Scala
  val periodicActorResponse: Source[String, Cancellable] = Source
    .tick(1.second, 1.second, "tick")
    .mapAsync(1) { _ =>
      implicit val timeout: Timeout = 3.seconds
      val response: Future[MyActor.Response] = myActor.ask(MyActor.Query(_))
      response
    }
    .map(_.text)
- Java
  Source<String, Cancellable> periodicActorResponse =
      Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick")
          .mapAsync(
              1,
              notUsed -> {
                CompletionStage<MyActor.Response> response =
                    AskPattern.ask(
                        myActor, MyActor.Query::new, Duration.ofSeconds(3), system.scheduler());
                return response;
              })
          .map(response -> response.text);
A neat trick is to pair this with zipLatest, so that a stream of elements is combined with a value that is updated periodically, instead of triggering a query for each element:
- Scala
  val zipWithLatestResponse: Flow[Int, (Int, String), NotUsed] =
    Flow[Int].zipLatest(periodicActorResponse)
- Java
  Flow<Integer, Pair<Integer, String>, NotUsed> zipWithLatestResponse =
      Flow.of(Integer.class).zipLatest(periodicActorResponse);
Reactive Streams semantics
emits periodically; if there is downstream backpressure, ticks are skipped
completes when the materialized Cancellable is cancelled