Typed Actors (Scala)

Akka Typed Actors are an implementation of the Active Objects pattern: they turn method invocations into asynchronous dispatch, instead of the synchronous dispatch that has been the default since Smalltalk came out.

Typed Actors consist of two "parts": a public interface and an implementation. If you've done any work in "enterprise" Java, this will be very familiar to you. As with normal Actors, you have an external API (the public interface instance) that delegates method calls asynchronously to a private instance of the implementation.

The advantage of Typed Actors over Actors is that Typed Actors give you a static contract, so you don't need to define your own messages. The downside is that they place some limitations on what you can do; for example, you cannot use become/unbecome.

Typed Actors are implemented using JDK Proxies, which provide a fairly simple API for intercepting method calls.
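To make the interception mechanism concrete, here is a minimal sketch that uses only java.lang.reflect.Proxy from the JDK. It is not Akka's implementation: Greeter, GreeterImpl and ProxySketch are hypothetical names, and the handler forwards each call synchronously, whereas a Typed Actor would turn the call into a message for its backing actor.

```scala
import java.lang.reflect.{ InvocationHandler, Method, Proxy }

// Hypothetical interface and implementation, used only for this sketch.
trait Greeter {
  def greet(name: String): String
}

class GreeterImpl extends Greeter {
  def greet(name: String): String = "Hello, " + name
}

object ProxySketch {
  // Names of all intercepted methods, in call order.
  val intercepted = scala.collection.mutable.ListBuffer.empty[String]

  // Wraps `target` in a JDK Proxy that implements Greeter.
  def wrap(target: Greeter): Greeter = {
    val handler = new InvocationHandler {
      def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
        intercepted += method.getName
        // The JDK passes null instead of an empty array for no-arg methods.
        val safeArgs = if (args == null) Array.empty[AnyRef] else args
        // A Typed Actor would enqueue a message here; this sketch simply
        // forwards the call synchronously to the real implementation.
        method.invoke(target, safeArgs: _*)
      }
    }
    Proxy.newProxyInstance(
      classOf[Greeter].getClassLoader,
      Array[Class[_]](classOf[Greeter]),
      handler).asInstanceOf[Greeter]
  }
}
```

Callers only ever see the Greeter interface, so `ProxySketch.wrap(new GreeterImpl).greet("Akka")` looks like an ordinary method call even though it passes through the handler first.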

Note

Just as with regular Akka Actors, Typed Actors process one call at a time.

When to use Typed Actors

Typed actors are nice for bridging between actor systems (the “inside”) and non-actor code (the “outside”), because they allow you to write normal OO-looking code on the outside. Think of them like doors: their practicality lies in interfacing between private sphere and the public, but you don’t want that many doors inside your house, do you? For a longer discussion see this blog post.

A bit more background: TypedActors can very easily be abused as RPC, and that is an abstraction which is well-known to be leaky. Hence TypedActors are not what we think of first when we talk about making highly scalable concurrent software easier to write correctly. They have their niche, use them sparingly.

The tools of the trade

Before we create our first Typed Actor, we should go through the tools that we have at our disposal; they are located in akka.actor.TypedActor.

import akka.actor.TypedActor

//Returns the Typed Actor Extension
val extension = TypedActor(system) //system is an instance of ActorSystem

//Returns whether the reference is a Typed Actor Proxy or not
TypedActor(system).isTypedActor(someReference)

//Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor(system).getActorRefFor(someReference)

//Returns the current ActorContext,
// method only valid within methods of a TypedActor implementation
val c: ActorContext = TypedActor.context

//Returns the external proxy of the current Typed Actor,
// method only valid within methods of a TypedActor implementation
val s: Squarer = TypedActor.self[Squarer]

//Returns a contextual instance of the Typed Actor Extension
//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor(TypedActor.context)

Warning

Just as you should not expose this of an Akka Actor, it's important not to expose this of a Typed Actor. Instead, pass the external proxy reference, which is obtained from within your Typed Actor as TypedActor.self. This is your external identity, just as the ActorRef is the external identity of an Akka Actor.

Creating Typed Actors

To create a Typed Actor you need to have one or more interfaces, and one implementation.

Our example interface:

import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._
import akka.actor.{ ActorContext, TypedActor, TypedProps }

trait Squarer {
  // typed actor iface methods ...
}

Our example implementation of that interface:

import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._
import akka.actor.{ ActorContext, TypedActor, TypedProps }

class SquarerImpl(val name: String) extends Squarer {

  def this() = this("default")
  // typed actor impl methods ...
}

The most trivial way of creating a Typed Actor instance of our Squarer:

val mySquarer: Squarer =
  TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())

The declared type of the value (Squarer) is the type of the proxy; the type parameter given to TypedProps (SquarerImpl) is the type of the implementation. If you need to call a specific constructor, do it like this:

val otherSquarer: Squarer =
  TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],
    new SquarerImpl("foo")), "name")

Since you supply a TypedProps, you can specify which dispatcher to use, what the default timeout should be, and more. Now, our Squarer doesn't have any methods, so we'd better add those.

import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._
import akka.actor.{ ActorContext, TypedActor, TypedProps }

trait Squarer {
  def squareDontCare(i: Int): Unit //fire-forget

  def square(i: Int): Future[Int] //non-blocking send-request-reply

  def squareNowPlease(i: Int): Option[Int] //blocking send-request-reply

  def squareNow(i: Int): Int //blocking send-request-reply
}

Alright, now we've got some methods we can call, but we need to implement those in SquarerImpl.

import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._
import akka.actor.{ ActorContext, TypedActor, TypedProps }

class SquarerImpl(val name: String) extends Squarer {

  def this() = this("default")
  import TypedActor.dispatcher //So we can create Promises

  def squareDontCare(i: Int): Unit = i * i //Nobody cares :(

  def square(i: Int): Future[Int] = Promise.successful(i * i).future

  def squareNowPlease(i: Int): Option[Int] = Some(i * i)

  def squareNow(i: Int): Int = i * i
}

Excellent, now we have an interface and an implementation of that interface, and we know how to create a Typed Actor from that, so let's look at calling these methods.

Method dispatch semantics

Methods returning:

  • Unit will be dispatched with fire-and-forget semantics, exactly like ActorRef.tell
  • scala.concurrent.Future[_] will use send-request-reply semantics, exactly like ActorRef.ask
  • scala.Option[_] or akka.japi.Option<?> will use send-request-reply semantics, but will block to wait for an answer. It returns None if no answer was produced within the timeout, or scala.Some/akka.japi.Some containing the result otherwise. Any exception that was thrown during the call will be rethrown.
  • Any other type of value will use send-request-reply semantics, but will block to wait for an answer. It throws java.util.concurrent.TimeoutException if there was a timeout, and rethrows any exception that was thrown during the call.
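The rules above amount to a decision made on each method's declared return type. The following sketch shows one way such a classification could be written with plain Java reflection. It is an illustration rather than Akka's code: Calculator, DispatchSketch and the returned strings are hypothetical, and akka.japi.Option is left out so that the block needs nothing beyond the Scala standard library.

```scala
import java.lang.reflect.Method
import scala.concurrent.Future

// Hypothetical interface with one method per return-type category.
trait Calculator {
  def log(i: Int): Unit
  def squareAsync(i: Int): Future[Int]
  def squareMaybe(i: Int): Option[Int]
  def squareNow(i: Int): Int
}

object DispatchSketch {
  // Maps a method's return type to the dispatch style listed above.
  def describe(m: Method): String = {
    val rt = m.getReturnType
    if (rt == java.lang.Void.TYPE) "fire-and-forget"
    else if (classOf[Future[_]].isAssignableFrom(rt)) "non-blocking request-reply"
    else if (classOf[Option[_]].isAssignableFrom(rt)) "blocking, None on timeout"
    else "blocking, TimeoutException on timeout"
  }

  // Convenience lookup by method name on the hypothetical Calculator.
  def describeByName(name: String): String =
    describe(classOf[Calculator].getMethods.find(_.getName == name).get)
}
```

Note that a Scala method declared as returning Unit compiles to a void method, which is why the first check compares against java.lang.Void.TYPE.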

Messages and immutability

While Akka cannot enforce that the parameters to the methods of your Typed Actors are immutable, we strongly recommend that parameters passed are immutable.
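As a small illustration of this recommendation, the sketch below uses a final case class that holds an immutable Vector as a method parameter type. Batch and ImmutabilitySketch are hypothetical names; the point is only that "changing" such a value produces a new instance, so the caller and the Typed Actor never share mutable state.

```scala
object ImmutabilitySketch {
  // Immutable parameter type: safe to hand over to another thread.
  final case class Batch(values: Vector[Int])

  // Returns a new Batch and leaves the original untouched.
  def append(batch: Batch, v: Int): Batch =
    batch.copy(values = batch.values :+ v)
}
```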

One-way message send

mySquarer.squareDontCare(10)

As simple as that! The method will be executed asynchronously on another thread.

Request-reply message send

val oSquare = mySquarer.squareNowPlease(10) //Option[Int]

If needed, this will block for up to the timeout that was set in the TypedProps of the Typed Actor. It will return None if a timeout occurs.

val iSquare = mySquarer.squareNow(10) //Int

If needed, this will block for up to the timeout that was set in the TypedProps of the Typed Actor. It will throw a java.util.concurrent.TimeoutException if a timeout occurs.
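The timeout that governs these blocking calls is carried by the TypedProps. The sketch below shows how it might be set, assuming akka-actor is on the classpath. Doubler, DoublerImpl and TypedPropsSketch are hypothetical names, and the dispatcher id in the comment is likewise hypothetical.

```scala
import scala.concurrent.duration._
import akka.actor.TypedProps
import akka.util.Timeout

// Hypothetical minimal interface and implementation for this sketch.
trait Doubler { def double(i: Int): Int }
class DoublerImpl extends Doubler { def double(i: Int): Int = i * 2 }

object TypedPropsSketch {
  // Blocking calls on a proxy created from these props wait at most 2 seconds.
  val props: TypedProps[DoublerImpl] =
    TypedProps[DoublerImpl]().withTimeout(Timeout(2.seconds))

  // A dispatcher can be chosen in the same fluent style. The id below is
  // hypothetical and would have to be defined in the configuration:
  //   props.withDispatcher("my-dispatcher")
}
```

The resulting props would then be passed to typedActorOf in the same way as in the creation examples earlier on this page.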

Request-reply-with-future message send

val fSquare = mySquarer.square(10) //A Future[Int]

This call is asynchronous, and the Future returned can be used for asynchronous composition.
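As a sketch of what such composition can look like, the example below uses only scala.concurrent. The local square function is a stand-in for mySquarer.square (an assumption, so that the block runs without an ActorSystem), and FutureCompositionSketch is a hypothetical name.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureCompositionSketch {
  // Stand-in for mySquarer.square; a Typed Actor would reply asynchronously.
  def square(i: Int): Future[Int] = Future.successful(i * i)

  // Chains two asynchronous calls without blocking in between.
  def fourthPower(i: Int): Future[Int] = square(i).flatMap(square)

  // Combines two results; the second call is made once the first completes.
  def sumOfSquares(a: Int, b: Int): Future[Int] =
    for {
      x <- square(a)
      y <- square(b)
    } yield x + y
}
```

Blocking with Await.result is only appropriate at the edge of the program, for example in a test; inside the application the Future would normally be passed on or composed further.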

Stopping Typed Actors

Since Akka's Typed Actors are backed by Akka Actors, they must be stopped when they aren't needed anymore.

TypedActor(system).stop(mySquarer)

This asynchronously stops the Typed Actor associated with the specified proxy ASAP.

TypedActor(system).poisonPill(otherSquarer)

This asynchronously stops the Typed Actor associated with the specified proxy after it's done with all calls that were made prior to this call.

Typed Actor Hierarchies

Since you can obtain a contextual Typed Actor Extension by passing in an ActorContext you can create child Typed Actors by invoking typedActorOf(..) on that:

//Inside your Typed Actor
val childSquarer: Squarer =
  TypedActor(TypedActor.context).typedActorOf(TypedProps[SquarerImpl]())
//Use "childSquarer" as a Squarer

You can also create a child Typed Actor from a regular Akka Actor by passing that Actor's ActorContext to TypedActor.get(…).

Supervisor Strategy

By having your Typed Actor implementation class implement TypedActor.Supervisor you can define the strategy to use for supervising child actors, as described in Supervision and Monitoring and Fault Tolerance (Scala).
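A minimal sketch of such an implementation class follows, assuming akka-actor is on the classpath. Parent and ParentImpl are hypothetical names, and the particular strategy (resume on ArithmeticException, restart on other exceptions) is only an example choice.

```scala
import scala.concurrent.duration._
import akka.actor.{ OneForOneStrategy, SupervisorStrategy, TypedActor }
import akka.actor.SupervisorStrategy.{ Restart, Resume }

// Hypothetical interface and implementation for this sketch.
trait Parent { def ping(): Unit }

class ParentImpl extends Parent with TypedActor.Supervisor {
  def ping(): Unit = ()

  // Applied to failures of the children of this Typed Actor.
  def supervisorStrategy(): SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException => Resume
      case _: Exception           => Restart
    }
}
```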

Lifecycle callbacks

You can hook into the lifecycle of your Typed Actor by having your implementation class implement any or all of the following:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart
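The sketch below shows an implementation class that mixes in two of these lifecycle traits, assuming akka-actor is on the classpath. Counter and CounterImpl are hypothetical names, and the events field exists only to make the callbacks observable in this example.

```scala
import akka.actor.TypedActor

// Hypothetical interface for this sketch.
trait Counter { def increment(): Unit }

class CounterImpl extends Counter
  with TypedActor.PreStart
  with TypedActor.PostStop {

  // Lifecycle events seen so far, newest last (for illustration only).
  var events: Vector[String] = Vector.empty
  private var count = 0

  def increment(): Unit = count += 1

  // Invoked when the backing actor starts.
  def preStart(): Unit = events :+= "started"

  // Invoked after the backing actor has stopped.
  def postStop(): Unit = events :+= "stopped"
}
```

In normal use Akka invokes these callbacks on the implementation instance; application code does not call them itself.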

Receive arbitrary messages

If the implementation class of your TypedActor extends akka.actor.TypedActor.Receiver, all messages that are not MethodCalls will be passed into the onReceive method.

This allows you to react to DeathWatch Terminated-messages and other types of messages, e.g. when interfacing with untyped actors.
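A sketch of an implementation class that reacts to Terminated and collects everything else follows, assuming akka-actor is on the classpath. Watcher and WatcherImpl are hypothetical names, and the two fields exist only so that the effect of onReceive can be observed.

```scala
import akka.actor.{ ActorRef, Terminated, TypedActor }

// Hypothetical interface for this sketch.
trait Watcher { def reset(): Unit }

class WatcherImpl extends Watcher with TypedActor.Receiver {
  var terminatedCount = 0
  var others: Vector[Any] = Vector.empty

  def reset(): Unit = {
    terminatedCount = 0
    others = Vector.empty
  }

  // Receives every message that is not a MethodCall.
  def onReceive(message: Any, sender: ActorRef): Unit = message match {
    case _: Terminated => terminatedCount += 1 // a watched actor has stopped
    case other         => others :+= other     // e.g. messages from untyped actors
  }
}
```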

Proxying

You can use the typedActorOf that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor. This is useful if you want to communicate remotely with TypedActors on other machines: just look them up with actorFor and pass the ActorRef to typedActorOf.

Note

The ActorRef needs to accept MethodCall messages.

Lookup & Remoting

Since TypedActors are backed by Akka Actors, you can use actorFor together with typedActorOf to proxy ActorRefs potentially residing on remote nodes.

val typedActor: Foo with Bar =
  TypedActor(system).
    typedActorOf(
      TypedProps[FooBar],
      system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar"))
//Use "typedActor" as a FooBar

Supercharging

Here's an example of how you can use traits to mix in behavior in your Typed Actors.

trait Foo {
  def doFoo(times: Int): Unit = println("doFoo(" + times + ")")
}

trait Bar {
  import TypedActor.dispatcher //So we have an implicit dispatcher for our Promise
  def doBar(str: String): Future[String] =
    Promise.successful(str.toUpperCase).future
}

class FooBar extends Foo with Bar
val awesomeFooBar: Foo with Bar =
  TypedActor(system).typedActorOf(TypedProps[FooBar]())

awesomeFooBar.doFoo(10)
val f = awesomeFooBar.doBar("yes")

TypedActor(system).poisonPill(awesomeFooBar)
