Dynamic stream handling

Controlling graph completion with KillSwitch

A KillSwitch allows graphs of FlowShape to be completed from the outside. It consists of a flow element that can be linked to a graph of FlowShape that needs completion control. The KillSwitch trait provides methods to complete or fail the linked graph(s).

trait KillSwitch {
  /**
   * After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
   */
  def shutdown(): Unit
  /**
   * After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed.
   */
  def abort(ex: Throwable): Unit
}

After the first call to either shutdown or abort, all subsequent calls to either method are ignored. Graph completion is performed by both

  • completing its downstream
  • cancelling (in case of shutdown) or failing (in case of abort) its upstream.
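
The "first call wins" rule described above can be modelled with a write-once Promise. The following is a minimal plain-Scala sketch for illustration only, not Akka's implementation; the names FirstCallWinsSwitch, outcome and result are hypothetical.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

// Hypothetical model of the "first call wins" rule; not Akka's actual code.
final class FirstCallWinsSwitch {
  // A Promise can be completed at most once, which gives the desired semantics.
  private val outcome = Promise[Unit]()

  // trySuccess/tryFailure return false (instead of throwing) when the
  // promise is already completed, so later calls are silently ignored.
  def shutdown(): Unit = { outcome.trySuccess(()); () }
  def abort(ex: Throwable): Unit = { outcome.tryFailure(ex); () }

  // None until the first call; afterwards Success (shutdown) or Failure (abort).
  def result: Option[Try[Unit]] = outcome.future.value
}

object FirstCallWinsDemo {
  def main(args: Array[String]): Unit = {
    val switch = new FirstCallWinsSwitch
    switch.shutdown()
    switch.abort(new RuntimeException("too late")) // ignored: shutdown came first
    assert(switch.result == Some(Success(())))
  }
}
```

In this model, whichever of shutdown or abort is called first fixes the outcome, and every later call is a no-op.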

A KillSwitch can control the completion of one or multiple streams, and therefore comes in two different flavours.

UniqueKillSwitch

A UniqueKillSwitch controls the completion of one materialized Graph of FlowShape. See the usage examples below.

  • Shutdown
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

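// Keep.right keeps the UniqueKillSwitch; Keep.both pairs it with the sink's Future[Int].
val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

// Completes the stream normally; `last` then holds the last element that reached the sink.
killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2

  • Abort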
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

val error = new RuntimeException("boom!")
// Fails the stream with `error`, so `last` fails with it as well.
killSwitch.abort(error)

Await.result(last.failed, 1.second) shouldBe error

SharedKillSwitch

A SharedKillSwitch controls the completion of an arbitrary number of graphs of FlowShape. It can be materialized multiple times via its flow method, and all materialized graphs linked to it are controlled by the switch. See the usage examples below.

  • Shutdown
val countingSrc = Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

val last = countingSrc
  .via(sharedKillSwitch.flow)
  .runWith(lastSnk)

val delayedLast = countingSrc
  .delay(1.second, DelayOverflowStrategy.backpressure)
  .via(sharedKillSwitch.flow)
  .runWith(lastSnk)

doSomethingElse()

// A single call completes both linked streams.
sharedKillSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2
Await.result(delayedLast, 1.second) shouldBe 1

  • Abort
val countingSrc = Source(Stream.from(1)).delay(1.second)
val lastSnk = Sink.last[Int]
val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

val last1 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)
val last2 = countingSrc.via(sharedKillSwitch.flow).runWith(lastSnk)

val error = new RuntimeException("boom!")
// A single call fails both linked streams with `error`.
sharedKillSwitch.abort(error)

Await.result(last1.failed, 1.second) shouldBe error
Await.result(last2.failed, 1.second) shouldBe error

Note

A UniqueKillSwitch is always a result of a materialization, whilst SharedKillSwitch needs to be constructed before any materialization takes place.
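
The difference in construction order can be seen side by side in a small program. This is a sketch under stated assumptions: it targets Akka 2.6 or later, where an implicit ActorSystem is enough to materialize streams (older versions need an explicit materializer), and the names KillSwitchKinds, "kill-switch-kinds" and "shared-switch" are chosen for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, SharedKillSwitch, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

object KillSwitchKinds {
  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, where the implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("kill-switch-kinds")

    // Shared: constructed first, then attached to any number of streams.
    val shared: SharedKillSwitch = KillSwitches.shared("shared-switch")
    Source.maybe[Int].via(shared.flow).runWith(Sink.ignore)
    Source.maybe[Int].via(shared.flow).runWith(Sink.ignore)

    // Unique: only obtainable as the materialized value of a running stream.
    val unique: UniqueKillSwitch = Source.maybe[Int]
      .viaMat(KillSwitches.single)(Keep.right)
      .to(Sink.ignore)
      .run()

    shared.shutdown() // completes both streams attached above
    unique.shutdown() // completes only the stream it was materialized from
    system.terminate()
  }
}
```

Source.maybe is used here only because it never completes on its own, so each stream stays open until its switch is triggered.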
