IO (Scala)
Loading

IO (Scala)

Introduction

This documentation is in progress and some sections may be incomplete. More will be coming.

Components

ByteString

A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network IO on the jvm Array[Byte] and ByteBuffer are commonly used to represent collections of Bytes, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for Bytes. Being able to safely and efficiently move Bytes around is very important for this IO support, so ByteString was developed.

ByteString is a Rope-like data structure that is immutable and efficient. When 2 ByteStrings are concatenated together they are both stored within the resulting ByteString instead of copying both to a new Array. Operations such as drop and take return ByteStrings that still reference the original Array, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal Array cannot be modified. Whenever a potentially unsafe Array is used to create a new ByteString a defensive copy is created. If you require a ByteString that only blocks a much memory as necessary for it's content, use the compact method to get a CompactByteString instance. If the ByteString represented only a slice of the original array, this will result in copying all bytes in that slice.

ByteString inherits all methods from IndexedSeq, and it also has some new ones. For more information, look up the akka.util.ByteString class and it's companion object in the ScalaDoc.

ByteString also comes with it's own optimized builder and iterator classes ByteStringBuilder and ByteIterator which provides special features in addition to the standard builder / iterator methods:

Compatibility with java.io

A ByteStringBuilder can be wrapped in a java.io.OutputStream via the asOutputStream method. Likewise, ByteIterator can we wrapped in a java.io.InputStream via asInputStream. Using these, akka.io applications can integrate legacy code based on java.io streams.

Encoding and decoding of binary data

ByteStringBuilder and ByteIterator support encoding and decoding of binary data. As an example, consider a stream of binary data frames with the following format:

frameLen: Int
n: Int
m: Int
n times {
  a: Short
  b: Long
}
data: m times Double

In this example, the data is to be stored in arrays of a, b and data.

Decoding of such frames can be efficiently implemented in the following fashion:

implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN

val FrameDecoder = for {
  frameLenBytes  IO.take(4)
  frameLen = frameLenBytes.iterator.getInt
  frame  IO.take(frameLen)
} yield {
  val in = frame.iterator

  val n = in.getInt
  val m = in.getInt

  val a = Array.newBuilder[Short]
  val b = Array.newBuilder[Long]

  for (i  1 to n) {
    a += in.getShort
    b += in.getInt
  }

  val data = Array.ofDim[Double](m)
  in.getDoubles(data)

  (a.result, b.result, data)
}

This implementation naturally follows the example data format. In a true Scala application, one might, of course, want use specialized immutable Short/Long/Double containers instead of mutable Arrays.

After extracting data from a ByteIterator, the remaining content can also be turned back into a ByteString using the toSeq method

val n = in.getInt
val m = in.getInt
// ... in.get...
val rest: ByteString = in.toSeq

with no copying from bytes to rest involved. In general, conversions from ByteString to ByteIterator and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings.

Encoding of data also is very natural, using ByteStringBuilder

implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN

val a: Array[Short]
val b: Array[Long]
val data: Array[Double]

val frameBuilder = ByteString.newBuilder

val n = a.length
val m = data.length

frameBuilder.putInt(n)
frameBuilder.putInt(m)

for (i  0 to n - 1) {
  frameBuilder.putShort(a(i))
  frameBuilder.putLong(b(i))
}
frameBuilder.putDoubles(data)
val frame = frameBuilder.result()

The encoded data then can be sent over socket (see IOManager):

val socket: IO.SocketHandle
socket.write(ByteString.newBuilder.putInt(frame.length).result)
socket.write(frame)

IO.Handle

IO.Handle is an immutable reference to a Java NIO Channel. Passing mutable Channels between Actors could lead to unsafe behavior, so instead subclasses of the IO.Handle trait are used. Currently there are 2 concrete subclasses: IO.SocketHandle (representing a SocketChannel) and IO.ServerHandle (representing a ServerSocketChannel).

IOManager

The IOManager takes care of the low level IO details. Each ActorSystem has it's own IOManager, which can be accessed calling IOManager(system: ActorSystem). Actors communicate with the IOManager with specific messages. The messages sent from an Actor to the IOManager are handled automatically when using certain methods and the messages sent from an IOManager are handled within an Actor's receive method.

Connecting to a remote host:

val address = new InetSocketAddress("remotehost", 80)
val socket = IOManager(actorSystem).connect(address)
val socket = IOManager(actorSystem).connect("remotehost", 80)

Creating a server:

val address = new InetSocketAddress("localhost", 80)
val serverSocket = IOManager(actorSystem).listen(address)
val serverSocket = IOManager(actorSystem).listen("localhost", 80)

Receiving messages from the IOManager:

def receive = {

  case IO.Listening(server, address) =>
    println("The server is listening on socket " + address)

  case IO.Connected(socket, address) =>
    println("Successfully connected to " + address)

  case IO.NewClient(server) =>
    println("New incoming connection on server")
    val socket = server.accept()
    println("Writing to new client socket")
    socket.write(bytes)
    println("Closing socket")
    socket.close()

  case IO.Read(socket, bytes) =>
    println("Received incoming data from socket")

  case IO.Closed(socket: IO.SocketHandle, cause) =>
    println("Socket has closed, cause: " + cause)

  case IO.Closed(server: IO.ServerHandle, cause) =>
    println("Server socket has closed, cause: " + cause)

}

