Remoting (Java)

Remoting (Java)

For an introduction of remoting capabilities of Akka please see Location Transparency.

Preparing your ActorSystem for Remoting

The Akka remoting is a separate jar file. Make sure that you have the following dependency in your project:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-remote</artifactId>
  <version>2.0.5</version>
</dependency>

To enable remote capabilities in your Akka project you should, at a minimum, add the following changes to your application.conf file:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty {
      hostname = "127.0.0.1"
      port = 2552
    }
 }
}

As you can see in the example above there are four things you need to add to get started:

  • Change provider from akka.actor.LocalActorRefProvider to akka.remote.RemoteActorRefProvider
  • Add host name - the machine you want to run the actor system on; this host name is exactly what is passed to remote systems in order to identify this system and consequently used for connecting back to this system if need be, hence set it to a reachable IP address or resolvable name in case you want to communicate across the network.
  • Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically

Note

The port number needs to be unique for each actor system on the same machine even if the actor systems have different names. This is because each actor system has its own network subsystem listening for connections and handling messages as not to interfere with other actor systems.

The example above only illustrates the bare minimum of properties you have to add to enable remoting. There are lots of more properties that are related to remoting in Akka. We refer to the following reference file for more information:

#####################################
# Akka Remote Reference Config File #
#####################################

# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.

# comments about akka.actor settings left out where they are already in akka-
# actor.jar, because otherwise they would be repeated in config rendering.

akka {

  actor {

    serializers {
      proto = "akka.serialization.ProtobufSerializer"
    }


    serialization-bindings {
      # Since com.google.protobuf.Message does not extend Serializable but GeneratedMessage
      # does, need to use the more specific one here in order to avoid ambiguity
      "com.google.protobuf.GeneratedMessage" = proto
    }

    deployment {

      default {

        # if this is set to a valid remote address, the named actor will be deployed
        # at that node e.g. "akka://sys@host:port"
        remote = ""

        target {

          # A list of hostnames and ports for instantiating the children of a
          # non-direct router
          #   The format should be on "akka://sys@host:port", where:
          #    - sys is the remote actor system name
          #    - hostname can be either hostname or IP address the remote actor
          #      should connect to
          #    - port should be the port for the remote server on the other node
          # The number of actor instances to be spawned is still taken from the
          # nr-of-instances setting as for local routers; the instances will be
          # distributed round-robin among the given nodes.
          nodes = []

        }
      }
    }
  }

  remote {

    # Which implementation of akka.remote.RemoteTransport to use
    # default is a TCP-based remote transport based on Netty
    transport = "akka.remote.netty.NettyRemoteTransport"

    # Enable untrusted mode for full security of server managed actors, allows
    # untrusted clients to connect.
    untrusted-mode = off

    # Timeout for ACK of cluster operations, like checking actor out etc.
    remote-daemon-ack-timeout = 30s

    # If this is "on", Akka will log all inbound messages at DEBUG level, if off then they are not logged
    log-received-messages = off

    # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
    log-sent-messages = off

    # If this is "on", Akka will log all RemoteLifeCycleEvents at the level defined for each, if off then they are not logged
    log-remote-lifecycle-events = off

    # Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections.
    # The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts
    # active client connections whenever sending to a destination which is not yet connected; if configured
    # it reuses inbound connections for replies, which is called a passive client connection (i.e. from server
    # to client).
    netty {

      # (O) In case of increased latency / overflow how long should we wait (blocking the sender)
      # until we deem the send to be cancelled?
      # 0 means "never backoff", any positive number will indicate time to block at most.
      backoff-timeout = 0ms

      # (I&O) Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
      # or using 'akka.util.Crypt.generateSecureCookie'
      secure-cookie = ""

      # (I) Should the remote server require that its peers share the same secure-cookie
      # (defined in the 'remote' section)?
      require-cookie = off

      # (I) Reuse inbound connections for outbound messages
      use-passive-connections = on

      # (I) The hostname or ip to bind the remoting to,
      # InetAddress.getLocalHost.getHostAddress is used if empty
      hostname = ""

      # (I) The default remote server port clients should connect to.
      # Default is 2552 (AKKA), use 0 if you want a random available port
      # This port needs to be unique for each actor system on the same machine.
      port = 2552

      # (O) The address of a local network interface (IP Address) to bind to when creating
      # outbound connections. Set to "" or "auto" for automatic selection of local address.
      outbound-local-address = "auto"

      # (I&O) Increase this if you want to be able to send messages with large payloads
      message-frame-size = 1 MiB

      # (O) Timeout duration
      connection-timeout = 120s

      # (I) Sets the size of the connection backlog
      backlog = 4096

      # (I) Length in akka.time-unit how long core threads will be kept alive if idling
      execution-pool-keepalive = 60s

      # (I) Size of the core pool of the remote execution unit
      execution-pool-size = 4

      # (I) Maximum channel size, 0 for off
      max-channel-memory-size = 0b

      # (I) Maximum total size of all channels, 0 for off
      max-total-memory-size = 0b

      # (O) Time between reconnect attempts for active clients
      reconnect-delay = 5s

      # (O) Read inactivity period (lowest resolution is seconds)
      # after which active client connection is shutdown;
      # will be re-established in case of new communication requests.
      # A value of 0 will turn this feature off
      read-timeout = 0s

      # (O) Write inactivity period (lowest resolution is seconds)
      # after which a heartbeat is sent across the wire.
      # A value of 0 will turn this feature off
      write-timeout = 10s

      # (O) Inactivity period of both reads and writes (lowest resolution is seconds)
      # after which active client connection is shutdown;
      # will be re-established in case of new communication requests
      # A value of 0 will turn this feature off
      all-timeout = 0s

      # (O) Maximum time window that a client should try to reconnect for
      reconnection-time-window = 600s

      # (I&O) Used to configure the number of I/O worker threads on server sockets
      server-socket-worker-pool {
        # Min number of threads to cap factor-based number to
        pool-size-min = 2

        # The pool size factor is used to determine thread pool size
        # using the following formula: ceil(available processors * factor).
        # Resulting size is then bounded by the pool-size-min and
        # pool-size-max values.
        pool-size-factor = 2.0

        # Max number of threads to cap factor-based number to
        pool-size-max = 128
      }

      # (I&O) Used to configure the number of I/O worker threads on client sockets
      client-socket-worker-pool {
        # Min number of threads to cap factor-based number to
        pool-size-min = 2

        # The pool size factor is used to determine thread pool size
        # using the following formula: ceil(available processors * factor).
        # Resulting size is then bounded by the pool-size-min and
        # pool-size-max values.
        pool-size-factor = 2.0

        # Max number of threads to cap factor-based number to
        pool-size-max = 128
      }
    }

    # The dispatcher used for the system actor "network-event-sender"
    network-event-sender-dispatcher {
      executor = thread-pool-executor
      type = PinnedDispatcher
    }
  }
}

