Dataflow Concurrency (Scala)

Dataflow Concurrency (Scala)

Description

Akka implements Oz-style dataflow concurrency by using a special API for Futures (Scala) that allows single assignment variables and multiple lightweight (event-based) processes/threads.

Dataflow concurrency is deterministic. This means that it will always behave the same. If you run it once and it yields output 5 then it will do that every time, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock every single time you run it. Also, there is no difference between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output.

The best way to learn how to program with dataflow variables is to read the fantastic book Concepts, Techniques, and Models of Computer Programming. By Peter Van Roy and Seif Haridi.

The documentation is not as complete as it should be, something we will improve shortly. For now, besides above listed resources on dataflow concurrency, I recommend you to read the documentation for the GPars implementation, which is heavily influenced by the Akka implementation:

Getting Started

Scala's Delimited Continuations plugin is required to use the Dataflow API. To enable the plugin when using sbt, your project must inherit the AutoCompilerPlugins trait and contain a bit of configuration as is seen in this example:

autoCompilerPlugins := true,
libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % <scalaVersion>) },
scalacOptions += "-P:continuations:enable",

Dataflow Variables

Dataflow Variable defines four different operations:

  1. Define a Dataflow Variable
val x = Promise[Int]()
  1. Wait for Dataflow Variable to be bound (must be contained within a Future.flow block as described in the next section)
x()
  1. Bind Dataflow Variable (must be contained within a Future.flow block as described in the next section)
x << 3
  1. Bind Dataflow Variable with a Future (must be contained within a Future.flow block as described in the next section)
x << y

A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will be ignored.

Dataflow Delimiter

Dataflow is implemented in Akka using Scala's Delimited Continuations. To use the Dataflow API the code must be contained within a Future.flow block. For example:

import Future.flow
implicit val dispatcher = ...

val a = Future( ... )
val b = Future( ... )
val c = Promise[Int]()

flow {
  c << (a() + b())
}

val result = Await.result(c, timeout)

The flow method also returns a Future for the result of the contained expression, so the previous example could also be written like this:

import Future.flow
implicit val dispatcher = ...

val a = Future( ... )
val b = Future( ... )

val c = flow {
  a() + b()
}

val result = Await.result(c, timeout)

Examples

Most of these examples are taken from the Oz wikipedia page

To run these examples:

  1. Start REPL
$ sbt
> project akka-actor
> console
Welcome to Scala version 2.9.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

2. Paste the examples (below) into the Scala REPL. Note: Do not try to run the Oz version, it is only there for reference.

  1. Have fun.

Simple DataFlowVariable example

This example is from Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language). Sort of the "Hello World" of dataflow concurrency.

Example in Oz:

thread
  Z = X+Y     % will wait until both X and Y are bound to a value.
  {Browse Z}  % shows the value of Z.
end
thread X = 40 end
thread Y = 2 end

Example in Akka:

import akka.dispatch._
import Future.flow
implicit val dispatcher = ...

val x, y, z = Promise[Int]()

flow {
  z << x() + y()
  println("z = " + z())
}
flow { x << 40 }
flow { y << 2 }

Example of using DataFlowVariable with recursion

Using DataFlowVariable and recursion to calculate sum.

Example in Oz:

fun {Ints N Max}
  if N == Max then nil
  else
    {Delay 1000}
    N|{Ints N+1 Max}
  end
end

fun {Sum S Stream}
  case Stream of nil then S
  [] H|T then S|{Sum H+S T} end
end

local X Y in
  thread X = {Ints 0 1000} end
  thread Y = {Sum 0 X} end
  {Browse Y}
end

Example in Akka:

import akka.dispatch._
import Future.flow
implicit val dispatcher = ...

def ints(n: Int, max: Int): List[Int] = {
  if (n == max) Nil
  else n :: ints(n + 1, max)
}

def sum(s: Int, stream: List[Int]): List[Int] = stream match {
  case Nil => s :: Nil
  case h :: t => s :: sum(h + s, t)
}

val x, y = Promise[List[Int]]()

flow { x << ints(0, 1000) }
flow { y << sum(0, x()) }
flow { println("List of sums: " + y()) }

Example using concurrent Futures

Shows how to have a calculation run in another thread.

Example in Akka:

import akka.dispatch._
import Future.flow
implicit val dispatcher = ...

// create four 'Int' data flow variables
val x, y, z, v = Promise[Int]()

flow {
  println("Thread 'main'")

  x << 1
  println("'x' set to: " + x())

  println("Waiting for 'y' to be set...")

  if (x() > y()) {
    z << x
    println("'z' set to 'x': " + z())
  } else {
    z << y
    println("'z' set to 'y': " + z())
  }
}

flow {
  y << Future {
    println("Thread 'setY', sleeping")
    Thread.sleep(2000)
    2
  }
  println("'y' set to: " + y())
}

flow {
  println("Thread 'setV'")
  v << y
  println("'v' set to 'y': " + v())
}

Contents