Streams Cookbook

Introduction

This is a collection of patterns that demonstrate various uses of the Akka Streams API by solving small targeted problems in the format of "recipes". The purpose of this page is to give inspiration and ideas on how to approach various small tasks involving streams. The recipes in this page can be used directly as-is, but they are most powerful as starting points: customization of the code snippets is warmly encouraged.

This part also serves as supplementary material for the main body of documentation. It is a good idea to have this page open while reading the manual and to look for examples demonstrating various streaming concepts as they appear in the main body of documentation.

If you need a quick reference of the available processing stages used in the recipes, see Overview of built-in stages and their semantics.

Working with Flows

In this collection we show simple recipes that involve linear flows. The recipes in this section are rather general; more targeted recipes are available in separate sections (Buffers and working with rate, Working with streaming IO).

Logging elements of a stream

Situation: During development it is sometimes helpful to see what happens in a particular section of a stream.

The simplest solution is to use a map operation with println to print the elements received to the console. While this recipe is rather simplistic, it is often suitable for a quick debug session.

val loggedSource = mySource.map { elem => println(elem); elem }

Another approach to logging is to use the log() operation, which allows configuring logging for elements flowing through the stream as well as for completion and failure.

// customise log levels
mySource.log("before-map")
  .withAttributes(Attributes.logLevels(onElement = Logging.WarningLevel))
  .map(analyse)

// or provide custom logging adapter
implicit val adapter = Logging(system, "customLogger")
mySource.log("custom")

Flattening a stream of sequences

Situation: A stream is given as a stream of sequences of elements, but a stream of elements is needed instead, streaming all the nested elements inside the sequences separately.

The mapConcat operation can be used to implement a one-to-many transformation of elements using a mapper function in the form of In => immutable.Seq[Out]. In this case we want to map a Seq of elements to the elements in the collection itself, so we can just call mapConcat(identity).

val myData: Source[List[Message], Unit] = someDataSource
val flattened: Source[Message, Unit] = myData.mapConcat(identity)
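
mapConcat is of course not limited to identity: any function of the form In => immutable.Seq[Out] can be used. As a small hedged illustration (the source and its elements are made up for this sketch), comma-separated lines can be split into individual fields:

val lines: Source[String, Unit] = Source(List("a,b,c", "d,e"))
val fields: Source[String, Unit] = lines.mapConcat(_.split(",").toList)
// emits "a", "b", "c", "d", "e"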

Draining a stream to a strict collection

Situation: A finite sequence of elements is given as a stream, but a Scala collection is needed instead.

In this recipe we will use the grouped stream operation, which groups incoming elements into a stream of limited-size collections (it can be seen as almost the opposite of the "Flattening a stream of sequences" recipe we showed before). By using grouped(MaxAllowedSeqSize) we create a stream of groups with a maximum size of MaxAllowedSeqSize and then take the first element of this stream by attaching a Sink.head. What we get is a Future containing a sequence with all the elements of the original stream, up to MaxAllowedSeqSize elements (further elements are dropped).

val strict: Future[immutable.Seq[Message]] =
  myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)
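
To actually obtain the collection, the materialized Future has to be completed. A minimal sketch; blocking with Await as shown here is only appropriate in tests or other synchronous code:

import scala.concurrent.Await
import scala.concurrent.duration._

val messages: immutable.Seq[Message] = Await.result(strict, 3.seconds)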

Calculating the digest of a ByteString stream

Situation: A stream of bytes is given as a stream of ByteStrings and we want to calculate the cryptographic digest of the stream.

This recipe uses a PushPullStage to host a mutable MessageDigest instance (part of the Java Cryptography API) and update it with the bytes arriving from the stream. When the stream starts, the onPull handler of the stage is called, which just bubbles up the pull event to its upstream. As a response to this pull, a ByteString chunk will arrive (onPush), which we use to update the digest; then we pull for the next chunk.

Eventually the stream of ByteStrings depletes and we get a notification about this event via onUpstreamFinish. At this point we want to emit the digest value, but we cannot do it in this handler directly. Instead we call ctx.absorbTermination(), signalling to our context that we do not yet want to finish. When the environment decides that we can emit further elements, onPull is called again, and we see ctx.isFinishing returning true (since the upstream source has been depleted already). Since we only want to emit one final element, it is enough to call ctx.pushAndFinish, passing the digest ByteString to be emitted.

import akka.stream.stage._
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
  val digest = MessageDigest.getInstance(algorithm)

  override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
    digest.update(chunk.toArray)
    ctx.pull()
  }

  override def onPull(ctx: Context[ByteString]): SyncDirective = {
    if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
    else ctx.pull()
  }

  override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
    // If the stream is finished, we need to emit the last element in the onPull block.
    // It is not allowed to directly emit elements from a termination block
    // (onUpstreamFinish or onUpstreamFailure)
    ctx.absorbTermination()
  }
}

val digest: Source[ByteString, Unit] = data.transform(() => digestCalculator("SHA-256"))
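
Since the stage emits exactly one element before completing, the digest value can be obtained by attaching a Sink.head; a minimal usage sketch:

val digestValue: Future[ByteString] = digest.runWith(Sink.head)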

Parsing lines from a stream of ByteStrings

Situation: A stream of bytes is given as a stream of ByteStrings containing lines terminated by line ending characters (or, alternatively, containing binary frames delimited by a special delimiter byte sequence) which needs to be parsed.

The Framing helper object contains a convenience method to parse messages from a stream of ByteStrings:

import akka.stream.io.Framing
val linesStream = rawData.via(Framing.delimiter(
  ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
  .map(_.utf8String)

Implementing reduce-by-key

Situation: Given a stream of elements, we want to calculate some aggregated value on different subgroups of the elements.

The "hello world" of reduce-by-key style operations is wordcount which we demonstrate below. Given a stream of words we first create a new stream wordStreams that groups the words according to the identity function, i.e. now we have a stream of streams, where every substream will serve identical words.

To count the words, we need to process the stream of streams (the actual groups containing identical words). By mapping over the groups and using runFold (remember that runFold automatically materializes and runs the stream it is used on) we get a stream with elements of Future[(String, Int)]. Now all we need is to flatten this stream, which can be achieved by calling mapAsync with the identity function.

There is one tricky issue to be noted here. The careful reader probably noticed that we put a buffer between the mapAsync() operation that flattens the stream of futures and the actual stream of futures. The reason for this is that the substreams produced by groupBy() can only complete when the original upstream source completes. This means that mapAsync() cannot pull for more substreams because it is still waiting for the folding futures to finish, but these futures never finish if the additional group streams are not consumed. This typical deadlock situation is resolved by the buffer, which is either able to contain all the group streams (ensuring that they are already running and folding) or fails with an explicit error instead of a silent deadlock.

// split the words into separate streams first
val wordStreams: Source[(String, Source[String, Unit]), Unit] = words.groupBy(identity)

// add counting logic to the streams
val countedWords: Source[Future[(String, Int)], Unit] = wordStreams.map {
  case (word, wordStream) =>
    wordStream.runFold((word, 0)) {
      case ((w, count), _) => (w, count + 1)
    }
}

// get a stream of word counts
val counts: Source[(String, Int), Unit] =
  countedWords
    .buffer(MaximumDistinctWords, OverflowStrategy.fail)
    .mapAsync(4)(identity)

By extracting the parts specific to wordcount into

  • a groupKey function that defines the groups
  • a foldZero that defines the zero element used by the fold on the substream given the group key
  • a fold function that does the actual reduction

we get a generalized version below:

def reduceByKey[In, K, Out](
  maximumGroupSize: Int,
  groupKey: (In) => K,
  foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out), Unit] = {

  val groupStreams = Flow[In].groupBy(groupKey)
  val reducedValues = groupStreams.map {
    case (key, groupStream) =>
      groupStream.runFold((key, foldZero(key))) {
        case ((key, aggregated), elem) => (key, fold(aggregated, elem))
      }
  }

  reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4)(identity)
}

val wordCounts = words.via(reduceByKey(
  MaximumDistinctWords,
  groupKey = (word: String) => word,
  foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1))

Note

Please note that the reduce-by-key version we discussed above is sequential; in other words it is NOT a parallelization pattern like MapReduce and similar frameworks.

Sorting elements to multiple groups with groupBy

Situation: The groupBy operation strictly partitions incoming elements: each element belongs to exactly one group. Sometimes we want to map elements into multiple groups simultaneously.

To achieve the desired result, we attack the problem in two steps:

  • first, using a function topicMapper that gives the list of topics (groups) a message belongs to, we transform our stream of Message to a stream of (Message, Topic) pairs, where a separate pair is emitted for each topic the message belongs to. This is achieved by using mapConcat.
  • then we take this new stream of message-topic pairs (containing a separate pair for each topic a given message belongs to) and feed it into groupBy, using the topic as the group key.

val topicMapper: (Message) => immutable.Seq[Topic] = extractTopics

val messageAndTopic: Source[(Message, Topic), Unit] = elems.mapConcat { msg: Message =>
  val topicsForMessage = topicMapper(msg)
  // Create a (Msg, Topic) pair for each of the topics
  // the message belongs to
  topicsForMessage.map(msg -> _)
}

val multiGroups: Source[(Topic, Source[String, Unit]), Unit] = messageAndTopic
  .groupBy(_._2).map {
    case (topic, topicStream) =>
      // chopping of the topic from the (Message, Topic) pairs
      (topic, topicStream.map(_._1))
  }
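
As a hedged sketch of consuming the result (assuming an implicit materializer is in scope), each substream can be drained independently, for example by printing every message tagged with its topic:

multiGroups.runForeach {
  case (topic, topicStream) =>
    topicStream.runForeach(msg => println(s"$topic: $msg"))
}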

Working with Graphs

In this collection we show recipes that use stream graph elements to achieve various goals.

Triggering the flow of elements programmatically

Situation: Given a stream of elements we want to control the emission of those elements according to a trigger signal. In other words, even if the stream would be able to flow (not being backpressured) we want to hold back elements until a trigger signal arrives.

This recipe solves the problem by simply zipping the stream of Message elements with the stream of Trigger signals. Since Zip produces pairs, we simply map the output stream, selecting the first element of each pair.

val graph = FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._
  val zip = builder.add(Zip[Message, Trigger]())
  elements ~> zip.in0
  triggerSource ~> zip.in1
  zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
}

Alternatively, instead of using a Zip and then using map to get the first element of the pairs, we can avoid creating the pairs in the first place by using ZipWith, which takes a two-argument function to produce the output element. If this function returned a pair of its two arguments, its behavior would be exactly that of Zip, so ZipWith is a generalization of zipping.

val graph = FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._
  val zip = builder.add(ZipWith((msg: Message, trigger: Trigger) => msg))

  elements ~> zip.in0
  triggerSource ~> zip.in1
  zip.out ~> sink
}

Balancing jobs to a fixed pool of workers

Situation: Given a stream of jobs and a worker process expressed as a Flow, create a pool of workers that automatically balances incoming jobs to available workers, then merges the results.

We will express our solution as a function that takes a worker flow and the number of workers to be allocated and gives a flow that internally contains a pool of these workers. To achieve the desired result we will create a Flow from a graph.

The graph consists of a Balance node which is a special fan-out operation that tries to route elements to available downstream consumers. In a for loop we wire all of our desired workers as outputs of this balancer element, then we wire the outputs of these workers to a Merge element that will collect the results from the workers.

def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, Unit] = {
  import FlowGraph.Implicits._

  Flow() { implicit b =>
    val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
    val merge = b.add(Merge[Out](workerCount))

    for (_ <- 1 to workerCount) {
      // for each worker, add an edge from the balancer to the worker, then wire
      // it to the merge element
      balancer ~> worker ~> merge
    }

    (balancer.in, merge.out)
  }
}

val processedJobs: Source[Result, Unit] = myJobs.via(balancer(worker, 3))

Working with rate

This collection of recipes demonstrates various patterns where rate differences between upstream and downstream need to be handled by strategies other than simple backpressure.

Dropping elements

Situation: Given a fast producer and a slow consumer, we want to drop elements if necessary to not slow down the producer too much.

This can be solved by using the most versatile rate-transforming operation, conflate. Conflate can be thought of as a special fold operation that collapses multiple upstream elements into one aggregate element if needed to keep the speed of the upstream unaffected by the downstream.

When the upstream is faster, the fold process of the conflate starts. This folding needs a zero element, which is given by a seed function that takes the current element and produces a zero for the folding process. In our case this is identity, so our folding state starts from the message itself. The fold function is also special: given the aggregate value (the last message) and the new element (the freshest element), the aggregate state becomes simply the freshest element. This choice of functions results in a simple dropping operation.

val droppyStream: Flow[Message, Message, Unit] =
  Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)

Dropping broadcast

Situation: The default Broadcast graph element is properly backpressured, but that means that a slow downstream consumer can hold back the other downstream consumers, resulting in lowered throughput. In other words, the rate of Broadcast is the rate of its slowest downstream consumer. In certain cases it is desirable to allow faster consumers to progress independently of their slower siblings by dropping elements if necessary.

One solution to this problem is to append a buffer element in front of each of the downstream consumers, defining a dropping strategy instead of the default Backpressure. This allows small temporary rate differences between the different consumers (the buffer smooths out small rate variances), but also allows faster consumers to progress by dropping from the buffer of the slow consumers if necessary.

val graph = FlowGraph.closed(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
  (sink1, sink2, sink3) =>
    import FlowGraph.Implicits._

    val bcast = b.add(Broadcast[Int](3))
    myElements ~> bcast

    bcast.buffer(10, OverflowStrategy.dropHead) ~> sink1
    bcast.buffer(10, OverflowStrategy.dropHead) ~> sink2
    bcast.buffer(10, OverflowStrategy.dropHead) ~> sink3
}

Collecting missed ticks

Situation: Given a regular (stream) source of ticks, instead of trying to backpressure the producer of the ticks we want to keep a counter of the missed ticks and pass it down when possible.

We will use conflate to solve the problem. Conflate takes two functions:

  • A seed function that produces the zero element for the folding process that happens when the upstream is faster than the downstream. In our case the seed function is a constant function that returns 0, since there were no missed ticks at that point.
  • A fold function that is invoked when multiple upstream messages need to be collapsed to an aggregate value due to the insufficient processing rate of the downstream. Our fold function simply increments the currently stored count of missed ticks.

As a result, we have a flow of Int where each number represents the missed ticks. A value of 0 means that we were able to consume the tick fast enough (i.e. zero means: 1 non-missed tick + 0 missed ticks).

val missedTicks: Flow[Tick, Int, Unit] =
  Flow[Tick].conflate(seed = (_) => 0)(
    (missedTicks, tick) => missedTicks + 1)

Create a stream processor that repeats the last element seen

Situation: Given a producer and consumer, where the rate of neither is known in advance, we want to ensure that neither of them slows down the other, by dropping earlier unconsumed elements from the upstream if necessary, and by repeating the last value for the downstream if necessary.

We have two options to implement this feature. In both cases we will use DetachedStage to build our custom element (DetachedStage is specifically designed for rate-translating elements, just like conflate, expand or buffer). In the first version we will use a provided initial value initial that will be used to feed the downstream if no upstream element is ready yet. In the onPush() handler we just overwrite the currentValue variable and immediately relieve the upstream by calling pull() (remember, implementations of DetachedStage are not allowed to call push() as a response to onPush() or call pull() as a response to onPull()). The downstream onPull handler is very similar: we immediately relieve the downstream by emitting currentValue.

import akka.stream.stage._
class HoldWithInitial[T](initial: T) extends DetachedStage[T, T] {
  private var currentValue: T = initial

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
    currentValue = elem
    ctx.pull()
  }

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
    ctx.push(currentValue)
  }

}

While it is relatively simple, the drawback of the first version is that it needs an arbitrary initial element which is not always possible to provide. Hence, we create a second version where the downstream might need to wait in one single case: if the very first element is not yet available.

We introduce a boolean variable waitingFirstValue to denote whether the first element has been provided or not (alternatively an Option can be used for currentValue, or, if the element type is a subclass of AnyRef, a null can be used for the same purpose). In the downstream onPull() handler the difference from the previous version is that we call holdDownstream() if the first element is not yet available, thus blocking our downstream. The upstream onPush() handler sets waitingFirstValue to false and, after checking whether the downstream is being held, either relieves the upstream producer, or relieves both the upstream producer and the downstream consumer by calling pushAndPull().

import akka.stream.stage._
class HoldWithWait[T] extends DetachedStage[T, T] {
  private var currentValue: T = _
  private var waitingFirstValue = true

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
    currentValue = elem
    waitingFirstValue = false
    if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
    else ctx.pull()
  }

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
    if (waitingFirstValue) ctx.holdDownstream()
    else ctx.push(currentValue)
  }

}
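
Either stage can then be plugged into a stream with transform, just like the other stages in this cookbook. A minimal usage sketch; the source numbers is illustrative:

val held: Source[Int, Unit] = numbers.transform(() => new HoldWithInitial(0))
val heldWithWait: Source[Int, Unit] = numbers.transform(() => new HoldWithWait[Int])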

Globally limiting the rate of a set of streams

Situation: Given a set of independent streams that we cannot merge, we want to globally limit the aggregate throughput of the set of streams.

One possible solution uses a shared actor as the global limiter combined with mapAsync to create a reusable Flow that can be plugged into a stream to limit its rate.

As the first step we define an actor that will do the accounting for the global rate limit. The actor maintains a timer, a counter for pending permit tokens and a queue of possibly waiting participants. The actor has an open and a closed state. The actor is in the open state while it still has pending permits. Whenever a request for a permit arrives as a WantToPass message to the actor, the number of available permits is decremented and we notify the sender that it can pass by answering with a MayPass message. If the number of permits reaches zero, the actor transitions to the closed state. In this state requests are not immediately answered; instead the reference of the sender is added to a queue. Once the timer for replenishing the pending permits fires by sending a ReplenishTokens message, we increment the pending permits counter and send a reply to each of the waiting senders. If there are more waiting senders than permits available, we stay in the closed state.

object Limiter {
  case object WantToPass
  case object MayPass

  case object ReplenishTokens

  def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration,
            tokenRefreshAmount: Int): Props =
    Props(new Limiter(maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount))
}

class Limiter(
  val maxAvailableTokens: Int,
  val tokenRefreshPeriod: FiniteDuration,
  val tokenRefreshAmount: Int) extends Actor {
  import Limiter._
  import context.dispatcher
  import akka.actor.Status

  private var waitQueue = immutable.Queue.empty[ActorRef]
  private var permitTokens = maxAvailableTokens
  private val replenishTimer = context.system.scheduler.schedule(
    initialDelay = tokenRefreshPeriod,
    interval = tokenRefreshPeriod,
    receiver = self,
    ReplenishTokens)

  override def receive: Receive = open

  val open: Receive = {
    case ReplenishTokens =>
      permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
    case WantToPass =>
      permitTokens -= 1
      sender() ! MayPass
      if (permitTokens == 0) context.become(closed)
  }

  val closed: Receive = {
    case ReplenishTokens =>
      permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
      releaseWaiting()
    case WantToPass =>
      waitQueue = waitQueue.enqueue(sender())
  }

  private def releaseWaiting(): Unit = {
    val (toBeReleased, remainingQueue) = waitQueue.splitAt(permitTokens)
    waitQueue = remainingQueue
    permitTokens -= toBeReleased.size
    toBeReleased foreach (_ ! MayPass)
    if (permitTokens > 0) context.become(open)
  }

  override def postStop(): Unit = {
    replenishTimer.cancel()
    waitQueue foreach (_ ! Status.Failure(new IllegalStateException("limiter stopped")))
  }
}

To create a Flow that uses this global limiter actor, we use the mapAsync function in combination with the ask pattern. We also define a timeout, so if a reply is not received during the configured maximum wait period, the returned future from ask will fail, which will fail the corresponding stream as well.

def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = {
  import akka.pattern.ask
  import akka.util.Timeout
  Flow[T].mapAsync(4)((element: T) => {
    import system.dispatcher
    implicit val triggerTimeout = Timeout(maxAllowedWait)
    val limiterTriggerFuture = limiter ? Limiter.WantToPass
    limiterTriggerFuture.map((_) => element)
  })

}
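
A hedged usage sketch follows; the stream and sink names are illustrative. The important point is that the same limiter actor instance is shared by every stream participating in the global limit:

import scala.concurrent.duration._

val limiter = system.actorOf(Limiter.props(
  maxAvailableTokens = 10,
  tokenRefreshPeriod = 100.millis,
  tokenRefreshAmount = 1), "limiter")

// both streams draw permits from the same shared budget
streamA.via(limitGlobal[Message](limiter, maxAllowedWait = 2.seconds)).runWith(sinkA)
streamB.via(limitGlobal[Message](limiter, maxAllowedWait = 2.seconds)).runWith(sinkB)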

Note

The global actor used for limiting introduces a global bottleneck. You might want to assign a dedicated dispatcher for this actor.
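
For example, a dedicated dispatcher can be assigned when the actor is created, using Props.withDispatcher; this sketch assumes a dispatcher entry named "limiter-dispatcher" has been configured in application.conf:

val limiter = system.actorOf(
  Limiter.props(10, 100.millis, 1).withDispatcher("limiter-dispatcher"))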

Working with IO

Chunking up a stream of ByteStrings into limited size ByteStrings

Situation: Given a stream of ByteStrings we want to produce a stream of ByteStrings containing the same bytes in the same sequence, but capping the size of ByteStrings. In other words we want to slice up ByteStrings into smaller chunks if they exceed a size threshold.

This can be achieved with a single PushPullStage. The main logic of our stage is in emitChunkOrPull(), which implements the following:

  • if the buffer is empty, we pull for more bytes
  • if the buffer is nonEmpty, we split it according to the chunkSize. This will give a next chunk that we will emit, and an empty or nonempty remaining buffer.

Both onPush() and onPull() call emitChunkOrPull(); the only difference is that the push handler also stores the incoming chunk by appending it to the end of the buffer.

import akka.stream.stage._

class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
  private var buffer = ByteString.empty

  override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
    buffer ++= elem
    emitChunkOrPull(ctx)
  }

  override def onPull(ctx: Context[ByteString]): SyncDirective = emitChunkOrPull(ctx)

  private def emitChunkOrPull(ctx: Context[ByteString]): SyncDirective = {
    if (buffer.isEmpty) ctx.pull()
    else {
      val (emit, nextBuffer) = buffer.splitAt(chunkSize)
      buffer = nextBuffer
      ctx.push(emit)
    }
  }

}

val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))

Limit the number of bytes passing through a stream of ByteStrings

Situation: Given a stream of ByteStrings we want to fail the stream if more than a given maximum of bytes has been consumed.

This recipe uses a PushStage to implement the desired feature. In the only handler we override, onPush(), we just update a counter and see if it grows larger than maximumBytes. If a violation happens we signal failure, otherwise we forward the chunk we have received.

import akka.stream.stage._
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
  private var count = 0

  override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
    count += chunk.size
    if (count > maximumBytes) ctx.fail(new IllegalStateException("Too many bytes"))
    else ctx.push(chunk)
  }
}

val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))

Compact ByteStrings in a stream of ByteStrings

Situation: After a long chain of transformations, due to their immutable, structure-sharing nature, ByteStrings may refer to multiple original ByteString instances, unnecessarily retaining memory. As the final step of a transformation chain we want to have clean copies that no longer reference the original ByteStrings.

The recipe is a simple use of map, calling the compact() method of the ByteString elements. This copies the underlying arrays, so it should be the last step of a long chain if used.

val compacted: Source[ByteString, Unit] = data.map(_.compact)

Injecting keep-alive messages into a stream of ByteStrings

Situation: Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages but only if this does not interfere with normal traffic.

All this recipe needs is the MergePreferred element, which is a version of merge that is not fair: whenever multiple upstream producers have elements to offer, the merge will always choose the preferred upstream, effectively giving it absolute priority.

val tickToKeepAlivePacket: Flow[Tick, ByteString, Unit] = Flow[Tick]
  .conflate(seed = (tick) => keepaliveMessage)((msg, newTick) => msg)

val graph = FlowGraph.closed() { implicit builder =>
  import FlowGraph.Implicits._
  val unfairMerge = builder.add(MergePreferred[ByteString](1))

  // If data is available then no keepalive is injected
  dataStream ~> unfairMerge.preferred
  ticks ~> tickToKeepAlivePacket ~> unfairMerge ~> sink
}
