Source.queue

Materialize a SourceQueue onto which elements can be pushed for emitting from the source.

Signature

def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]]

Description

Materialize a SourceQueue onto which elements can be pushed for emitting from the source. The queue contains a buffer: if elements are pushed onto the queue faster than the source is consumed, the overflow is handled with a strategy specified by the user. SourceQueue.offer provides a way to track when an element has been emitted.

Elements pushed to a Source.queue are emitted to the stream when there is demand from downstream; otherwise they are buffered until demand is received. Elements in the buffer are discarded if downstream terminates.
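To make the overflow handling described above more concrete, the following is a minimal plain-Java sketch of a bounded buffer that receives elements while nothing is consuming them. It is a toy model only and does not use or reproduce Akka's implementation: the class OverflowSketch, its Strategy and Result enums, and the fill helper are hypothetical names introduced for illustration. The three modelled behaviours are assumed to correspond loosely to the dropHead, dropNew and dropBuffer overflow strategies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a bounded buffer with overflow strategies. Not Akka code. */
final class OverflowSketch {

    /** Hypothetical stand-ins for a subset of overflow strategies. */
    enum Strategy { DROP_HEAD, DROP_NEW, DROP_BUFFER }

    /** Outcome of a single offer in this model. */
    enum Result { ENQUEUED, DROPPED }

    private final int bufferSize;
    private final Strategy strategy;
    private final Deque<Integer> buffer = new ArrayDeque<>();

    OverflowSketch(int bufferSize, Strategy strategy) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must be at least 1");
        }
        this.bufferSize = bufferSize;
        this.strategy = strategy;
    }

    /** Offers an element while there is no downstream demand. */
    Result offer(int element) {
        if (buffer.size() < bufferSize) {
            buffer.addLast(element);
            return Result.ENQUEUED;
        }
        switch (strategy) {
            case DROP_HEAD:
                buffer.removeFirst();   // discard the oldest buffered element
                buffer.addLast(element);
                return Result.ENQUEUED;
            case DROP_BUFFER:
                buffer.clear();         // discard everything buffered so far
                buffer.addLast(element);
                return Result.ENQUEUED;
            default:
                return Result.DROPPED;  // DROP_NEW: discard the incoming element
        }
    }

    /** Snapshot of the buffered elements, oldest first. */
    List<Integer> contents() {
        return new ArrayList<>(buffer);
    }

    /** Offers 1..count to a fresh buffer and returns what remains buffered. */
    static List<Integer> fill(int bufferSize, Strategy strategy, int count) {
        OverflowSketch sketch = new OverflowSketch(bufferSize, strategy);
        for (int i = 1; i <= count; i++) {
            sketch.offer(i);
        }
        return sketch.contents();
    }

    public static void main(String[] args) {
        System.out.println(fill(3, Strategy.DROP_HEAD, 5));   // [3, 4, 5]
        System.out.println(fill(3, Strategy.DROP_NEW, 5));    // [1, 2, 3]
        System.out.println(fill(3, Strategy.DROP_BUFFER, 5)); // [4, 5]
    }
}
```

The model leaves out the backpressure and fail strategies, which do not discard elements silently: with backpressure the producer is expected to wait for the result of one offer before making the next.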

Combined with the queue, the throttle operator can limit the processing rate, e.g. to 5 elements per 3 seconds.

Example

Scala
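import akka.stream.{ OverflowStrategy, QueueOfferResult }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import scala.concurrent.duration._

// Assumes an implicit ActorSystem named `system` (and, on older Akka versions, a materializer) is in scope.
val bufferSize = 10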
val elementsToProcess = 5

val queue = Source
  .queue[Int](bufferSize, OverflowStrategy.backpressure)
  .throttle(elementsToProcess, 3.second)
  .map(x => x * x)
  .toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left)
  .run()

val source = Source(1 to 10)

implicit val ec = system.dispatcher
source
  .mapAsync(1)(x => {
    queue.offer(x).map {
      case QueueOfferResult.Enqueued    => println(s"enqueued $x")
      case QueueOfferResult.Dropped     => println(s"dropped $x")
      case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
      case QueueOfferResult.QueueClosed => println("Source Queue closed")
    }
  })
  .runWith(Sink.ignore)
Java
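import akka.NotUsed;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import java.time.Duration;
import java.util.Arrays;

// Assumes a materializer named `mat` is in scope.
int bufferSize = 10;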
int elementsToProcess = 5;

SourceQueueWithComplete<Integer> sourceQueue =
    Source.<Integer>queue(bufferSize, OverflowStrategy.backpressure())
        .throttle(elementsToProcess, Duration.ofSeconds(3))
        .map(x -> x * x)
        .to(Sink.foreach(x -> System.out.println("got: " + x)))
        .run(mat);

Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

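// mapAsync(1) waits for each offer to complete before making the next one,
// which is what the backpressure strategy expects from the producer.
source.mapAsync(1, x -> sourceQueue.offer(x)).runWith(Sink.ignore(), mat);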

Reactive Streams semantics

emits when there is demand and the queue contains elements

completes when downstream completes
