HTTP

HTTP

Module stability: SOLID

When using Akkas embedded servlet container

Akka supports the JSR for REST called JAX-RS (JSR-311). It allows you to create interaction with your actors through HTTP + REST

You can deploy your REST services directly into the Akka kernel. All you have to do is to drop the JAR with your application containing the REST services into the ‘$AKKA_HOME/deploy’ directory and specify in your akka.conf what resource packages to scan for (more on that below) and optionally define a “boot class” (if you need to create any actors or do any config). WAR deployment is coming soon.

Boot configuration class

The boot class is needed for Akka to bootstrap the application and should contain the initial supervisor configuration of any actors in the module.

The boot class should be a regular POJO with a default constructor in which the initial configuration is done. The boot class then needs to be defined in the ‘$AKKA_HOME/config/akka.conf’ config file like this:

akka {
  boot = ["sample.java.Boot", "sample.scala.Boot"]   # FQN to the class doing initial actor
                                                     # supervisor bootstrap, should be defined in default constructor
  ...
}

After you’ve placed your service-jar into the $AKKA_HOME/deploy directory, you’ll need to tell Akka where to look for your services, and you do that by specifying what packages you want Akka to scan for services, and that’s done in akka.conf in the http-section:

akka {
  http {
    ...
    resource-packages = ["com.bar","com.foo.bar"] # List with all resource packages for your Jersey services
    ...
}

When deploying in another servlet container:

If you deploy Akka in another JEE container, don’t forget to create an Akka initialization and cleanup hook:

package com.my //<--- your own package
import akka.util.AkkaLoader
import akka.remote.BootableRemoteActorService
import akka.actor.BootableActorLoaderService
import javax.servlet.{ServletContextListener, ServletContextEvent}

 /**
  * This class can be added to web.xml mappings as a listener to start and postStop Akka.
  *<web-app>
  * ...
  *  <listener>
  *    <listener-class>com.my.Initializer</listener-class>
  *  </listener>
  * ...
  *</web-app>
  */
class Initializer extends ServletContextListener {
   lazy val loader = new AkkaLoader
   def contextDestroyed(e: ServletContextEvent): Unit = loader.shutdown
   def contextInitialized(e: ServletContextEvent): Unit =
     loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService) //<--- Important
//     loader.boot(true, new BootableActorLoaderService {}) // If you don't need akka-remote
 }

For Java users, it’s currently only possible to use BootableActorLoaderService, but you’ll need to use: akka.actor.DefaultBootableActorLoaderService

Then you just declare it in your web.xml:

<web-app>
...
  <listener>
    <listener-class>your.package.Initializer</listener-class>
  </listener>
...
</web-app>

Also, you need to map the servlet that will handle your Jersey/JAX-RS calls, you use Jerseys ServletContainer servlet.

<web-app>
...
  <servlet>
    <servlet-name>Akka</servlet-name>
    <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
    <!-- And you want to configure your services -->
    <init-param>
      <param-name>com.sun.jersey.config.property.resourceConfigClass</param-name>
      <param-value>com.sun.jersey.api.core.PackagesResourceConfig</param-value>
    </init-param>
    <init-param>
       <param-name>com.sun.jersey.config.property.packages</param-name>
       <param-value>your.resource.package.here;and.another.here;and.so.on</param-value>
    </init-param>
  </servlet>
  <servlet-mapping>
    <url-pattern>*</url-pattern>
    <servlet-name>Akka</servlet-name>
  </servlet-mapping>
...
</web-app>

Adapting your own Akka Initializer for the Servlet Container

If you want to use akka-camel or any other modules that have their own “Bootable“‘s you’ll need to write your own Initializer, which is _ultra_ simple, see below for an example on how to include Akka-camel.

package com.my //<--- your own package
import akka.remote.BootableRemoteActorService
import akka.actor.BootableActorLoaderService
import akka.camel.CamelService
import javax.servlet.{ServletContextListener, ServletContextEvent}

 /**
  * This class can be added to web.xml mappings as a listener to start and postStop Akka.
  *<web-app>
  * ...
  *  <listener>
  *    <listener-class>com.my.Initializer</listener-class>
  *  </listener>
  * ...
  *</web-app>
  */
