Source.tick

A periodical repetition of an arbitrary object.

Signature

def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]

Description

A periodical repetition of an arbitrary object. The delay before the first tick is specified separately from the interval between the following ticks.

If downstream is applying backpressure when the interval has elapsed, the tick is dropped.
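The dropping behaviour can be observed with a deliberately slow downstream. The following is a minimal sketch, not part of the original examples. It assumes the Akka 2.6 package names (for Apache Pekko the `akka` prefix would be `org.apache.pekko`), and the object name `TickBackpressureExample`, the method `countDeliveredTicks` and all timings are made up for illustration.

```scala
// Assumption: Akka 2.6+ on the classpath; names and timings are illustrative only.
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.Source

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TickBackpressureExample {

  // Returns how many ticks reached the end of the stream within about one second.
  def countDeliveredTicks(): Int = {
    implicit val system: ActorSystem = ActorSystem("tick-backpressure")
    import system.dispatcher
    try {
      val delivered: Future[Int] = Source
        .tick(Duration.Zero, 50.millis, "tick") // a tick is attempted every 50 ms
        .takeWithin(1.second)                   // stop accepting ticks after one second
        // Non-blocking slow stage: each element takes about 200 ms, so this stage
        // backpressures the tick source while an element is in flight.
        .mapAsync(1)(t => after(200.millis, system.scheduler)(Future.successful(t)))
        .runFold(0)((count, _) => count + 1)

      Await.result(delivered, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"delivered ${countDeliveredTicks()} ticks")
}
```

Although a tick is attempted every 50 ms, only a handful of elements should arrive, because ticks that fire while the slow stage is busy are dropped rather than buffered.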

The source materializes a Cancellable that can be used to complete the source.
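As a sketch of how the materialized Cancellable might be used, the snippet below keeps both the Cancellable and the sink's completion future, cancels after a short while, and waits for the stream to complete. It assumes the Akka 2.6 package names (for Apache Pekko the `akka` prefix would be `org.apache.pekko`). The object name `TickCancelExample`, the method `runAndCancel` and the timings are hypothetical.

```scala
// Assumption: Akka 2.6+ on the classpath; names and timings are illustrative only.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object TickCancelExample {

  // Runs a tick stream, cancels it, and reports whether the Cancellable is cancelled.
  def runAndCancel(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("tick-cancel")
    try {
      // Keep.both retains the Cancellable from tick and the Future[Done] from the sink.
      val (cancellable, done) = Source
        .tick(100.millis, 100.millis, "tick")
        .toMat(Sink.foreach[String](println))(Keep.both)
        .run()

      Thread.sleep(550)    // let a few ticks through (for demonstration only)
      cancellable.cancel() // completes the source, and with it the whole stream

      // Without the cancel above this future would never complete.
      Await.result(done, 3.seconds)
      cancellable.isCancelled
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"cancelled: ${runAndCancel()}")
}
```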

Note

The element must be immutable, as the source can be materialized several times and may pass it between threads. See the second example for a way to achieve a periodical element that changes over time.
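One simple way to respect this constraint, sketched here under assumptions rather than taken from the original page, is to tick with an immutable marker and derive a fresh value in a following `map`. The snippet assumes the Akka 2.6 package names (for Apache Pekko the `akka` prefix would be `org.apache.pekko`). `TickFreshValueExample` and `sampleTimestamps` are hypothetical names.

```scala
// Assumption: Akka 2.6+ on the classpath; names and timings are illustrative only.
import java.time.Instant

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object TickFreshValueExample {

  // Collects n timestamps, each computed at the moment its tick is processed.
  def sampleTimestamps(n: Int): Seq[Instant] = {
    implicit val system: ActorSystem = ActorSystem("tick-fresh-value")
    try {
      val timestamps = Source
        .tick(Duration.Zero, 50.millis, NotUsed) // the tick element is only an immutable marker
        .map(_ => Instant.now())                 // a new value is computed for every tick
        .take(n)                                 // completing downstream also stops the ticks
        .runWith(Sink.seq)

      Await.result(timestamps, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    sampleTimestamps(3).foreach(println)
}
```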

See also:

  • repeat Stream a single object repeatedly.
  • cycle Stream iterator in cycled manner.

Examples

This first example prints to standard out periodically:

Scala
Source
  .tick(
    1.second, // delay of first tick
    1.second, // delay of subsequent ticks
    "tick" // element emitted each tick
  )
  .runForeach(println)
Java
Source.tick(
        Duration.ofSeconds(1), // delay of first tick
        Duration.ofSeconds(1), // delay of subsequent ticks
        "tick" // element emitted each tick
        )
    .runForeach(System.out::println, system);

You can also use the tick to periodically emit a value. In this sample we use the tick to trigger a query to an actor using ask and emit the response downstream. For this usage, what matters is that a tick was emitted, not the actual tick value.

Scala
val periodicActorResponse: Source[String, Cancellable] = Source
  .tick(1.second, 1.second, "tick")
  .mapAsync(1) { _ =>
    implicit val timeout: Timeout = 3.seconds
    val response: Future[MyActor.Response] = myActor.ask(MyActor.Query)
    response
  }
  .map(_.text)
Java
Source<String, Cancellable> periodicActorResponse =
    Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick")
        .mapAsync(
            1,
            notUsed -> {
              CompletionStage<MyActor.Response> response =
                  AskPattern.ask(
                      myActor, MyActor.Query::new, Duration.ofSeconds(3), system.scheduler());
              return response;
            })
        .map(response -> response.text);

A neat trick is to use this together with zipLatest to combine a stream of elements with a value that is updated periodically, instead of having to trigger a query for each element:

Scala
val zipWithLatestResponse: Flow[Int, (Int, String), NotUsed] =
  Flow[Int].zipLatest(periodicActorResponse)
Java
Flow<Integer, Pair<Integer, String>, NotUsed> zipWithLatestResponse =
    Flow.of(Integer.class).zipLatest(periodicActorResponse);

Reactive Streams semantics

emits periodically; if there is downstream backpressure, ticks are skipped

completes when the materialized Cancellable is cancelled