Looking up Remote Actors

actorFor(path) will obtain an ActorRef to an Actor on a remote node:

ActorRef actor = context.actorFor("akka://[email protected]:2552/user/serviceA/retrieval");

As you can see from the example above the following pattern is used to find an ActorRef on a remote node:

akka://<actorsystemname>@<hostname>:<port>/<actor path>

For more details on how actor addresses and paths are formed and used, please refer to Actor References, Paths and Addresses.

Creating Actors Remotely

The configuration below instructs the system to deploy the actor "retrieval” on the specific host "app@10.0.0.1". The "app" in this case refers to the name of the ActorSystem (only showing deployment section):

akka {
  actor {
    deployment {
      /serviceA/retrieval {
        remote = "akka://[email protected]:2552"
      }
    }
  }
}

Logical path lookup is supported on the node you are on, i.e. to use the actor created above you would do the following:

ActorRef a1 = getContext().actorFor("/serviceA/retrieval");

This will obtain an ActorRef on a remote node:

ActorRef a2 = getContext().actorFor("akka://[email protected]:2552/user/serviceA/retrieval");

As you can see from the example above the following pattern is used to find an ActorRef on a remote node:

akka://<actorsystemname>@<hostname>:<port>/<actor path>

Programmatic Remote Deployment

To allow dynamically deployed systems, it is also possible to include deployment configuration in the Props which are used to create an actor: this information is the equivalent of a deployment section from the configuration file, and if both are given, the external configuration takes precedence.

With these imports:

import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.actor.Deploy;
import akka.actor.Props;
import akka.actor.ActorSystem;
import akka.remote.RemoteScope;

and a remote address like this:

Address addr = new Address("akka", "sys", "host", 1234);
addr = AddressFromURIString.parse("akka://sys@host:1234"); // the same

you can advise the system to create a child on that remote node like so:

ActorRef ref = system.actorOf(new Props(RemoteDeploymentDocSpec.Echo.class).withDeploy(new Deploy(new RemoteScope(addr))));

Serialization

When using remoting for actors you must ensure that the props and messages used for those actors are serializable. Failing to do so will cause the system to behave in an unintended way.

For more information please see Serialization (Java)

Routers with Remote Destinations

It is absolutely feasible to combine remoting with Routing (Java). This is also done via configuration:

akka {
  actor {
    deployment {
      /serviceA/aggregation {
        router = "round-robin"
        nr-of-instances = 10
        target {
          nodes = ["akka://[email protected]:2552", "akka://[email protected]:2552"]
        }
      }
    }
  }
}

This configuration setting will clone the actor “aggregation” 10 times and deploy it evenly distributed across the two given target nodes.

Description of the Remoting Sample

There is a more extensive remote example that comes with the Akka distribution. Please have a look here for more information: Remote Sample This sample demonstrates both, remote deployment and look-up of remote actors. First, let us have a look at the common setup for both scenarios (this is common.conf):

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    netty {
      hostname = "127.0.0.1"
    }
  }
}

This enables the remoting by installing the RemoteActorRefProvider and chooses the default remote transport. All other options will be set specifically for each show case.

Note

Be sure to replace the default IP 127.0.0.1 with the real address the system is reachable by if you deploy onto multiple machines!

Remote Lookup

In order to look up a remote actor, that one must be created first. For this purpose, we configure an actor system to listen on port 2552 (this is a snippet from application.conf):

calculator {
  include "common"

  akka {
    remote.netty.port = 2552
  }
}

Then the actor must be created. For all code which follows, assume these imports:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;

The actor doing the work will be this one:

public class JSimpleCalculatorActor extends UntypedActor {
    @Override
    public void onReceive(Object message) {
      
        if (message instanceof Op.Add) {
            Op.Add add = (Op.Add) message;
            System.out.println("Calculating " + add.getN1() + " + " + add.getN2());
            getSender().tell(new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2()));
            
        } else if (message instanceof Op.Subtract) {
            Op.Subtract subtract = (Op.Subtract) message;
            System.out.println("Calculating " + subtract.getN1() + " - " + subtract.getN2());
            getSender().tell(new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2()));

        } else {
            unhandled(message);
        }
    }
}

and we start it within an actor system using the above configuration

public class JCalculatorApplication implements Bootable {
    private ActorSystem system;

    public JCalculatorApplication() {
        system = ActorSystem.create("CalculatorApplication", ConfigFactory.load().getConfig("calculator"));
        ActorRef actor = system.actorOf(new Props(JSimpleCalculatorActor.class), "simpleCalculator");
    }

    @Override
    public void startup() {
    }

    @Override
    public void shutdown() {
        system.shutdown();
    }
}

With the service actor up and running, we may look it up from another actor system, which will be configured to use port 2553 (this is a snippet from application.conf).

remotelookup {
  include "common"

  akka {
    remote.netty.port = 2553
  }
}

The actor which will query the calculator is a quite simple one for demonstration purposes

public class JLookupActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
      
        if (message instanceof InternalMsg.MathOpMsg) {
          
            // send message to server actor
            InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message;
            msg.getActor().tell(msg.getMathOp(), getSelf());
            
        } else if (message instanceof Op.MathResult) {
          
            // receive reply from server actor
            
            if (message instanceof Op.AddResult) {
                Op.AddResult result = (Op.AddResult) message;
                System.out.println("Add result: " + result.getN1() + " + " +
                        result.getN2() + " = " + result.getResult());
                
            } else if (message instanceof Op.SubtractResult) {
                Op.SubtractResult result = (Op.SubtractResult) message;
                System.out.println("Sub result: " + result.getN1() + " - " +
                        result.getN2() + " = " + result.getResult());
            }
        } else {
          unhandled(message);
        }
    }
}

and it is created from an actor system using the aforementioned client’s config.

public class JLookupApplication implements Bootable {
    private ActorSystem system;
    private ActorRef actor;
    private ActorRef remoteActor;

    public JLookupApplication() {
        system = ActorSystem.create("LookupApplication", ConfigFactory.load().getConfig("remotelookup"));
        actor = system.actorOf(new Props(JLookupActor.class));
        remoteActor = system.actorFor("akka://[email protected]:2552/user/simpleCalculator");
    }

    public void doSomething(Op.MathOp mathOp) {
        actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp));
    }

    @Override
    public void startup() {
    }

    @Override
    public void shutdown() {
        system.shutdown();
    }
}

Requests which come in via doSomething will be sent to the client actor along with the reference which was looked up earlier. Observe how the actor system name using in actorFor matches the remote system’s name, as do IP and port number. Top-level actors are always created below the "/user" guardian, which supervises them.

Remote Deployment

Creating remote actors instead of looking them up is not visible in the source code, only in the configuration file. This section is used in this scenario (this is a snippet from application.conf):

remotecreation {
  include "common"

  akka {
    actor {
      deployment {
        /advancedCalculator {
          remote = "akka://[email protected]:2552"
        }
      }
    }

    remote.netty.port = 2554
  }
}

For all code which follows, assume these imports:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;