class Initializer extends ServletContextListener {
   lazy val loader = new AkkaLoader
   def contextDestroyed(e: ServletContextEvent): Unit = loader.shutdown
   def contextInitialized(e: ServletContextEvent): Unit =
     loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService with CamelService) //<--- Important
 }

Using Akka with the Pinky REST/MVC framework

Pinky has a slick Akka integration. Read more here

jetty-run in SBT

If you want to use jetty-run in SBT you need to exclude the version of Jetty that is bundled in akka-http:

override def ivyXML =
  <dependencies>
    <dependency org="se.scalablesolutions.akka" name="akka-http" rev="AKKA_VERSION_GOES_HERE">
      <exclude module="jetty"/>
    </dependency>
  </dependencies>

Mist - Lightweight Asynchronous HTTP

The Mist layer was developed to provide a direct connection between the servlet container and Akka actors with the goal of handling the incoming HTTP request as quickly as possible in an asynchronous manner. The motivation came from the simple desire to treat REST calls as completable futures, that is, effectively passing the request along an actor message chain to be resumed at the earliest possible time. The primary constraint was to not block any existing threads and secondarily, not create additional ones. Mist is very simple and works both with Jetty Continuations as well as with Servlet API 3.0 (tested using Jetty-8.0.0.M1). When the servlet handles a request, a message is created typed to represent the method (e.g. Get, Post, etc.), the request is suspended and the message is sent (fire-and-forget) to the root endpoint actor. That’s it. There are no POJOs required to host the service endpoints and the request is treated as any other. The message can be resumed (completed) using a number of helper methods that set the proper HTTP response status code.

Complete runnable example can be found here: https://github.com/buka/akka-mist-sample

Endpoints

Endpoints are actors that handle request messages. Minimally there must be an instance of the RootEndpoint and then at least one more (to implement your services).

Preparations

In order to use Mist you have to register the MistServlet in web.xml or do the analogous for the embedded server if running in Akka Microkernel:

<servlet>
  <servlet-name>akkaMistServlet</servlet-name>
  <servlet-class>akka.http.AkkaMistServlet</servlet-class>
  <!-- <async-supported>true</async-supported> Enable this for Servlet 3.0 support -->
</servlet>

