akka.contrib.pattern
Class ReliableProxy

java.lang.Object
  extended by akka.contrib.pattern.ReliableProxy
All Implemented Interfaces:
Actor, ActorLogging, FSM<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>>, LoggingFSM<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>>, ReliableProxyDebugLogging, Listeners

public class ReliableProxy
extends java.lang.Object
implements Actor, LoggingFSM<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>>, ReliableProxyDebugLogging

A ReliableProxy is a means to wrap a remote actor reference in order to obtain certain improved delivery guarantees:

- as long as the proxy is not terminated before it sends all of its queued messages, no messages will be lost
- messages re-sent due to the first point will not be delivered out of order; message ordering is preserved

These guarantees are valid for the communication between the two end-points of the reliable "tunnel", which usually spans an unreliable network.

Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery.
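
Because delivery is at-least-once, the target may occasionally see the same message twice. One common way to cope is to make the target idempotent. The following is a minimal sketch, assuming each message carries an application-level identifier; IdempotentReceiver, messageId and payload are hypothetical names and are not part of ReliableProxy.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical consumer that tolerates redelivery by remembering message IDs. */
class IdempotentReceiver {
    private final Set<String> seenIds = new HashSet<>();
    private int processed = 0;

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean receive(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // already handled; ignore the redelivered copy
        }
        processed++; // application-specific handling of payload would go here
        return true;
    }

    /** Number of distinct messages handled so far. */
    int processedCount() {
        return processed;
    }
}
```

In this sketch the set of seen identifiers grows without bound; a real consumer would probably expire old entries or track a high-water mark instead.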

Delivery from the remote end-point to the target actor is still subject to in-JVM delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory situations or other VM errors).

You can create a reliable connection like this:

In Scala:


 val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds))
 
or in Java:

 final ActorRef proxy = getContext().actorOf(ReliableProxy.props(
   target, Duration.create(100, "millis"), Duration.create(120, "seconds")));
 

Please note: the tunnel is uni-directional, and original sender information is retained; hence replies by the wrapped target reference will go back in the normal "unreliable" way unless also secured by a ReliableProxy from the remote end.

==Message Types==

This actor is an FSM, hence it offers the service of transition callbacks to those actors which subscribe using the SubscribeTransitionCallBack and UnsubscribeTransitionCallBack messages; see FSM for more documentation. The proxy will transition into the ReliableProxy.Active state when ACKs are outstanding and return to the ReliableProxy.Idle state when every message sent so far has been confirmed by the peer end-point.

The initial state of the proxy is ReliableProxy.Connecting. In this state the proxy will repeatedly send Identify messages to ActorSelection(targetPath) in order to obtain a new ActorRef for the target. When an ActorIdentity for the target is received, a new tunnel will be created, a ReliableProxy.TargetChanged message containing the target ActorRef will be sent to the proxy's transition subscribers, and the proxy will transition into either the ReliableProxy.Idle or the ReliableProxy.Active state, depending on whether there are any outstanding messages that need to be delivered. If maxConnectAttempts is defined, this actor will stop itself after Identify has been sent maxConnectAttempts times.

While in the Idle or Active states, if a communication failure causes the tunnel to terminate via Remote Deathwatch the proxy will transition into the ReliableProxy.Connecting state as described above. After reconnecting TargetChanged will be sent only if the target ActorRef has changed.
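
The state transitions described above can be pictured with a small model. The following is a simplified sketch only and is not the Akka implementation: ProxyStateModel and its method names are hypothetical, timers and maxConnectAttempts are left out, and messages are reduced to a counter.

```java
/** Simplified model of the proxy states described above; not the Akka implementation. */
class ProxyStateModel {
    enum State { CONNECTING, IDLE, ACTIVE }

    private State state = State.CONNECTING; // initial state per the description
    private int outstanding = 0;            // messages accepted but not yet ACKed

    State state() {
        return state;
    }

    /** A message was handed to the proxy. While CONNECTING it is only queued. */
    void onMessage() {
        outstanding++;
        if (state == State.IDLE) {
            state = State.ACTIVE;
        }
    }

    /** The target was identified and a tunnel was created. */
    void onTargetIdentified() {
        if (state == State.CONNECTING) {
            state = outstanding > 0 ? State.ACTIVE : State.IDLE;
        }
    }

    /** The peer confirmed every message sent so far. */
    void onAllAcked() {
        if (state == State.ACTIVE) {
            outstanding = 0;
            state = State.IDLE;
        }
    }

    /** The tunnel terminated, for example as reported by remote death watch. */
    void onTunnelTerminated() {
        if (state != State.CONNECTING) {
            state = State.CONNECTING; // outstanding messages stay queued
        }
    }
}
```

Keeping the outstanding count across a reconnect is what lets the model enter ACTIVE directly when the target is identified again with messages still pending.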

If this actor is stopped and it still has outstanding messages a ReliableProxy.ProxyTerminated message will be sent to the transition subscribers. It contains an Unsent object with the outstanding messages.

If a ReliableProxy.Unsent message is sent to this actor, the messages contained within it will be relayed through the tunnel to the target.

Any other message type sent to this actor will be delivered via a remote-deployed child actor to the designated target.

==Failure Cases==

All failures of either the local or the remote end-point are escalated to the parent of this actor; there are no specific error cases which are predefined.

==Arguments==

See the constructor below for the arguments for this actor. However, prefer using props(akka.actor.ActorPath, scala.concurrent.duration.FiniteDuration, scala.Option, scala.Option) to this actor's constructor.

param: targetPath is the ActorPath to the actor to which all messages will be forwarded. targetPath can point to a local or remote actor, but the tunnel endpoint will be deployed remotely on the node where the target actor lives.

param: retryAfter is the ACK timeout after which all outstanding messages will be resent. There is no limit on the queue size or the number of retries.

param: reconnectAfter is an optional interval between connection attempts. It is also used as the interval between receiving a Terminated for the tunnel and attempting to reconnect to the target actor. The minimum recommended value for this is the value of the configuration setting akka.remote.retry-gate-closed-for. Use None to never reconnect after a disconnection.

param: maxConnectAttempts is an optional maximum number of attempts to connect to the target actor. Use None for no limit. If reconnectAfter is None this value is ignored.
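
To illustrate the retryAfter semantics (all outstanding messages are resent once the ACK timeout elapses, with no limit on queue size), here is a minimal sketch of an unbounded queue keyed by serial number. OutstandingQueue, Entry, send, ack and resendAll are hypothetical names chosen for this example; the timer that would trigger resendAll is omitted, and cumulative ACKs are an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical unbounded queue of messages awaiting acknowledgement. */
class OutstandingQueue {
    private static final class Entry {
        final int serial;
        final String payload;

        Entry(int serial, String payload) {
            this.serial = serial;
            this.payload = payload;
        }
    }

    private final Deque<Entry> unacked = new ArrayDeque<>();
    private int nextSerial = 1;

    /** Queues a payload and returns the serial number assigned to it. */
    int send(String payload) {
        int serial = nextSerial++;
        unacked.addLast(new Entry(serial, payload));
        return serial;
    }

    /** Drops every entry up to and including the acknowledged serial. */
    void ack(int serial) {
        // The sign of the difference is used so that the check tolerates wrap-around.
        while (!unacked.isEmpty() && unacked.peekFirst().serial - serial <= 0) {
            unacked.removeFirst();
        }
    }

    /** Payloads that would be resent, in their original order, when the ACK timeout fires. */
    List<String> resendAll() {
        List<String> result = new ArrayList<>();
        for (Entry e : unacked) {
            result.add(e.payload);
        }
        return result;
    }

    /** True when nothing is awaiting acknowledgement. */
    boolean isEmpty() {
        return unacked.isEmpty();
    }
}
```

Resending the whole remaining queue in order is what keeps ordering intact in this sketch: nothing newer than an unacknowledged message is ever treated as delivered.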


Nested Class Summary
static class ReliableProxy.Active$
           
static class ReliableProxy.Connecting$
           
static class ReliableProxy.Idle$
           
static class ReliableProxy.Message
           
static class ReliableProxy.Message$
           
static class ReliableProxy.ProxyTerminated
          ProxyTerminated is sent to transition subscribers during postStop.
static class ReliableProxy.ProxyTerminated$
           
static class ReliableProxy.Receiver
           
static interface ReliableProxy.State
           
static class ReliableProxy.TargetChanged
          TargetChanged is sent to transition subscribers when the initial connection is made to the target and when the target ActorRef has changed (for example, the target system crashed and has been restarted).
static class ReliableProxy.TargetChanged$
           
static class ReliableProxy.Unsent
           
static class ReliableProxy.Unsent$
           
 
Nested classes/interfaces inherited from interface akka.actor.FSM
FSM.$minus$greater$, FSM.CurrentState<S>, FSM.CurrentState$, FSM.Event<D>, FSM.Event$, FSM.Failure, FSM.Failure$, FSM.LogEntry<S,D>, FSM.LogEntry$, FSM.Normal$, FSM.NullFunction$, FSM.Reason, FSM.Shutdown$, FSM.State$, FSM.StateTimeout$, FSM.StopEvent<S,D>, FSM.StopEvent$, FSM.SubscribeTransitionCallBack, FSM.SubscribeTransitionCallBack$, FSM.TimeoutMarker, FSM.TimeoutMarker$, FSM.Timer, FSM.Timer$, FSM.TransformHelper, FSM.Transition<S>, FSM.Transition$, FSM.UnsubscribeTransitionCallBack, FSM.UnsubscribeTransitionCallBack$
 
Nested classes/interfaces inherited from interface akka.actor.Actor
Actor.emptyBehavior$
 
Constructor Summary
ReliableProxy(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter, scala.Option<java.lang.Object> maxConnectAttempts)
           
 
Method Summary
static ReliableProxy.Active$ active()
           
 int attemptedReconnects()
           
static int compare(int a, int b)
          Wrap-around aware comparison of integers: differences limited to 2**31-1 in magnitude will work correctly.
 void createTunnel(ActorRef target)
           
 int currentSerial()
           
 ActorRef currentTarget()
           
 scala.concurrent.duration.FiniteDuration defaultConnectInterval()
           
static ReliableProxy.Idle$ idle()
           
 ReliableProxy.Connecting$ initialState()
           
 int lastAckSerial()
           
 void logResend(int size)
           
 scala.concurrent.duration.FiniteDuration nextBackoff()
          Returns the next retry interval duration.
 int nextSerial()
           
 void postStop()
          User overridable callback.
static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter)
          Props with no reconnections.
static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.concurrent.duration.FiniteDuration reconnectAfter)
          Props with no limit on reconnections.
static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.concurrent.duration.FiniteDuration reconnectAfter, int maxReconnects)
          Java API Props.
static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter, scala.Option<java.lang.Object> maxReconnects)
          Scala API Props.
static Props receiver(ActorRef target, int currentSerial)
           
static ReliableProxy.Connecting$ reconnecting()
           
 java.lang.String reconnectTimer()
           
 scala.collection.immutable.Vector<ReliableProxy.Message> resend(scala.collection.immutable.Vector<ReliableProxy.Message> q)
           
 java.lang.String resendTimer()
           
 void resetBackoff()
          Reset backoff interval.
 long retryGateClosedFor()
           
 void scheduleReconnectTick()
           
 void scheduleTick()
           
 ReliableProxy.Message send(java.lang.Object msg, ActorRef snd)
           
 OneForOneStrategy supervisorStrategy()
          User overridable definition of the strategy to use for supervising child actors.
 FSM.State<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>> terminated()
           
 ActorRef tunnel()
           
 scala.collection.immutable.Vector<ReliableProxy.Message> updateSerial(scala.collection.immutable.Vector<ReliableProxy.Message> q)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface akka.actor.LoggingFSM
advance, debugEvent, events, full, getLog, logDepth, pos, processEvent, states
 
Methods inherited from interface akka.actor.FSM
applyState, cancelTimer, currentState, Event, generation, handleEvent, handleEventDefault, handleTransition, initialize, isStateTimerActive, isTimerActive, logTermination, makeTransition, nextState, nextStateData, onTermination, onTransition, processMsg, register, setStateTimeout, setTimer, startWith, stateData, stateFunctions, stateName, StateTimeout, stateTimeouts, stay, stop, stop, stop, StopEvent, terminate, terminateEvent, timeoutFuture, timerGen, timers, total2pf, transform, transitionEvent, when, whenUnhandled
 
Methods inherited from interface akka.actor.Actor
aroundPostRestart, aroundPostStop, aroundPreRestart, aroundPreStart, aroundReceive, context, postRestart, preRestart, preStart, self, sender, unhandled
 
Methods inherited from interface akka.routing.Listeners
gossip, listenerManagement, listeners
 
Methods inherited from interface akka.actor.ActorLogging
_log, log
 
Methods inherited from interface akka.contrib.pattern.ReliableProxyDebugLogging
addSelf, debug, enabled, logDebug, logDebug
 

Constructor Detail

ReliableProxy

public ReliableProxy(ActorPath targetPath,
                     scala.concurrent.duration.FiniteDuration retryAfter,
                     scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter,
                     scala.Option<java.lang.Object> maxConnectAttempts)
Method Detail

props

public static Props props(ActorPath targetPath,
                          scala.concurrent.duration.FiniteDuration retryAfter,
                          scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter,
                          scala.Option<java.lang.Object> maxReconnects)
Scala API Props. Arguments are detailed in the ReliableProxy constructor.

Parameters:
targetPath - (undocumented)
retryAfter - (undocumented)
reconnectAfter - (undocumented)
maxReconnects - (undocumented)
Returns:
(undocumented)

props

public static Props props(ActorPath targetPath,
                          scala.concurrent.duration.FiniteDuration retryAfter,
                          scala.concurrent.duration.FiniteDuration reconnectAfter,
                          int maxReconnects)
Java API Props. Arguments are detailed in the ReliableProxy constructor.

Parameters:
targetPath - (undocumented)
retryAfter - (undocumented)
reconnectAfter - (undocumented)
maxReconnects - (undocumented)
Returns:
(undocumented)

props

public static Props props(ActorPath targetPath,
                          scala.concurrent.duration.FiniteDuration retryAfter,
                          scala.concurrent.duration.FiniteDuration reconnectAfter)
Props with no limit on reconnections. Arguments are detailed in the ReliableProxy constructor.

Parameters:
targetPath - (undocumented)
retryAfter - (undocumented)
reconnectAfter - (undocumented)
Returns:
(undocumented)

props

public static Props props(ActorPath targetPath,
                          scala.concurrent.duration.FiniteDuration retryAfter)
Props with no reconnections. Arguments are detailed in the ReliableProxy constructor.

Parameters:
targetPath - (undocumented)
retryAfter - (undocumented)
Returns:
(undocumented)

compare

public static int compare(int a,
                          int b)
Wrap-around aware comparison of integers: differences limited to 2**31-1 in magnitude will work correctly.

Parameters:
a - (undocumented)
b - (undocumented)
Returns:
(undocumented)
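
A wrap-around aware comparison of this kind is commonly written by taking the sign of the (possibly overflowing) difference. The following is a minimal sketch under that assumption; SerialCompare is a hypothetical stand-alone class and is not claimed to match the library's code line for line.

```java
/** Hypothetical stand-alone version of a wrap-around aware integer comparison. */
class SerialCompare {
    /**
     * Returns -1, 0 or 1. The subtraction may overflow, but as long as the true
     * distance between a and b is at most 2**31-1, the sign of the wrapped
     * difference still reflects their order.
     */
    static int compare(int a, int b) {
        int diff = a - b;
        return Integer.signum(diff);
    }
}
```

With this definition Integer.MAX_VALUE compares as smaller than Integer.MIN_VALUE, which is the desired result for a serial number that has just wrapped around.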

receiver

public static Props receiver(ActorRef target,
                             int currentSerial)

idle

public static ReliableProxy.Idle$ idle()

active

public static ReliableProxy.Active$ active()

reconnecting

public static ReliableProxy.Connecting$ reconnecting()

tunnel

public ActorRef tunnel()

currentSerial

public int currentSerial()

lastAckSerial

public int lastAckSerial()

currentTarget

public ActorRef currentTarget()

attemptedReconnects

public int attemptedReconnects()

resendTimer

public java.lang.String resendTimer()

reconnectTimer

public java.lang.String reconnectTimer()

retryGateClosedFor

public long retryGateClosedFor()

defaultConnectInterval

public scala.concurrent.duration.FiniteDuration defaultConnectInterval()

initialState

public ReliableProxy.Connecting$ initialState()

createTunnel

public void createTunnel(ActorRef target)

supervisorStrategy

public OneForOneStrategy supervisorStrategy()
Description copied from interface: Actor
User overridable definition of the strategy to use for supervising child actors.

Specified by:
supervisorStrategy in interface Actor
Returns:
(undocumented)

postStop

public void postStop()
Description copied from interface: Actor
User overridable callback.

Is called asynchronously after 'actor.stop()' is invoked. Empty default implementation.

Specified by:
postStop in interface Actor
Specified by:
postStop in interface FSM<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>>

scheduleTick

public void scheduleTick()

nextSerial

public int nextSerial()

send

public ReliableProxy.Message send(java.lang.Object msg,
                                  ActorRef snd)

updateSerial

public scala.collection.immutable.Vector<ReliableProxy.Message> updateSerial(scala.collection.immutable.Vector<ReliableProxy.Message> q)

resend

public scala.collection.immutable.Vector<ReliableProxy.Message> resend(scala.collection.immutable.Vector<ReliableProxy.Message> q)

logResend

public void logResend(int size)

terminated

public FSM.State<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>> terminated()

scheduleReconnectTick

public void scheduleReconnectTick()

resetBackoff

public void resetBackoff()
Reset backoff interval.

This and nextBackoff are meant to be implemented by subclasses.


nextBackoff

public scala.concurrent.duration.FiniteDuration nextBackoff()
Returns the next retry interval duration. By default each interval is the same, reconnectAfter.

Returns:
(undocumented)
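
Since resetBackoff and nextBackoff are meant to be implemented by subclasses, one possible policy is an exponential backoff with an upper bound. The following is a minimal sketch of such a policy only: ExponentialBackoff is a hypothetical helper, it uses java.time.Duration instead of scala.concurrent.duration.FiniteDuration to stay self-contained, and it is not wired into ReliableProxy.

```java
import java.time.Duration;

/** Hypothetical backoff policy: doubles the interval on each call, up to a maximum. */
class ExponentialBackoff {
    private final Duration initial;
    private final Duration max;
    private Duration current;

    ExponentialBackoff(Duration initial, Duration max) {
        this.initial = initial;
        this.max = max;
        this.current = initial;
    }

    /** Returns the interval to wait now and prepares a longer one for the next call. */
    Duration nextBackoff() {
        Duration result = current;
        Duration doubled = current.multipliedBy(2);
        current = doubled.compareTo(max) > 0 ? max : doubled;
        return result;
    }

    /** Goes back to the initial interval, for example after a successful reconnect. */
    void resetBackoff() {
        current = initial;
    }
}
```

A subclass could delegate its own nextBackoff and resetBackoff to a helper like this, converting the result to a FiniteDuration.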