The server actor can multiply or divide numbers:

public class JAdvancedCalculatorActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
      
        if (message instanceof Op.Multiply) {
            Op.Multiply multiply = (Op.Multiply) message;
            System.out.println("Calculating " + multiply.getN1() + " * " + multiply.getN2());
            getSender().tell(new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2()));
            
        } else if (message instanceof Op.Divide) {
            Op.Divide divide = (Op.Divide) message;
            System.out.println("Calculating " + divide.getN1() + " / " + divide.getN2());
            getSender().tell(new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2()));

        } else {
            unhandled(message);
        }
    }
}

The client actor looks like in the previous example

public class JCreationActor extends UntypedActor {
    private static final NumberFormat formatter = new DecimalFormat("#0.00");

    @Override
    public void onReceive(Object message) throws Exception {
      
        if (message instanceof InternalMsg.MathOpMsg) {
            // forward math op to server actor
            InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message;
            msg.getActor().tell(msg.getMathOp(), getSelf());
            
        } else if (message instanceof Op.MathResult) {
          
            // receive reply from server actor
          
            if (message instanceof Op.MultiplicationResult) {
                Op.MultiplicationResult result = (Op.MultiplicationResult) message;
                System.out.println("Mul result: " + result.getN1() + " * " +
                        result.getN2() + " = " + result.getResult());
                
            } else if (message instanceof Op.DivisionResult) {
                Op.DivisionResult result = (Op.DivisionResult) message;
                System.out.println("Div result: " + result.getN1() + " / " +
                        result.getN2() + " = " + formatter.format(result.getResult()));
            }
        } else {
          unhandled(message);
        }
    }
}

but the setup uses only actorOf:

public class JCreationApplication implements Bootable {
    private ActorSystem system;
    private ActorRef actor;
    private ActorRef remoteActor;

    public JCreationApplication() {
        system = ActorSystem.create("CreationApplication", ConfigFactory.load().getConfig("remotecreation"));
        actor = system.actorOf(new Props(JCreationActor.class));
        remoteActor = system.actorOf(new Props(JAdvancedCalculatorActor.class), "advancedCalculator");
    }

    public void doSomething(Op.MathOp mathOp) {
        actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp));
    }

    @Override
    public void startup() {
    }

    @Override
    public void shutdown() {
        system.shutdown();
    }
}

Observe how the name of the server actor matches the deployment given in the configuration file, which will transparently delegate the actor creation to the remote node.

Remote Events

It is possible to listen to events that occur in Akka Remote, and to subscribe/unsubscribe to there events, you simply register as listener to the below described types in on the ActorSystem.eventStream.

Note

To subscribe to any outbound-related events, subscribe to RemoteClientLifeCycleEvent To subscribe to any inbound-related events, subscribe to RemoteServerLifeCycleEvent To subscribe to any remote events, subscribe to RemoteLifeCycleEvent

To intercept when an outbound connection is disconnected, you listen to RemoteClientDisconnected which holds the transport used (RemoteTransport) and the outbound address that was disconnected (Address).

To intercept when an outbound connection is connected, you listen to RemoteClientConnected which holds the transport used (RemoteTransport) and the outbound address that was connected to (Address).

To intercept when an outbound client is started you listen to RemoteClientStarted which holds the transport used (RemoteTransport) and the outbound address that it is connected to (Address).

To intercept when an outbound client is shut down you listen to RemoteClientShutdown which holds the transport used (RemoteTransport) and the outbound address that it was connected to (Address).

To intercept when an outbound message cannot be sent, you listen to RemoteClientWriteFailed which holds the payload that was not written (AnyRef), the cause of the failed send (Throwable), the transport used (RemoteTransport) and the outbound address that was the destination (Address).

For general outbound-related errors, that do not classify as any of the others, you can listen to RemoteClientError, which holds the cause (Throwable), the transport used (RemoteTransport) and the outbound address (Address).

To intercept when an inbound server is started (typically only once) you listen to RemoteServerStarted which holds the transport that it will use (RemoteTransport).

To intercept when an inbound server is shut down (typically only once) you listen to RemoteServerShutdown which holds the transport that it used (RemoteTransport).

To intercept when an inbound connection has been established you listen to RemoteServerClientConnected which holds the transport used (RemoteTransport) and optionally the address that connected (Option<Address>).

To intercept when an inbound connection has been disconnected you listen to RemoteServerClientDisconnected which holds the transport used (RemoteTransport) and optionally the address that disconnected (Option<Address>).

To intercept when an inbound remote client has been closed you listen to RemoteServerClientClosed which holds the transport used (RemoteTransport) and optionally the address of the remote client that was closed (Option<Address>).

Contents