IO.Iteratee

Included with Akka's IO support is a basic implementation of Iteratees. Iteratees are an effective way of handling a stream of data without needing to wait for all the data to arrive. This is especially useful when dealing with non blocking IO since we will usually receive data in chunks which may not include enough information to process, or it may contain much more data than we currently need.

This Iteratee implementation is much more basic than what is usually found. There is only support for ByteString input, and enumerators aren't used. The reason for this limited implementation is to reduce the amount of explicit type signatures needed and to keep things simple. It is important to note that Akka's Iteratees are completely optional, incoming data can be handled in any way, including other Iteratee libraries.

Iteratees work by processing the data that it is given and returning either the result (with any unused input) or a continuation if more input is needed. They are monadic, so methods like flatMap can be used to pass the result of an Iteratee to another.

The basic Iteratees included in the IO support can all be found in the ScalaDoc under akka.actor.IO, and some of them are covered in the example below.

Examples

Http Server

This example will create a simple high performance HTTP server. We begin with our imports:

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

Some commonly used constants:

object HttpConstants {
  val SP = ByteString(" ")
  val HT = ByteString("\t")
  val CRLF = ByteString("\r\n")
  val COLON = ByteString(":")
  val PERCENT = ByteString("%")
  val PATH = ByteString("/")
  val QUERY = ByteString("?")
}

And case classes to hold the resulting request:

case class Request(meth: String, path: List[String], query: Option[String],
                   httpver: String, headers: List[Header], body: Option[ByteString])
case class Header(name: String, value: String)

Now for our first Iteratee. There are 3 main sections of a HTTP request: the request line, the headers, and an optional body. The main request Iteratee handles each section separately:

object HttpIteratees {
  import HttpConstants._