<servlet-mapping>
  <servlet-name>akkaMistServlet</servlet-name>
  <url-pattern>/*</url-pattern>
</servlet-mapping>

Then you also have to add the following dependencies to your SBT build definition:

val jettyWebapp = "org.eclipse.jetty" % "jetty-webapp" % "8.0.0.M2" % "test"
val javaxServlet30 = "org.mortbay.jetty" % "servlet-api" % "3.0.20100224" % "provided"

Attention: You have to use SBT 0.7.5.RC0 or higher in order to be able to work with that Jetty version.

An Example

Startup

In this example, we’ll use the built-in RootEndpoint class and implement our own service from that. Here the services are started in the boot loader and attached to the top level supervisor.

class Boot {
  val factory = SupervisorFactory(
    SupervisorConfig(
      OneForOneStrategy(List(classOf[Exception]), 3, 100),
        //
        // in this particular case, just boot the built-in default root endpoint
        //
      Supervise(
        actorOf[RootEndpoint],
        Permanent) ::
      Supervise(
        actorOf[SimpleAkkaAsyncHttpService],
        Permanent)
      :: Nil))
  factory.newInstance.start
}

Defining the Endpoint The service is an actor that mixes in the Endpoint trait. Here the dispatcher is taken from the Akka configuration file which allows for custom tuning of these actors, though naturally, any dispatcher can be used.

URI Handling

Rather than use traditional annotations to pair HTTP request and class methods, Mist uses hook and provide functions. This offers a great deal of flexibility in how a given endpoint responds to a URI. A hook function is simply a filter, returning a Boolean to indicate whether or not the endpoint will handle the URI. This can be as simple as a straight match or as fancy as you need. If a hook for a given URI returns true, the matching provide function is called to obtain an actor to which the message can be delivered. Notice in the example below, in one case, the same actor is returned and in the other, a new actor is created and returned. Note that URI hooking is non-exclusive and a message can be delivered to multiple actors (see next example).

Plumbing

Hook and provider functions are attached to a parent endpoint, in this case the root, by sending it the Endpoint.Attach message. Finally, bind the handleHttpRequest function of the Endpoint trait to the actor’s receive function and we’re done.

class SimpleAkkaAsyncHttpService extends Actor with Endpoint {
  final val ServiceRoot = "/simple/"
  final val ProvideSameActor = ServiceRoot + "same"
  final val ProvideNewActor = ServiceRoot + "new"

    //
    // use the configurable dispatcher
    //
  self.dispatcher = Endpoint.Dispatcher

    //
    // there are different ways of doing this - in this case, we'll use a single hook function
    //  and discriminate in the provider; alternatively we can pair hooks & providers
    //
  def hook(uri: String): Boolean = ((uri == ProvideSameActor) || (uri == ProvideNewActor))
  def provide(uri: String): ActorRef = {
    if (uri == ProvideSameActor) same
    else actorOf[BoringActor].start()
  }

    //
    // this is where you want attach your endpoint hooks
    //
  override def preStart() = {
      //
      // we expect there to be one root and that it's already been started up
      // obviously there are plenty of other ways to obtaining this actor
      //  the point is that we need to attach something (for starters anyway)
      //  to the root
      //
      val root = Actor.registry.actorsFor(classOf[RootEndpoint]).head
      root ! Endpoint.Attach(hook, provide)
    }

    //
    // since this actor isn't doing anything else (i.e. not handling other messages)
    //  just assign the receive func like so...
    // otherwise you could do something like:
    //  def myrecv = {...}
    //  def receive = myrecv orElse _recv
    //
  def receive = handleHttpRequest

  //
  // this will be our "same" actor provided with ProvideSameActor endpoint is hit
  //
  lazy val same = actorOf[BoringActor].start()
}

Handling requests

Messages are handled just as any other that are received by your actor. The servlet requests and response are not hidden and can be accessed directly as shown below.

/**
 * Define a service handler to respond to some HTTP requests
 */
class BoringActor extends Actor {
  import java.util.Date
  import javax.ws.rs.core.MediaType

  var gets = 0
  var posts = 0
  var lastget: Option[Date] = None
  var lastpost: Option[Date] = None

  def receive = {
    // handle a get request
    case get: Get =>
      // the content type of the response.
      // similar to @Produces annotation
      get.response.setContentType(MediaType.TEXT_HTML)

      //
      // "work"
      //
      gets += 1
      lastget = Some(new Date)

      //
      // respond
      //
      val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
      get.OK(res)

    // handle a post request
    case post:Post =>
      // the expected content type of the request
      // similar to @Consumes
      if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
        // the content type of the response.
        // similar to @Produces annotation
        post.response.setContentType(MediaType.TEXT_HTML)

        // "work"
        posts += 1
        lastpost = Some(new Date)

        // respond
        val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
        post.OK(res)
      } else {
        post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
      }
    }

    case other: RequestMethod =>
      other.NotAllowed("Invalid method for this endpoint")
  }
}

Timeouts Messages will expire according to the default timeout (specified in akka.conf). Individual messages can also be updated using the timeout method. One thing that may seem unexpected is that when an expired request returns to the caller, it will have a status code of OK (200). Mist will add an HTTP header to such responses to help clients, if applicable. By default, the header will be named “Async-Timeout” with a value of “expired” - both of which are configurable.

Another Example - multiplexing handlers

As noted above, hook functions are non-exclusive. This means multiple actors can handle the same request if desired. In this next example, the hook functions are identical (yes, the same one could have been reused) and new instances of both A and B actors will be created to handle the Post. A third mediator is inserted to coordinate the results of these actions and respond to the caller.

package sample.mist

import akka.actor._
import akka.actor.Actor._
import akka.http._

import javax.servlet.http.HttpServletResponse

class InterestingService extends Actor with Endpoint {
  final val ServiceRoot = "/interesting/"
  final val Multi = ServiceRoot + "multi/"
  // use the configurable dispatcher
  self.dispatcher = Endpoint.Dispatcher

