java.lang.Object akka.contrib.pattern.ReliableProxy
public class ReliableProxy
A ReliableProxy is a means to wrap a remote actor reference in order to obtain certain improved delivery guarantees:
- as long as the proxy is not terminated before it sends all of its queued messages, no messages will be lost
- messages re-sent due to the first point will not be delivered out of order; message ordering is preserved
These guarantees are valid for the communication between the two end-points of the reliable “tunnel”, which usually spans an unreliable network.
Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery.
Delivery from the remote end-point to the target actor is still subject to in-JVM delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory situations or other VM errors).
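The guarantees above can be pictured with a small model of the sending side. This is a minimal sketch in plain Java, not Akka's implementation: `OutstandingQueue`, `Entry`, `ack`, and `isIdle` are hypothetical names, and the sketch assumes that each message is tagged with an increasing serial number, that an ACK confirms every serial up to and including the acknowledged one, and that serials never wrap around. Messages stay queued until acknowledged, so a lost ACK leads to a re-send (at-least-once), and re-sends go out oldest first (ordering preserved).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a sender that keeps messages until they are acknowledged. */
final class OutstandingQueue {
    /** A payload tagged with the serial number it was sent under. */
    static final class Entry {
        final int serial;
        final String payload;

        Entry(int serial, String payload) {
            this.serial = serial;
            this.payload = payload;
        }
    }

    private final Deque<Entry> unacked = new ArrayDeque<>();
    private int nextSerial = 1;

    /** Queues a message and returns the serial assigned to it. */
    int send(String payload) {
        int serial = nextSerial++;
        unacked.addLast(new Entry(serial, payload));
        return serial;
    }

    /** Drops every queued message whose serial is at or below the acknowledged one. */
    void ack(int serial) {
        // Assumption: plain <= is enough here because this sketch ignores wrap-around.
        while (!unacked.isEmpty() && unacked.peekFirst().serial <= serial) {
            unacked.removeFirst();
        }
    }

    /** Returns the payloads to re-send after an ACK timeout, oldest first. */
    List<String> resend() {
        List<String> out = new ArrayList<>();
        for (Entry e : unacked) {
            out.add(e.payload);
        }
        return out;
    }

    /** True when every message sent so far has been confirmed. */
    boolean isIdle() {
        return unacked.isEmpty();
    }
}
```

In this model, sending three messages and acknowledging only the first leaves the second and third queued for the next re-send, in their original order.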
You can create a reliable connection like this:
In Scala:
val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds))
or in Java:
final ActorRef proxy = getContext().actorOf(ReliableProxy.props(
target, Duration.create(100, "millis"), Duration.create(120, "seconds")));
Please note: the tunnel is uni-directional, and original sender information is retained, hence replies by the wrapped target reference will go back in the normal “unreliable” way unless also secured by a ReliableProxy from the remote end.
==Message Types==
This actor is an FSM, hence it offers the service of transition callbacks to those actors which subscribe using the SubscribeTransitionCallBack and UnsubscribeTransitionCallBack messages; see FSM for more documentation. The proxy will transition into the ReliableProxy.Active state when ACKs are outstanding and return to the ReliableProxy.Idle state when every message sent so far has been confirmed by the peer end-point.
The initial state of the proxy is ReliableProxy.Connecting. In this state the proxy will repeatedly send Identify messages to ActorSelection(targetPath) in order to obtain a new ActorRef for the target. When an ActorIdentity for the target is received, a new tunnel will be created, a ReliableProxy.TargetChanged message containing the target ActorRef will be sent to the proxy's transition subscribers, and the proxy will transition into either the ReliableProxy.Idle or ReliableProxy.Active state, depending on whether there are any outstanding messages that need to be delivered. If maxConnectAttempts is defined, this actor will stop itself after Identify is sent maxConnectAttempts times.
While in the Idle or Active states, if a communication failure causes the tunnel to terminate via Remote Deathwatch, the proxy will transition into the ReliableProxy.Connecting state as described above. After reconnecting, TargetChanged will be sent only if the target ActorRef has changed.
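The state changes described above can be summarised in a small model. This is a minimal sketch in plain Java, not the Akka FSM API: `ProxyStateModel`, its `State` enum (including the extra `STOPPED` value), and all method names are hypothetical. It assumes that reconnection is enabled and that the attempt limit is checked right after each Identify is sent.

```java
import java.util.OptionalInt;

/** Hypothetical model of the proxy's state changes; not the Akka FSM API. */
final class ProxyStateModel {
    enum State { CONNECTING, IDLE, ACTIVE, STOPPED }

    private ProxyStateModel() {}

    /** State entered once the target has been identified and the tunnel created. */
    static State onTargetIdentified(int outstandingMessages) {
        return outstandingMessages > 0 ? State.ACTIVE : State.IDLE;
    }

    /** State after an ACK: Idle once nothing is outstanding, otherwise still Active. */
    static State onAck(int outstandingAfterAck) {
        return outstandingAfterAck > 0 ? State.ACTIVE : State.IDLE;
    }

    /** A terminated tunnel sends an Idle or Active proxy back to Connecting. */
    static State onTunnelTerminated(State current) {
        boolean connected = current == State.IDLE || current == State.ACTIVE;
        return connected ? State.CONNECTING : current;
    }

    /** After an Identify is sent: stop once the optional attempt limit is reached. */
    static State onIdentifySent(int attemptsSent, OptionalInt maxConnectAttempts) {
        boolean exhausted = maxConnectAttempts.isPresent()
                && attemptsSent >= maxConnectAttempts.getAsInt();
        return exhausted ? State.STOPPED : State.CONNECTING;
    }
}
```

For example, under these assumptions `onIdentifySent(3, OptionalInt.of(3))` yields `STOPPED`, while `onIdentifySent(3, OptionalInt.empty())` keeps the model in `CONNECTING` because no limit is defined.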
If this actor is stopped and it still has outstanding messages, a ReliableProxy.ProxyTerminated message will be sent to the transition subscribers. It contains an Unsent object with the outstanding messages.
If a ReliableProxy.Unsent message is sent to this actor, the messages contained within it will be relayed through the tunnel to the target.
Any other message type sent to this actor will be delivered via a remote-deployed child actor to the designated target.
==Failure Cases==
All failures of either the local or the remote end-point are escalated to the parent of this actor; there are no specific error cases which are predefined.
==Arguments==
See the constructor below for the arguments for this actor. However, prefer using props(akka.actor.ActorPath, scala.concurrent.duration.FiniteDuration, scala.Option to this actor's constructor.
param: targetPath is the ActorPath to the actor to which all messages will be forwarded. targetPath can point to a local or remote actor, but the tunnel endpoint will be deployed remotely on the node where the target actor lives.
param: retryAfter is the ACK timeout after which all outstanding messages will be resent. There is no limit on the queue size or the number of retries.
param: reconnectAfter is an optional interval between connection attempts. It is also used as the interval between receiving a Terminated for the tunnel and attempting to reconnect to the target actor. The minimum recommended value for this is the value of the configuration setting akka.remote.retry-gate-closed-for. Use None to never reconnect after a disconnection.
param: maxConnectAttempts is an optional maximum number of attempts to connect to the target actor. Use None for no limit. If reconnectAfter is None this value is ignored.
Nested Class Summary | |
---|---|
static class | ReliableProxy.Active$ |
static class | ReliableProxy.Connecting$ |
static class | ReliableProxy.Idle$ |
static class | ReliableProxy.Message |
static class | ReliableProxy.Message$ |
static class | ReliableProxy.ProxyTerminated: ProxyTerminated is sent to transition subscribers during postStop. |
static class | ReliableProxy.ProxyTerminated$ |
static class | ReliableProxy.Receiver |
static interface | ReliableProxy.State |
static class | ReliableProxy.TargetChanged: TargetChanged is sent to transition subscribers when the initial connection is made to the target and when the target ActorRef has changed (for example, the target system crashed and has been restarted). |
static class | ReliableProxy.TargetChanged$ |
static class | ReliableProxy.Unsent |
static class | ReliableProxy.Unsent$ |
Nested classes/interfaces inherited from interface akka.actor.FSM |
---|
FSM.$minus$greater$, FSM.CurrentState<S>, FSM.CurrentState$, FSM.Event<D>, FSM.Event$, FSM.Failure, FSM.Failure$, FSM.LogEntry<S,D>, FSM.LogEntry$, FSM.Normal$, FSM.NullFunction$, FSM.Reason, FSM.Shutdown$, FSM.State$, FSM.StateTimeout$, FSM.StopEvent<S,D>, FSM.StopEvent$, FSM.SubscribeTransitionCallBack, FSM.SubscribeTransitionCallBack$, FSM.TimeoutMarker, FSM.TimeoutMarker$, FSM.Timer, FSM.Timer$, FSM.TransformHelper, FSM.Transition<S>, FSM.Transition$, FSM.UnsubscribeTransitionCallBack, FSM.UnsubscribeTransitionCallBack$ |
Nested classes/interfaces inherited from interface akka.actor.Actor |
---|
Actor.emptyBehavior$ |
Constructor Summary | |
---|---|
ReliableProxy(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter, scala.Option<java.lang.Object> maxConnectAttempts) |
Method Summary | |
---|---|
static ReliableProxy.Active$ | active() |
int | attemptedReconnects() |
static int | compare(int a, int b): Wrap-around aware comparison of integers: differences limited to 2**31-1 in magnitude will work correctly. |
void | createTunnel(ActorRef target) |
int | currentSerial() |
ActorRef | currentTarget() |
scala.concurrent.duration.FiniteDuration | defaultConnectInterval() |
static ReliableProxy.Idle$ | idle() |
ReliableProxy.Connecting$ | initialState() |
int | lastAckSerial() |
void | logResend(int size) |
scala.concurrent.duration.FiniteDuration | nextBackoff(): Returns the next retry interval duration. |
int | nextSerial() |
void | postStop(): User overridable callback. |
static Props | props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter): Props with no reconnections. |
static Props | props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.concurrent.duration.FiniteDuration reconnectAfter): Props with no limit on reconnections. |
static Props | props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.concurrent.duration.FiniteDuration reconnectAfter, int maxReconnects): Java API Props. |
static Props | props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter, scala.Option<java.lang.Object> maxReconnects): Scala API Props. |
static Props | receiver(ActorRef target, int currentSerial) |
static ReliableProxy.Connecting$ | reconnecting() |
java.lang.String | reconnectTimer() |
scala.collection.immutable.Vector<ReliableProxy.Message> | resend(scala.collection.immutable.Vector<ReliableProxy.Message> q) |
java.lang.String | resendTimer() |
void | resetBackoff(): Reset backoff interval. |
long | retryGateClosedFor() |
void | scheduleReconnectTick() |
void | scheduleTick() |
ReliableProxy.Message | send(java.lang.Object msg, ActorRef snd) |
OneForOneStrategy | supervisorStrategy(): User overridable definition of the strategy to use for supervising child actors. |
FSM.State<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>> | terminated() |
ActorRef | tunnel() |
scala.collection.immutable.Vector<ReliableProxy.Message> | updateSerial(scala.collection.immutable.Vector<ReliableProxy.Message> q) |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Methods inherited from interface akka.actor.LoggingFSM |
---|
advance, debugEvent, events, full, getLog, logDepth, pos, processEvent, states |
Methods inherited from interface akka.actor.FSM |
---|
applyState, cancelTimer, currentState, Event, generation, handleEvent, handleEventDefault, handleTransition, initialize, isStateTimerActive, isTimerActive, logTermination, makeTransition, nextState, nextStateData, onTermination, onTransition, processMsg, register, setStateTimeout, setTimer, startWith, stateData, stateFunctions, stateName, StateTimeout, stateTimeouts, stay, stop, stop, stop, StopEvent, terminate, terminateEvent, timeoutFuture, timerGen, timers, total2pf, transform, transitionEvent, when, whenUnhandled |
Methods inherited from interface akka.actor.Actor |
---|
aroundPostRestart, aroundPostStop, aroundPreRestart, aroundPreStart, aroundReceive, context, postRestart, preRestart, preStart, self, sender, unhandled |
Methods inherited from interface akka.routing.Listeners |
---|
gossip, listenerManagement, listeners |
Methods inherited from interface akka.actor.ActorLogging |
---|
_log, log |
Methods inherited from interface akka.contrib.pattern.ReliableProxyDebugLogging |
---|
addSelf, debug, enabled, logDebug, logDebug |
Constructor Detail |
---|
public ReliableProxy(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter, scala.Option<java.lang.Object> maxConnectAttempts)
Method Detail |
---|
public static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.Option<scala.concurrent.duration.FiniteDuration> reconnectAfter, scala.Option<java.lang.Object> maxReconnects)
Scala API Props. Arguments are detailed in the ReliableProxy constructor.
targetPath - (undocumented)
retryAfter - (undocumented)
reconnectAfter - (undocumented)
maxReconnects - (undocumented)
public static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.concurrent.duration.FiniteDuration reconnectAfter, int maxReconnects)
Java API Props. Arguments are detailed in the ReliableProxy constructor.
targetPath - (undocumented)
retryAfter - (undocumented)
reconnectAfter - (undocumented)
maxReconnects - (undocumented)
public static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter, scala.concurrent.duration.FiniteDuration reconnectAfter)
Props with no limit on reconnections. Arguments are detailed in the ReliableProxy constructor.
targetPath - (undocumented)
retryAfter - (undocumented)
reconnectAfter - (undocumented)
public static Props props(ActorPath targetPath, scala.concurrent.duration.FiniteDuration retryAfter)
Props with no reconnections. Arguments are detailed in the ReliableProxy constructor.
targetPath - (undocumented)
retryAfter - (undocumented)
public static int compare(int a, int b)
a - (undocumented)
b - (undocumented)
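A wrap-around aware comparison of serial numbers can be sketched as follows. This is one plausible way to obtain the behaviour described in the method summary, written in plain Java; it is not taken from the Akka source, and the class name `SerialCompare` is hypothetical. It relies on the fact that Java `int` subtraction wraps silently, so the sign of `a - b` gives the right ordering whenever the true distance between the two serials is at most 2**31-1.

```java
/** Sketch of a wrap-around aware comparison for int serial numbers. */
final class SerialCompare {
    private SerialCompare() {}

    /** Returns -1, 0, or 1 based on the sign of the wrapping difference a - b. */
    static int compare(int a, int b) {
        int diff = a - b; // int subtraction wraps around instead of overflowing
        return Integer.signum(diff);
    }
}
```

Under this scheme a serial that has just wrapped past `Integer.MAX_VALUE` to `Integer.MIN_VALUE` still compares as later than `Integer.MAX_VALUE`, whereas `Integer.compare` would report it as earlier.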
public static Props receiver(ActorRef target, int currentSerial)
public static ReliableProxy.Idle$ idle()
public static ReliableProxy.Active$ active()
public static ReliableProxy.Connecting$ reconnecting()
public ActorRef tunnel()
public int currentSerial()
public int lastAckSerial()
public ActorRef currentTarget()
public int attemptedReconnects()
public java.lang.String resendTimer()
public java.lang.String reconnectTimer()
public long retryGateClosedFor()
public scala.concurrent.duration.FiniteDuration defaultConnectInterval()
public ReliableProxy.Connecting$ initialState()
public void createTunnel(ActorRef target)
public OneForOneStrategy supervisorStrategy()
Specified by: supervisorStrategy in interface Actor
public void postStop()
Specified by: postStop in interface Actor
Specified by: postStop in interface FSM<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>>
public void scheduleTick()
public int nextSerial()
public ReliableProxy.Message send(java.lang.Object msg, ActorRef snd)
public scala.collection.immutable.Vector<ReliableProxy.Message> updateSerial(scala.collection.immutable.Vector<ReliableProxy.Message> q)
public scala.collection.immutable.Vector<ReliableProxy.Message> resend(scala.collection.immutable.Vector<ReliableProxy.Message> q)
public void logResend(int size)
public FSM.State<ReliableProxy.State,scala.collection.immutable.Vector<ReliableProxy.Message>> terminated()
public void scheduleReconnectTick()
public void resetBackoff()
This and nextBackoff are meant to be implemented by subclasses.
public scala.concurrent.duration.FiniteDuration nextBackoff()
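Since resetBackoff and nextBackoff are meant to be implemented by subclasses, the following shows one policy such an implementation might follow. This is a minimal sketch in plain Java, not a ReliableProxy subclass: `ExponentialBackoff` is a hypothetical class, `java.time.Duration` stands in for `scala.concurrent.duration.FiniteDuration`, and the doubling-with-cap policy is an assumption chosen for illustration. It also assumes that the initial interval does not exceed the maximum.

```java
import java.time.Duration;

/** Hypothetical exponential backoff policy with nextBackoff/resetBackoff-style methods. */
final class ExponentialBackoff {
    private final Duration initial;
    private final Duration max;
    private Duration current;

    /** Assumes initial is not longer than max. */
    ExponentialBackoff(Duration initial, Duration max) {
        this.initial = initial;
        this.max = max;
        this.current = initial;
    }

    /** Returns the interval to wait now and doubles the following one, capped at max. */
    Duration nextBackoff() {
        Duration result = current;
        Duration doubled = current.multipliedBy(2);
        current = doubled.compareTo(max) > 0 ? max : doubled;
        return result;
    }

    /** Starts again from the initial interval, for example after a successful connection. */
    void resetBackoff() {
        current = initial;
    }
}
```

With an initial interval of 100 ms and a cap of 350 ms, successive calls to `nextBackoff()` in this sketch return 100 ms, 200 ms, 350 ms, and then 350 ms until `resetBackoff()` is called.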