  def readRequest =
    for {
      requestLine  readRequestLine
      (meth, (path, query), httpver) = requestLine
      headers  readHeaders
      body  readBody(headers)
    } yield Request(meth, path, query, httpver, headers, body)

In the above code readRequest takes the results of 3 different Iteratees (readRequestLine, readHeaders, readBody) and combines them into a single Request object. readRequestLine actually returns a tuple, so we extract it's individual components. readBody depends on values contained within the header section, so we must pass those to the method.

The request line has 3 parts to it: the HTTP method, the requested URI, and the HTTP version. The parts are separated by a single space, and the entire request line ends with a CRLF.

def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim

def readRequestLine =
  for {
    meth  IO takeUntil SP
    uri  readRequestURI
    _  IO takeUntil SP // ignore the rest
    httpver  IO takeUntil CRLF
  } yield (ascii(meth), uri, ascii(httpver))

Reading the request method is simple as it is a single string ending in a space. The simple Iteratee that performs this is IO.takeUntil(delimiter: ByteString): Iteratee[ByteString]. It keeps consuming input until the specified delimiter is found. Reading the HTTP version is also a simple string that ends with a CRLF.

The ascii method is a helper that takes a ByteString and parses it as a US-ASCII String.

Reading the request URI is a bit more complicated because we want to parse the individual components of the URI instead of just returning a simple string:

def readRequestURI = IO peek 1 flatMap {
  case PATH 
    for {
      path  readPath
      query  readQuery
    } yield (path, query)
  case _  sys.error("Not Implemented")
}

For this example we are only interested in handling absolute paths. To detect if we the URI is an absolute path we use IO.peek(length: Int): Iteratee[ByteString], which returns a ByteString of the request length but doesn't actually consume the input. We peek at the next bit of input and see if it matches our PATH constant (defined above as ByteString("/")). If it doesn't match we throw an error, but for a more robust solution we would want to handle other valid URIs.

Next we handle the path itself:

def readPath = {
  def step(segments: List[String]): IO.Iteratee[List[String]] =
    IO peek 1 flatMap {
      case PATH  IO drop 1 flatMap (_  readUriPart(pathchar) flatMap (
        segment  step(segment :: segments)))
      case _  segments match {
        case "" :: rest  IO Done rest.reverse
        case _           IO Done segments.reverse
      }
    }
  step(Nil)
}

The step method is a recursive method that takes a List of the accumulated path segments. It first checks if the remaining input starts with the PATH constant, and if it does, it drops that input, and returns the readUriPart Iteratee which has it's result added to the path segment accumulator and the step method is run again.

If after reading in a path segment the next input does not start with a path, we reverse the accumulated segments and return it (dropping the last segment if it is blank).

Following the path we read in the query (if it exists):

def readQuery: IO.Iteratee[Option[String]] = IO peek 1 flatMap {
  case QUERY  IO drop 1 flatMap (_  readUriPart(querychar) map (Some(_)))
  case _      IO Done None
}

It is much simpler than reading the path since we aren't doing any parsing of the query since there is no standard format of the query string.

Both the path and query used the readUriPart Iteratee, which is next:

val alpha = Set.empty ++ ('a' to 'z') ++ ('A' to 'Z') map (_.toByte)
val digit = Set.empty ++ ('0' to '9') map (_.toByte)
val hexdigit = digit ++ (Set.empty ++ ('a' to 'f') ++ ('A' to 'F') map (_.toByte))
val subdelim = Set('!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=') map
  (_.toByte)
val pathchar = alpha ++ digit ++ subdelim ++ (Set(':', '@') map (_.toByte))
val querychar = pathchar ++ (Set('/', '?') map (_.toByte))

def readUriPart(allowed: Set[Byte]): IO.Iteratee[String] = for {
  str ← IO takeWhile allowed map ascii
  pchar ← IO peek 1 map (_ == PERCENT)
  all ← if (pchar) readPChar flatMap (ch ⇒ readUriPart(allowed) map
    (str + ch + _))
  else IO Done str
} yield all

def readPChar = IO take 3 map {
  case Seq('%', rest @ _*) if rest forall hexdigit ⇒
    java.lang.Integer.parseInt(rest map (_.toChar) mkString, 16).toChar
}

Here we have several Sets that contain valid characters pulled from the URI spec. The readUriPart method takes a Set of valid characters (already mapped to Bytes) and will continue to match characters until it reaches on that is not part of the Set. If it is a percent encoded character then that is handled as a valid character and processing continues, or else we are done collecting this part of the URI.

Headers are next:

def readHeaders = {
  def step(found: List[Header]): IO.Iteratee[List[Header]] = {
    IO peek 2 flatMap {
      case CRLF  IO takeUntil CRLF flatMap (_  IO Done found)
      case _     readHeader flatMap (header  step(header :: found))
    }
  }
  step(Nil)
}

def readHeader =
  for {
    name  IO takeUntil COLON
    value  IO takeUntil CRLF flatMap readMultiLineValue
  } yield Header(ascii(name), ascii(value))

def readMultiLineValue(initial: ByteString): IO.Iteratee[ByteString] =
  IO peek 1 flatMap {
    case SP  IO takeUntil CRLF flatMap (
      bytes  readMultiLineValue(initial ++ bytes))
    case _  IO Done initial
  }

And if applicable, we read in the message body:

def readBody(headers: List[Header]) =
  if (headers.exists(header  header.name == "Content-Length" ||
    header.name == "Transfer-Encoding"))
    IO.takeAll map (Some(_))
  else
    IO Done None

Finally we get to the actual Actor:

class HttpServer(port: Int) extends Actor {

  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {

    case IO.NewClient(server) 
      val socket = server.accept()
      state(socket) flatMap (_  HttpServer.processRequest(socket))

    case IO.Read(socket, bytes) 
      state(socket)(IO Chunk bytes)

    case IO.Closed(socket, cause) 
      state(socket)(IO EOF)
      state -= socket

  }

}

And it's companion object:

object HttpServer {
  import HttpIteratees._

  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
    IO repeat {
      for {
        request  readRequest
      } yield {
        val rsp = request match {
          case Request("GET", "ping" :: Nil, _, _, headers, _) 
            OKResponse(ByteString("<p>pong</p>"),
              request.headers.exists {
                case Header(n, v) 
                  n.toLowerCase == "connection" && v.toLowerCase == "keep-alive"
              })
          case req 
            OKResponse(ByteString("<p>" + req.toString + "</p>"),
              request.headers.exists {
                case Header(n, v) 
                  n.toLowerCase == "connection" && v.toLowerCase == "keep-alive"
              })
        }
        socket write OKResponse.bytes(rsp).compact
        if (!rsp.keepAlive) socket.close()
      }
    }

}

And the OKResponse:

object OKResponse {
  import HttpConstants.CRLF

  val okStatus = ByteString("HTTP/1.1 200 OK")
  val contentType = ByteString("Content-Type: text/html; charset=utf-8")
  val cacheControl = ByteString("Cache-Control: no-cache")
  val date = ByteString("Date: ")
  val server = ByteString("Server: Akka")
  val contentLength = ByteString("Content-Length: ")
  val connection = ByteString("Connection: ")
  val keepAlive = ByteString("Keep-Alive")
  val close = ByteString("Close")

  def bytes(rsp: OKResponse) = {
    new ByteStringBuilder ++=
      okStatus ++= CRLF ++=
      contentType ++= CRLF ++=
      cacheControl ++= CRLF ++=
      date ++= ByteString(new java.util.Date().toString) ++= CRLF ++=
      server ++= CRLF ++=
      contentLength ++= ByteString(rsp.body.length.toString) ++= CRLF ++=
      connection ++= (if (rsp.keepAlive) keepAlive else close) ++= CRLF ++=
      CRLF ++= rsp.body result
  }

}
case class OKResponse(body: ByteString, keepAlive: Boolean)

A main method to start everything up:

object Main extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 8080
  val system = ActorSystem()
  val server = system.actorOf(Props(new HttpServer(port)))
}

Contents