  //
  // The "multi" endpoint shows forking off multiple actions per request
  // It is triggered by POSTing to http://localhost:9998/interesting/multi/{foo}
  //  Try with/without a header named "Test-Token"
  //  Try with/without a form parameter named "Data"
  def hookMultiActionA(uri: String): Boolean = uri startsWith Multi
  def provideMultiActionA(uri: String): ActorRef = actorOf(new ActionAActor(complete)).start()

  def hookMultiActionB(uri: String): Boolean = uri startsWith Multi
  def provideMultiActionB(uri: String): ActorRef = actorOf(new ActionBActor(complete)).start()

    //
    // this is where you want attach your endpoint hooks
    //
  override def preStart() = {
    //
    // we expect there to be one root and that it's already been started up
    // obviously there are plenty of other ways to obtaining this actor
    //  the point is that we need to attach something (for starters anyway)
    //  to the root
    //
    val root = Actor.registry.actorsFor(classOf[RootEndpoint]).head
    root ! Endpoint.Attach(hookMultiActionA, provideMultiActionA)
    root ! Endpoint.Attach(hookMultiActionB, provideMultiActionB)
  }

  //
  // since this actor isn't doing anything else (i.e. not handling other messages)
  //  just assign the receive func like so...
  // otherwise you could do something like:
  //  def myrecv = {...}
  //  def receive = myrecv orElse handleHttpRequest
  //
  def receive = handleHttpRequest

  //
  // this guy completes requests after other actions have occurred
  //
  lazy val complete = actorOf[ActionCompleteActor].start()
}

class ActionAActor(complete:ActorRef) extends Actor {
  import javax.ws.rs.core.MediaType

  def receive = {
    // handle a post request
    case post: Post =>
      // the expected content type of the request
      // similar to @Consumes
      if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
        // the content type of the response.
        // similar to @Produces annotation
        post.response.setContentType(MediaType.TEXT_HTML)

        // get the resource name
        val name = post.request.getRequestURI.substring("/interesting/multi/".length)
        if (name.length % 2 == 0) post.response.getWriter.write("<p>Action A verified request.</p>")
        else post.response.getWriter.write("<p>Action A could not verify request.</p>")

        // notify the next actor to coordinate the response
        complete ! post
      } else post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
    }
  }
}

class ActionBActor(complete:ActorRef) extends Actor {
  import javax.ws.rs.core.MediaType

  def receive = {
    // handle a post request
    case post: Post =>
      // the expected content type of the request
      // similar to @Consumes
      if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
        // pull some headers and form params
        def default(any: Any): String = ""

        val token = post.getHeaderOrElse("Test-Token", default)
        val data = post.getParameterOrElse("Data", default)

        val (resp, status) = (token, data) match {
          case ("", _) => ("No token provided", HttpServletResponse.SC_FORBIDDEN)
          case (_, "") => ("No data", HttpServletResponse.SC_ACCEPTED)
          case _ => ("Data accepted", HttpServletResponse.SC_OK)
        }

        // update the response body
        post.response.getWriter.write(resp)

        // notify the next actor to coordinate the response
        complete ! (post, status)
      } else post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
    }

    case other: RequestMethod =>
      other.NotAllowed("Invalid method for this endpoint")
  }
}

class ActionCompleteActor extends Actor {
  import collection.mutable.HashMap

  val requests = HashMap.empty[Int, Int]

  def receive = {
    case req: RequestMethod =>
      if (requests contains req.hashCode) complete(req)
      else requests += (req.hashCode -> 0)

    case t: Tuple2[RequestMethod, Int] =>
      if (requests contains t._1.hashCode) complete(t._1)
      else requests += (t._1.hashCode -> t._2)
  }

  def complete(req: RequestMethod) = requests.remove(req.hashCode) match {
      case Some(HttpServletResponse.SC_FORBIDDEN) => req.Forbidden("")
      case Some(HttpServletResponse.SC_ACCEPTED) => req.Accepted("")
      case Some(_) => req.OK("")
      case _ => {}
  }
}

Examples

Using the Akka Mist module with OAuth

https://gist.github.com/759501

Using the Akka Mist module with the Facebook Graph API and WebGL

Example project using Akka Mist with the Facebook Graph API and WebGL https://github.com/buka/fbgl1

Contents