IO (Scala)

Introduction

This documentation is a work in progress and some sections may be incomplete. More will be added.

Components

ByteString

A primary goal of Akka's IO module is to communicate between actors only with immutable objects. When dealing with network IO on the JVM, Array[Byte] and ByteBuffer are commonly used to represent collections of bytes, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for bytes. Being able to move bytes around safely and efficiently is very important for this IO module, so ByteString was developed.

ByteString is a rope-like data structure that is immutable and efficient. When two ByteStrings are concatenated, both are stored within the resulting ByteString instead of being copied into a new Array. Operations such as drop and take return ByteStrings that still reference the original Array and only change the visible offset and length. Great care has also been taken to ensure that the internal Array cannot be modified: whenever a potentially unsafe Array is used to create a new ByteString, a defensive copy is made.

ByteString inherits all methods from IndexedSeq and adds some of its own. For more information, see the akka.util.ByteString class and its companion object in the ScalaDoc.

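To make the rope idea concrete, here is a minimal sketch in plain Scala. It is not akka.util.ByteString: the names Rope, Slice and Concat are hypothetical, and the sketch only demonstrates the three properties described above (concatenation without copying, drop/take as offset/length views over a shared array, and a defensive copy on construction).

```scala
// Hypothetical rope-like byte sequence, for illustration only
// (this is not akka.util.ByteString).
sealed trait Rope {
  def length: Int
  def apply(i: Int): Byte
  // Concatenation stores both sides; no bytes are copied.
  def ++(that: Rope): Rope = Concat(this, that)
}

// A view over a shared array: drop/take only change offset and length.
final class Slice private (bytes: Array[Byte], offset: Int, val length: Int) extends Rope {
  def apply(i: Int): Byte = {
    require(i >= 0 && i < length, s"index $i out of bounds")
    bytes(offset + i)
  }
  def drop(n: Int): Slice = {
    val k = math.min(math.max(n, 0), length)
    new Slice(bytes, offset + k, length - k)
  }
  def take(n: Int): Slice =
    new Slice(bytes, offset, math.min(math.max(n, 0), length))
}

object Slice {
  // Defensive copy: later changes to `src` cannot leak into the Slice.
  def apply(src: Array[Byte]): Slice = new Slice(src.clone(), 0, src.length)
}

// Two ropes joined without copying; indexing walks into the correct side.
final case class Concat(left: Rope, right: Rope) extends Rope {
  val length: Int = left.length + right.length
  def apply(i: Int): Byte =
    if (i < left.length) left(i) else right(i - left.length)
}
```

Indexing into a Concat walks down the tree, so a production implementation also has to keep lookups cheap for long chains of concatenations; this sketch ignores that concern.
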
IO.Handle

IO.Handle is an immutable reference to a Java NIO Channel. Passing mutable Channels between Actors could lead to unsafe behavior, so instead subclasses of the IO.Handle trait are used. Currently there are 2 concrete subclasses: IO.SocketHandle (representing a SocketChannel) and IO.ServerHandle (representing a ServerSocketChannel).
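
The idea behind IO.Handle can be sketched independently of Akka: actors pass around small immutable values, while only one owner touches the mutable resource behind them. In this hypothetical sketch, Handle, SocketHandle, ServerHandle and Manager are illustrative names (not Akka's actual implementation), and a StringBuilder stands in for a mutable NIO channel.

```scala
import java.util.UUID

// Hypothetical sketch of the "immutable handle" idea; not Akka's IO.Handle.
// Handles are small immutable values that are safe to send between actors.
sealed trait Handle { def id: UUID }
final case class SocketHandle(id: UUID) extends Handle
final case class ServerHandle(id: UUID) extends Handle

// Only the owner touches the mutable resources, looked up by handle.
// A StringBuilder stands in for a mutable NIO channel.
final class Manager {
  private var channels = Map.empty[Handle, StringBuilder]

  def openSocket(): SocketHandle = {
    val h = SocketHandle(UUID.randomUUID())
    channels += h -> new StringBuilder
    h
  }

  // Writes to a closed or unknown handle are silently ignored.
  def write(h: Handle, data: String): Unit =
    channels.get(h).foreach(_.append(data))

  def contents(h: Handle): Option[String] = channels.get(h).map(_.toString)

  def close(h: Handle): Unit = channels -= h
}
```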

IOManager

The IOManager takes care of the low-level IO details. Each ActorSystem has its own IOManager, which can be accessed by calling IOManager(system: ActorSystem). Actors communicate with the IOManager through specific messages. Messages sent from an Actor to the IOManager are handled automatically when using certain methods, while messages sent from the IOManager are handled within the Actor's receive method.

Connecting to a remote host:

import java.net.InetSocketAddress

val address = new InetSocketAddress("remotehost", 80)
val socket = IOManager(actorSystem).connect(address)
// alternatively, pass the host and port directly:
val socket2 = IOManager(actorSystem).connect("remotehost", 80)

Creating a server:

import java.net.InetSocketAddress

val address = new InetSocketAddress("localhost", 80)
val serverSocket = IOManager(actorSystem).listen(address)
// alternatively, pass the host and port directly:
val serverSocket2 = IOManager(actorSystem).listen("localhost", 80)

Receiving messages from the IOManager:

import akka.util.ByteString

def receive = {

  case IO.Listening(server, address) =>
    println("The server is listening on socket " + address)

  case IO.Connected(socket, address) =>
    println("Successfully connected to " + address)

  case IO.NewClient(server) =>
    println("New incoming connection on server")
    val socket = server.accept()
    println("Writing to new client socket")
    socket.write(ByteString("Hello, client\r\n"))
    println("Closing socket")
    socket.close()

  case IO.Read(socket, bytes) =>
    println("Received incoming data from socket")

  case IO.Closed(socket: IO.SocketHandle, cause) =>
    println("Socket has closed, cause: " + cause)

  case IO.Closed(server: IO.ServerHandle, cause) =>
    println("Server socket has closed, cause: " + cause)

}

IO.Iteratee

Included with Akka's IO module is a basic implementation of Iteratees. Iteratees are an effective way of handling a stream of data without needing to wait for all the data to arrive. This is especially useful with non-blocking IO, since data usually arrives in chunks that may not contain enough information to process, or may contain much more data than we currently need.

This Iteratee implementation is much more basic than what is usually found. It only supports ByteString input, and enumerators aren't used. The implementation is deliberately limited to reduce the number of explicit type signatures needed and to keep things simple. It is important to note that Akka's Iteratees are completely optional; incoming data can be handled in any way, including with other Iteratee libraries.

An Iteratee processes the data it is given and returns either the result (along with any unused input) or a continuation if more input is needed. Iteratees are monadic, so methods like flatMap can be used to pass the result of one Iteratee to another.

The basic Iteratees included in the IO module can all be found in the ScalaDoc under akka.actor.IO, and some of them are covered in the example below.
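
The "result or continuation" protocol described above can be sketched in a few lines of plain Scala. This is a hypothetical, simplified model, not akka.actor.IO: input chunks are Vector[Byte] rather than ByteString, EOF and error handling are omitted, and only a take(n) Iteratee is provided.

```scala
// Hypothetical, simplified iteratee model (not akka.actor.IO):
// chunks are Vector[Byte], and EOF and error handling are omitted.
sealed trait Iteratee[+A] {
  // Feed one chunk of input.
  def feed(in: Vector[Byte]): Iteratee[A] = this match {
    case Done(a, rest) => Done(a, rest ++ in)
    case Cont(k)       => if (in.isEmpty) this else k(in)
  }
  // Pass the result to the next iteratee, along with any unused input.
  def flatMap[B](f: A => Iteratee[B]): Iteratee[B] = this match {
    case Done(a, rest) => f(a).feed(rest)
    case Cont(k)       => Cont((in: Vector[Byte]) => k(in).flatMap(f))
  }
  def map[B](f: A => B): Iteratee[B] =
    flatMap[B](a => Done(f(a), Vector.empty[Byte]))
}

// Finished: the result plus any input that was not consumed.
final case class Done[+A](result: A, remaining: Vector[Byte]) extends Iteratee[A]
// Needs more input: a continuation waiting for the next chunk.
final case class Cont[+A](k: Vector[Byte] => Iteratee[A]) extends Iteratee[A]

object Iteratees {
  // Take exactly n bytes, even if they arrive spread over several chunks.
  def take(n: Int): Iteratee[Vector[Byte]] = {
    def step(acc: Vector[Byte]): Iteratee[Vector[Byte]] =
      if (acc.length >= n) Done(acc.take(n), acc.drop(n))
      else Cont((in: Vector[Byte]) => step(acc ++ in))
    step(Vector.empty)
  }
}
```

Because flatMap forwards the leftover input of a finished Iteratee to the next one, chunk boundaries do not have to line up with field boundaries: `Iteratees.take(2).flatMap(a => Iteratees.take(3).map(b => (a, b)))` produces the same result whether the five bytes arrive in one chunk or in several.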

Examples

Http Server

This example creates a simple, high-performance HTTP server. We begin with our imports:

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

Some commonly used constants:

object HttpConstants {
  val SP = ByteString(" ")
  val HT = ByteString("\t")
  val CRLF = ByteString("\r\n")
  val COLON = ByteString(":")
  val PERCENT = ByteString("%")
  val PATH = ByteString("/")
  val QUERY = ByteString("?")
}

And case classes to hold the resulting request:

case class Request(meth: String, path: List[String], query: Option[String], httpver: String, headers: List[Header], body: Option[ByteString])
case class Header(name: String, value: String)

Now for our first Iteratee. There are three main sections of an HTTP request: the request line, the headers, and an optional body. The main request Iteratee handles each section separately:

object HttpIteratees {
  import HttpConstants._

  def readRequest =
    for {
      requestLine ← readRequestLine
      (meth, (path, query), httpver) = requestLine
      headers ← readHeaders
      body ← readBody(headers)
    } yield Request(meth, path, query, httpver, headers, body)

In the above code, readRequest takes the results of three different Iteratees (readRequestLine, readHeaders, readBody) and combines them into a single Request object. readRequestLine actually returns a tuple, so we extract its individual components. readBody depends on values contained within the header section, so we must pass those to the method.

The request line has 3 parts to it: the HTTP method, the requested URI, and the HTTP version. The parts are separated by a single space, and the entire request line ends with a CRLF.

def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim

def readRequestLine =
  for {
    meth ← IO takeUntil SP
    uri ← readRequestURI
    _ ← IO takeUntil SP // ignore the rest
    httpver ← IO takeUntil CRLF
  } yield (ascii(meth), uri, ascii(httpver))

Reading the request method is simple, as it is a single string ending in a space. The Iteratee that performs this is IO.takeUntil(delimiter: ByteString): Iteratee[ByteString], which keeps consuming input until the specified delimiter is found; the delimiter itself is consumed but not included in the result. The HTTP version is likewise a simple string, terminated by a CRLF.

The ascii method is a helper that takes a ByteString and parses it as a US-ASCII String.
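
Stripped of the chunking machinery, the work done by IO takeUntil on already-buffered input looks roughly like the hypothetical helpers below (plain Scala, with Vector[Byte] standing in for ByteString). Where takeUntil returns None, a real Iteratee would instead wait for the next chunk and try again.

```scala
import java.nio.charset.StandardCharsets.US_ASCII

// Hypothetical helpers, not part of Akka.
object RequestLineSketch {
  val SP: Vector[Byte] = bytes(" ")
  val CRLF: Vector[Byte] = bytes("\r\n")

  def bytes(s: String): Vector[Byte] = s.getBytes(US_ASCII).toVector
  def ascii(b: Vector[Byte]): String = new String(b.toArray, US_ASCII).trim

  // Mirrors IO takeUntil on already-buffered input: returns the bytes before
  // the first delimiter and the input after it. The delimiter is consumed but
  // not returned. None means the delimiter has not arrived yet.
  def takeUntil(buffer: Vector[Byte], delim: Vector[Byte]): Option[(Vector[Byte], Vector[Byte])] = {
    val i = buffer.indexOfSlice(delim)
    if (i < 0) None
    else Some((buffer.take(i), buffer.drop(i + delim.length)))
  }
}
```

Applying takeUntil with SP, SP and then CRLF to "GET /ping HTTP/1.1\r\n..." yields the method, the URI and the version in turn, each call starting where the previous one stopped.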

Reading the request URI is a bit more complicated because we want to parse the individual components of the URI instead of just returning a simple string:

def readRequestURI = IO peek 1 flatMap {
  case PATH ⇒
    for {
      path ← readPath
      query ← readQuery
    } yield (path, query)
  case _ ⇒ sys.error("Not Implemented")
}

For this example we are only interested in handling absolute paths. To detect whether the URI is an absolute path we use IO.peek(length: Int): Iteratee[ByteString], which returns a ByteString of the requested length but doesn't consume the input. We peek at the next byte of input and check whether it matches our PATH constant (defined above as ByteString("/")). If it doesn't match we throw an error; a more robust solution would handle the other valid URI forms.

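The difference between peeking and consuming can be shown with a small immutable cursor. Input, UriSketch and its PATH constant are hypothetical stand-ins for the Iteratee-based code: peek returns bytes without moving the position, while take and drop return a new cursor further along and leave the original unchanged.

```scala
import java.nio.charset.StandardCharsets.US_ASCII

// Hypothetical immutable cursor over buffered input (not an Akka type).
final case class Input(bytes: Vector[Byte], pos: Int = 0) {
  // Look ahead without consuming anything.
  def peek(n: Int): Vector[Byte] = bytes.slice(pos, pos + n)
  // Consume n bytes: return them together with the advanced cursor.
  def take(n: Int): (Vector[Byte], Input) =
    (peek(n), copy(pos = math.min(pos + math.max(n, 0), bytes.length)))
  def drop(n: Int): Input = take(n)._2
}

object Input {
  def fromAscii(s: String): Input = Input(s.getBytes(US_ASCII).toVector)
}

object UriSketch {
  val PATH: Vector[Byte] = Vector('/'.toByte)

  // Decide how to parse the URI from its first byte, without consuming it,
  // in the spirit of readRequestURI.
  def classify(in: Input): String =
    if (in.peek(1) == PATH) "absolute path" else "not implemented"
}
```
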
Next we handle the path itself:

def readPath = {
  def step(segments: List[String]): IO.Iteratee[List[String]] = IO peek 1 flatMap {
    case PATH ⇒ IO drop 1 flatMap (_ ⇒ readUriPart(pathchar) flatMap (segment ⇒ step(segment :: segments)))
    case _ ⇒ segments match {
      case "" :: rest ⇒ IO Done rest.reverse
      case _          ⇒ IO Done segments.reverse
    }
  }
  step(Nil)
}

The step method is recursive and takes a List of the accumulated path segments. It first checks whether the remaining input starts with the PATH constant. If it does, it drops that byte and runs the readUriPart Iteratee; the resulting segment is prepended to the accumulator and step is run again.

If, after reading a path segment, the next input does not start with a PATH separator, we reverse the accumulated segments and return them (dropping the last segment if it is blank, as happens after a trailing slash).

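The same recursion can be sketched over a plain String. PathSketch.readPath is a hypothetical helper, not the Iteratee above: `allowed` plays the role of pathchar, percent-decoding is left out, and the unconsumed remainder is returned alongside the segments.

```scala
// Hypothetical String-based version of the readPath recursion (not Akka code).
object PathSketch {
  // Returns the path segments and the input that follows the path.
  def readPath(input: String, allowed: Char => Boolean): (List[String], String) = {
    @annotation.tailrec
    def step(rest: String, segments: List[String]): (List[String], String) =
      if (rest.startsWith("/")) {
        // drop the "/" and read one segment, accumulating in reverse order
        val (segment, after) = rest.drop(1).span(allowed)
        step(after, segment :: segments)
      } else {
        val path = segments match {
          case "" :: earlier => earlier.reverse // trailing "/" left a blank segment
          case _             => segments.reverse
        }
        (path, rest)
      }
    step(input, Nil)
  }
}
```

Prepending to a List and reversing once at the end keeps each step constant-time, which is why both this sketch and the Iteratee accumulate segments in reverse.
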
Following the path we read in the query (if it exists):

def readQuery: IO.Iteratee[Option[String]] = IO peek 1 flatMap {
  case QUERY ⇒ IO drop 1 flatMap (_ ⇒ readUriPart(querychar) map (Some(_)))
  case _     ⇒ IO Done None
}

This is much simpler than reading the path, because we don't parse the query at all: there is no standard format for the query string.

Both the path and query used the readUriPart Iteratee, which is next:

val alpha = Set.empty ++ ('a' to 'z') ++ ('A' to 'Z') map (_.toByte)
val digit = Set.empty ++ ('0' to '9') map (_.toByte)
val hexdigit = digit ++ (Set.empty ++ ('a' to 'f') ++ ('A' to 'F') map (_.toByte))
val subdelim = Set('!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=') map (_.toByte)
val pathchar = alpha ++ digit ++ subdelim ++ (Set(':', '@') map (_.toByte))
val querychar = pathchar ++ (Set('/', '?') map (_.toByte))

def readUriPart(allowed: Set[Byte]): IO.Iteratee[String] = for {
  str ← IO takeWhile allowed map ascii
  pchar ← IO peek 1 map (_ == PERCENT)
  all ← if (pchar) readPChar flatMap (ch ⇒ readUriPart(allowed) map (str + ch + _)) else IO Done str
} yield all

def readPChar = IO take 3 map {
  case Seq('%', rest @ _*) if rest forall hexdigit ⇒
    java.lang.Integer.parseInt(rest map (_.toChar) mkString, 16).toChar
}

Here we have several Sets containing valid characters taken from the URI specification. The readUriPart method takes a Set of valid characters (already mapped to Bytes) and keeps matching characters until it reaches one that is not in the Set. If that character starts a percent-encoded sequence (such as %20), readPChar decodes it into a single character, which is appended to the result, and processing continues; otherwise this part of the URI is complete.

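A String-based sketch of the readUriPart/readPChar combination follows. It is hypothetical and differs from the Iteratee version in one deliberate way: a malformed escape such as %zz simply ends the part, instead of failing as readPChar's partial pattern match would.

```scala
// Hypothetical String-based sketch of readUriPart/readPChar (not Akka code).
object UriPartSketch {
  val hexdigit: Set[Char] = Set.empty[Char] ++ ('0' to '9') ++ ('a' to 'f') ++ ('A' to 'F')

  // Collect allowed characters, decoding each "%XX" escape into one character,
  // and stop at the first character that is neither. Returns (part, rest).
  def readUriPart(input: String, allowed: Set[Char]): (String, String) = {
    @annotation.tailrec
    def loop(i: Int, acc: StringBuilder): (String, String) =
      if (i < input.length && allowed(input(i)))
        loop(i + 1, acc.append(input(i)))
      else if (i + 2 < input.length && input(i) == '%' &&
               hexdigit(input(i + 1)) && hexdigit(input(i + 2)))
        loop(i + 3, acc.append(Integer.parseInt(input.substring(i + 1, i + 3), 16).toChar))
      else
        (acc.toString, input.substring(i))
    loop(0, new StringBuilder)
  }
}
```
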
Headers are next:

def readHeaders = {
  def step(found: List[Header]): IO.Iteratee[List[Header]] = {
    IO peek 2 flatMap {
      case CRLF ⇒ IO takeUntil CRLF flatMap (_ ⇒ IO Done found)
      case _    ⇒ readHeader flatMap (header ⇒ step(header :: found))
    }
  }
  step(Nil)
}

def readHeader =
  for {
    name ← IO takeUntil COLON
    value ← IO takeUntil CRLF flatMap readMultiLineValue
  } yield Header(ascii(name), ascii(value))

def readMultiLineValue(initial: ByteString): IO.Iteratee[ByteString] = IO peek 1 flatMap {
  // a line starting with SP or HT continues the previous header value
  case SP | HT ⇒ IO takeUntil CRLF flatMap (bytes ⇒ readMultiLineValue(initial ++ bytes))
  case _       ⇒ IO Done initial
}

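The header-section logic can also be sketched over already-split lines. HeaderSketch.parseHeaders is a hypothetical helper: a line starting with a space or tab continues the previous header's value, a blank line ends the section, and, unlike readHeaders above (which prepends each header and never reverses the list), it returns headers in the order they arrived.

```scala
// Hypothetical line-based sketch of readHeaders/readMultiLineValue (not Akka code).
object HeaderSketch {
  final case class Header(name: String, value: String)

  // Parse headers until a blank line; returns (headers, lines after the blank line).
  def parseHeaders(lines: List[String]): (List[Header], List[String]) = {
    def isContinuation(l: String) = l.startsWith(" ") || l.startsWith("\t")

    @annotation.tailrec
    def step(rest: List[String], found: List[Header]): (List[Header], List[String]) =
      rest match {
        case "" :: body => (found.reverse, body)
        case line :: tail =>
          // folded lines starting with SP or HT continue this header's value
          val (cont, remaining) = tail.span(isContinuation)
          val (name, value) = line.span(_ != ':')
          val fullValue = (value.drop(1) :: cont).mkString.trim
          step(remaining, Header(name.trim, fullValue) :: found)
        case Nil => (found.reverse, Nil)
      }

    step(lines, Nil)
  }
}
```
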
And if applicable, we read in the message body:

def readBody(headers: List[Header]) =
  if (headers.exists(header ⇒ header.name == "Content-Length" || header.name == "Transfer-Encoding"))
    IO.takeAll map (Some(_))
  else
    IO Done None

} // end of object HttpIteratees

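As written, readBody uses IO.takeAll, which consumes input until EOF, so on a persistent connection the body is only complete once the client closes its side. A more careful server would read exactly Content-Length bytes (for example with IO take) and handle chunked Transfer-Encoding separately; if both headers are present, HTTP/1.1 gives Transfer-Encoding precedence. The hypothetical helper below sketches only that decision step in plain Scala; BodySpec and its cases are illustrative names, and header names are compared case-insensitively as HTTP requires.

```scala
import scala.util.Try

// Hypothetical sketch (not Akka code): decide how the body should be read.
object BodySketch {
  final case class Header(name: String, value: String)

  sealed trait BodySpec
  case object NoBody extends BodySpec
  final case class FixedLength(bytes: Int) extends BodySpec
  case object Chunked extends BodySpec // chunked coding needs its own parser

  def bodySpec(headers: List[Header]): BodySpec = {
    // HTTP header names are case-insensitive
    def find(name: String): Option[String] =
      headers.collectFirst { case Header(n, v) if n.equalsIgnoreCase(name) => v.trim }

    val chunked = find("Transfer-Encoding").exists(_.toLowerCase.contains("chunked"))
    val length = find("Content-Length").flatMap(s => Try(s.toInt).toOption)
    (chunked, length) match {
      case (true, _)                 => Chunked
      case (false, Some(n)) if n > 0 => FixedLength(n)
      case _                         => NoBody
    }
  }
}
```
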
Finally we get to the actual Actor:

class HttpServer(port: Int) extends Actor {

  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {

    case IO.NewClient(server) ⇒
      val socket = server.accept()
      state(socket) flatMap (_ ⇒ HttpServer.processRequest(socket))

    case IO.Read(socket, bytes) ⇒
      state(socket)(IO Chunk bytes)

    case IO.Closed(socket, cause) ⇒
      state(socket)(IO EOF None)
      state -= socket

  }

}
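
The actor keeps one parser state per socket and feeds each chunk only to the state for the socket it came from. A hypothetical, Akka-free sketch of that bookkeeping is below: Conn stands in for IO.SocketHandle, and instead of a full HTTP Iteratee each connection's "parser" just splits out complete CRLF-terminated lines, keeping any incomplete tail until more data arrives.

```scala
// Hypothetical sketch (not Akka code) of per-connection parser state.
final case class Conn(id: Int)

final class ConnectionStates {
  private var pending = Map.empty[Conn, String]

  // Like IO.Read: feed a chunk to this connection's state only,
  // returning any lines that are now complete.
  def onRead(c: Conn, chunk: String): List[String] = {
    val parts = (pending.getOrElse(c, "") + chunk).split("\r\n", -1).toList
    pending += c -> parts.last // an incomplete tail waits for more input
    parts.init
  }

  // Like IO.Closed: forget the connection's state.
  def onClosed(c: Conn): Unit = pending -= c

  def openConnections: Int = pending.size
}
```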

And the HttpServer companion object:

object HttpServer {
  import HttpIteratees._

  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
    IO repeat {
      for {
        request ← readRequest
      } yield {
        // keep the connection open only if the client asked for keep-alive
        val keepAlive = request.headers.exists {
          case Header(n, v) ⇒ n.toLowerCase == "connection" && v.toLowerCase == "keep-alive"
        }
        val rsp = request match {
          case Request("GET", "ping" :: Nil, _, _, _, _) ⇒
            OKResponse(ByteString("<p>pong</p>"), keepAlive)
          case req ⇒
            OKResponse(ByteString("<p>" + req.toString + "</p>"), keepAlive)
        }
        socket write OKResponse.bytes(rsp).compact
        if (!rsp.keepAlive) socket.close()
      }
    }

}

And the OKResponse:

object OKResponse {
  import HttpConstants.CRLF

  val okStatus = ByteString("HTTP/1.1 200 OK")
  val contentType = ByteString("Content-Type: text/html; charset=utf-8")
  val cacheControl = ByteString("Cache-Control: no-cache")
  val date = ByteString("Date: ")
  val server = ByteString("Server: Akka")
  val contentLength = ByteString("Content-Length: ")
  val connection = ByteString("Connection: ")
  val keepAlive = ByteString("Keep-Alive")
  val close = ByteString("Close")

  def bytes(rsp: OKResponse) = {
    new ByteStringBuilder ++=
      okStatus ++= CRLF ++=
      contentType ++= CRLF ++=
      cacheControl ++= CRLF ++=
      date ++= ByteString(new java.util.Date().toString) ++= CRLF ++=
      server ++= CRLF ++=
      contentLength ++= ByteString(rsp.body.length.toString) ++= CRLF ++=
      connection ++= (if (rsp.keepAlive) keepAlive else close) ++= CRLF ++= CRLF ++= rsp.body result
  }

}
case class OKResponse(body: ByteString, keepAlive: Boolean)
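
For reference, here is a hypothetical, dependency-free sketch of the bytes that OKResponse.bytes assembles, using plain byte arrays instead of ByteStringBuilder. The date is passed in as a parameter to keep the output deterministic. Content-Length must count the bytes of the body, not the characters of a String; ByteString's length already counts bytes, and this sketch uses the byte array's length.

```scala
import java.nio.charset.StandardCharsets.{ US_ASCII, UTF_8 }

// Hypothetical, dependency-free sketch of what OKResponse.bytes assembles.
object ResponseSketch {
  final case class OkResponse(body: Array[Byte], keepAlive: Boolean)

  def render(rsp: OkResponse, date: String): Array[Byte] = {
    val head = List(
      "HTTP/1.1 200 OK",
      "Content-Type: text/html; charset=utf-8",
      "Cache-Control: no-cache",
      "Date: " + date,
      "Server: Akka",
      "Content-Length: " + rsp.body.length, // counts bytes, not characters
      "Connection: " + (if (rsp.keepAlive) "Keep-Alive" else "Close")
    ).mkString("", "\r\n", "\r\n\r\n") // CRLF after each line, then a blank line
    head.getBytes(US_ASCII) ++ rsp.body
  }

  def html(s: String): Array[Byte] = s.getBytes(UTF_8)
}
```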

A main method to start everything up:

object Main extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 8080
  val system = ActorSystem()
  val server = system.actorOf(Props(new HttpServer(port)))
}
