class ReliableProxy extends Actor with LoggingFSM[State, Vector[Message]] with ReliableProxyDebugLogging
A ReliableProxy is a means to wrap a remote actor reference in order to obtain certain improved delivery guarantees:
- as long as the proxy is not terminated before it sends all of its queued messages then no messages will be lost
- messages re-sent due to the first point will not be delivered out-of-order, message ordering is preserved
These guarantees are valid for the communication between the two end-points of the reliable “tunnel”, which usually spans an unreliable network.
Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery.
Delivery from the remote end-point to the target actor is still subject to in-JVM delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory situations or other VM errors).
You can create a reliable connection like this:
In Scala:
val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds)
or in Java:
final ActorRef proxy = getContext().actorOf(ReliableProxy.props( target, Duration.create(100, "millis"), Duration.create(120, "seconds")));
Please note: the tunnel is uni-directional, and original sender information is retained, hence replies by the wrapped target reference will go back in the normal “unreliable” way unless also secured by a ReliableProxy from the remote end.
Message Types
This actor is an akka.actor.FSM, hence it offers the service of
transition callbacks to those actors which subscribe using the
SubscribeTransitionCallBack
and UnsubscribeTransitionCallBack
messages; see akka.actor.FSM for more documentation. The proxy will
transition into ReliableProxy.Active
state when ACKs
are outstanding and return to the ReliableProxy.Idle
state when every message send so far has been confirmed by the peer end-point.
The initial state of the proxy is ReliableProxy.Connecting
. In this state the
proxy will repeatedly send akka.actor.Identify messages to ActorSelection(targetPath)
in order to obtain a new ActorRef
for the target. When an akka.actor.ActorIdentity
for the target is received a new tunnel will be created, a ReliableProxy.TargetChanged
message containing the target ActorRef
will be sent to the proxy's transition subscribers
and the proxy will transition into either the ReliableProxy.Idle
or ReliableProxy.Active
state, depending if there are any outstanding messages that need to be delivered. If
maxConnectAttempts
is defined this actor will stop itself after Identify
is sent
maxConnectAttempts
times.
While in the Idle
or Active
states, if a communication failure causes the tunnel to
terminate via Remote Deathwatch the proxy will transition into the ReliableProxy.Connecting
state as described above. After reconnecting TargetChanged
will be sent only if the target
ActorRef
has changed.
If this actor is stopped and it still has outstanding messages a
ReliableProxy.ProxyTerminated message will be sent to the
transition subscribers. It contains an Unsent
object with the outstanding messages.
If an ReliableProxy.Unsent message is sent to this actor the messages contained within it will be relayed through the tunnel to the target.
Any other message type sent to this actor will be delivered via a remote-deployed child actor to the designated target.
Failure Cases
All failures of either the local or the remote end-point are escalated to the parent of this actor; there are no specific error cases which are predefined.
Arguments
See the constructor below for the arguments for this actor. However, prefer using akka.contrib.pattern.ReliableProxy#props to this actor's constructor.
- Annotations
- @deprecated
- Deprecated
(Since version 2.5.0) Use AtLeastOnceDelivery instead
- Source
- ReliableProxy.scala
- Alphabetic
- By Inheritance
- ReliableProxy
- ReliableProxyDebugLogging
- LoggingFSM
- FSM
- ActorLogging
- Listeners
- Actor
- AnyRef
- Any
- by CollectionsHaveToParArray
- by any2stringadd
- by StringFormat
- by Ensuring
- by ArrowAssoc
- Hide All
- Show All
- Public
- All
Instance Constructors
-
new
ReliableProxy(targetPath: ActorPath, retryAfter: FiniteDuration, reconnectAfter: Option[FiniteDuration], maxConnectAttempts: Option[Int])
- targetPath
is the
ActorPath
to the actor to which all messages will be forwarded.targetPath
can point to a local or remote actor, but the tunnel endpoint will be deployed remotely on the node where the target actor lives.- retryAfter
is the ACK timeout after which all outstanding messages will be resent. There is no limit on the queue size or the number of retries.
- reconnectAfter
is an optional interval between connection attempts. It is also used as the interval between receiving a
Terminated
for the tunnel and attempting to reconnect to the target actor. The minimum recommended value for this is the value of the configuration settingakka.remote.retry-gate-closed-for
. UseNone
to never reconnect after a disconnection.- maxConnectAttempts
is an optional maximum number of attempts to connect to the target actor. Use
None
for no limit. IfreconnectAfter
isNone
this value is ignored.
Type Members
-
final
class
TransformHelper extends AnyRef
- Definition Classes
- FSM
-
type
Event = actor.FSM.Event[Vector[Message]]
- Definition Classes
- FSM
-
type
Receive = PartialFunction[Any, Unit]
- Definition Classes
- Actor
-
type
State = actor.FSM.State[ReliableProxy.State, Vector[Message]]
- Definition Classes
- FSM
-
type
StateFunction = PartialFunction[Event, State]
- Definition Classes
- FSM
-
type
StopEvent = actor.FSM.StopEvent[ReliableProxy.State, Vector[Message]]
- Definition Classes
- FSM
-
type
Timeout = Option[FiniteDuration]
- Definition Classes
- FSM
-
type
TransitionHandler = PartialFunction[(ReliableProxy.State, ReliableProxy.State), Unit]
- Definition Classes
- FSM
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
def
+(other: String): String
- Implicit
- This member is added by an implicit conversion from ReliableProxy to any2stringadd[ReliableProxy] performed by method any2stringadd in scala.Predef.
- Definition Classes
- any2stringadd
-
val
->: actor.FSM.->.type
This extractor is just convenience for matching a (S, S) pair, including a reminder what the new state is.
This extractor is just convenience for matching a (S, S) pair, including a reminder what the new state is.
- Definition Classes
- FSM
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
val
Event: actor.FSM.Event.type
- Definition Classes
- FSM
-
val
StateTimeout: actor.FSM.StateTimeout.type
This case object is received in case of a state timeout.
This case object is received in case of a state timeout.
- Definition Classes
- FSM
-
val
StopEvent: actor.FSM.StopEvent.type
- Definition Classes
- FSM
-
def
addSelf(template: String): String
- Definition Classes
- ReliableProxyDebugLogging
-
def
aroundPostRestart(reason: Throwable): Unit
INTERNAL API.
INTERNAL API.
Can be overridden to intercept calls to
postRestart
. CallspostRestart
by default.- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
-
def
aroundPostStop(): Unit
INTERNAL API.
INTERNAL API.
Can be overridden to intercept calls to
postStop
. CallspostStop
by default.- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
-
def
aroundPreRestart(reason: Throwable, message: Option[Any]): Unit
INTERNAL API.
INTERNAL API.
Can be overridden to intercept calls to
preRestart
. CallspreRestart
by default.- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
-
def
aroundPreStart(): Unit
INTERNAL API.
INTERNAL API.
Can be overridden to intercept calls to
preStart
. CallspreStart
by default.- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
-
def
aroundReceive(receive: actor.Actor.Receive, msg: Any): Unit
INTERNAL API.
INTERNAL API.
Can be overridden to intercept calls to this actor's current behavior.
- receive
current behavior.
- msg
current message.
- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
- var attemptedReconnects: Int
-
final
def
cancelTimer(name: String): Unit
Cancel named timer, ensuring that the message is not subsequently delivered (no race).
Cancel named timer, ensuring that the message is not subsequently delivered (no race).
- name
of the timer to cancel
- Definition Classes
- FSM
-
def
clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate() @throws( ... )
-
implicit
val
context: ActorContext
Scala API: Stores the context for this actor, including self, and sender.
Scala API: Stores the context for this actor, including self, and sender. It is implicit to support operations such as
forward
.WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!
akka.actor.ActorContext is the Scala API.
getContext
returns a akka.actor.AbstractActor.ActorContext, which is the Java API of the actor context.- Definition Classes
- Actor
- def createTunnel(target: ActorRef): Unit
- var currentSerial: Int
- var currentTarget: ActorRef
-
val
debug: Boolean
- Definition Classes
- ReliableProxyDebugLogging
- val defaultConnectInterval: FiniteDuration
-
def
enabled: Boolean
- Definition Classes
- ReliableProxyDebugLogging
-
def
ensuring(cond: (ReliableProxy) ⇒ Boolean, msg: ⇒ Any): ReliableProxy
- Implicit
- This member is added by an implicit conversion from ReliableProxy to Ensuring[ReliableProxy] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
def
ensuring(cond: (ReliableProxy) ⇒ Boolean): ReliableProxy
- Implicit
- This member is added by an implicit conversion from ReliableProxy to Ensuring[ReliableProxy] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
def
ensuring(cond: Boolean, msg: ⇒ Any): ReliableProxy
- Implicit
- This member is added by an implicit conversion from ReliableProxy to Ensuring[ReliableProxy] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
def
ensuring(cond: Boolean): ReliableProxy
- Implicit
- This member is added by an implicit conversion from ReliableProxy to Ensuring[ReliableProxy] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
formatted(fmtstr: String): String
- Implicit
- This member is added by an implicit conversion from ReliableProxy to StringFormat[ReliableProxy] performed by method StringFormat in scala.Predef.
- Definition Classes
- StringFormat
- Annotations
- @inline()
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
def
getLog: IndexedSeq[LogEntry[ReliableProxy.State, Vector[Message]]]
Retrieve current rolling log in oldest-first order.
Retrieve current rolling log in oldest-first order. The log is filled with each incoming event before processing by the user supplied state handler. The log entries are lost when this actor is restarted.
- Attributes
- protected
- Definition Classes
- LoggingFSM
-
def
gossip(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit
Sends the supplied message to all current listeners using the provided sender() as sender.
Sends the supplied message to all current listeners using the provided sender() as sender.
- Attributes
- protected
- Definition Classes
- Listeners
-
final
def
goto(nextStateName: ReliableProxy.State): State
Produce transition to other state.
Produce transition to other state. Return this from a state function in order to effect the transition.
This method always triggers transition events, even for
A -> A
transitions. If you want to stay in the same state without triggering an state transition event use #stay instead.- nextStateName
state designator for the next state
- returns
state transition descriptor
- Definition Classes
- FSM
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- val initialState: Connecting.type
-
final
def
initialize(): Unit
Verify existence of initial state and setup timers.
Verify existence of initial state and setup timers. This should be the last call within the constructor, or akka.actor.Actor#preStart and akka.actor.Actor#postRestart
An initial
currentState -> currentState
notification will be triggered by calling this method.- Definition Classes
- FSM
- See also
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
final
def
isTimerActive(name: String): Boolean
Inquire whether the named timer is still active.
Inquire whether the named timer is still active. Returns true unless the timer does not exist, has previously been canceled or if it was a single-shot timer whose message was already received.
- Definition Classes
- FSM
- var lastAckSerial: Int
-
def
listenerManagement: actor.Actor.Receive
Chain this into the receive function.
Chain this into the receive function.
def receive = listenerManagement orElse …
- Attributes
- protected
- Definition Classes
- Listeners
-
val
listeners: Set[ActorRef]
- Attributes
- protected
- Definition Classes
- Listeners
-
def
log: LoggingAdapter
- Definition Classes
- ActorLogging
-
def
logDebug(template: String, arg1: Any): Unit
- Definition Classes
- ReliableProxyDebugLogging
-
def
logDebug(template: String, arg1: Any, arg2: Any): Unit
- Definition Classes
- ReliableProxyDebugLogging
-
def
logDepth: Int
- Definition Classes
- LoggingFSM
- def logResend(size: Int): Unit
-
def
logTermination(reason: Reason): Unit
By default FSM.Failure is logged at error level and other reason types are not logged.
By default FSM.Failure is logged at error level and other reason types are not logged. It is possible to override this behavior.
- Attributes
- protected
- Definition Classes
- FSM
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
nextBackoff(): FiniteDuration
Returns the next retry interval duration.
Returns the next retry interval duration. By default each interval is the same, reconnectAfter.
- def nextSerial(): Int
-
final
def
nextStateData: Vector[Message]
Return next state data (available in onTransition handlers)
Return next state data (available in onTransition handlers)
- Definition Classes
- FSM
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit
Set handler which is called upon termination of this FSM actor.
Set handler which is called upon termination of this FSM actor. Calling this method again will overwrite the previous contents.
- Definition Classes
- FSM
-
final
def
onTransition(transitionHandler: TransitionHandler): Unit
Set handler which is called upon each state transition, i.e.
Set handler which is called upon each state transition, i.e. not when staying in the same state. This may use the pair extractor defined in the FSM companion object like so:
onTransition { case Old -> New => doSomething }
It is also possible to supply a 2-ary function object:
onTransition(handler _) private def handler(from: S, to: S) { ... }
The underscore is unfortunately necessary to enable the nicer syntax shown above (it uses the implicit conversion total2pf under the hood).
Multiple handlers may be installed, and every one of them will be called, not only the first one matching.
- Definition Classes
- FSM
-
def
postRestart(reason: Throwable): Unit
User overridable callback: By default it calls
preStart()
.User overridable callback: By default it calls
preStart()
.- reason
the Throwable that caused the restart to happen Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
- Definition Classes
- Actor
- Annotations
- @throws( classOf[Exception] )
-
def
postStop(): Unit
Call
onTermination
hook; if you want to retain this behavior when overriding make sure to callsuper.postStop()
.Call
onTermination
hook; if you want to retain this behavior when overriding make sure to callsuper.postStop()
.Please note that this method is called by default from
preRestart()
, so override that one ifonTermination
shall not be called during restart.- Definition Classes
- ReliableProxy → FSM → Actor
-
def
preRestart(reason: Throwable, message: Option[Any]): Unit
Scala API: User overridable callback: By default it disposes of all children and then calls
postStop()
.Scala API: User overridable callback: By default it disposes of all children and then calls
postStop()
.- reason
the Throwable that caused the restart to happen
- message
optionally the current message the actor processed when failing, if applicable Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
- Definition Classes
- Actor
- Annotations
- @throws( classOf[Exception] )
-
def
preStart(): Unit
User overridable callback.
User overridable callback.
Is called when an Actor is started. Actors are automatically started asynchronously when created. Empty default implementation.
- Definition Classes
- Actor
- Annotations
- @throws( classOf[Exception] )
-
def
receive: Receive
Scala API: This defines the initial actor behavior, it must return a partial function with the actor logic.
- val reconnectTimer: String
- def resend(q: Vector[Message]): Vector[Message]
- val resendTimer: String
-
def
resetBackoff(): Unit
Reset backoff interval.
Reset backoff interval.
This and nextBackoff are meant to be implemented by subclasses.
- val retryGateClosedFor: Long
- def scheduleReconnectTick(): Unit
- def scheduleTick(): Unit
-
implicit final
val
self: ActorRef
The 'self' field holds the ActorRef for this actor.
The 'self' field holds the ActorRef for this actor.
Can be used to send messages to itself:
self ! message
- Definition Classes
- Actor
- def send(msg: Any, snd: ActorRef): Message
-
final
def
sender(): ActorRef
The reference sender Actor of the last received message.
The reference sender Actor of the last received message. Is defined if the message was sent from another Actor, else
deadLetters
in akka.actor.ActorSystem.WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!
- Definition Classes
- Actor
-
final
def
setStateTimeout(state: ReliableProxy.State, timeout: Timeout): Unit
Set state timeout explicitly.
Set state timeout explicitly. This method can safely be used from within a state handler.
- Definition Classes
- FSM
-
final
def
setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false): Unit
Schedule named timer to deliver message after given delay, possibly repeating.
Schedule named timer to deliver message after given delay, possibly repeating. Any existing timer with the same name will automatically be canceled before adding the new timer.
- name
identifier to be used with cancelTimer()
- msg
message to be delivered
- timeout
delay of first message delivery and between subsequent messages
- repeat
send once if false, scheduleAtFixedRate if true
- Definition Classes
- FSM
-
final
def
startWith(stateName: ReliableProxy.State, stateData: Vector[Message], timeout: Timeout = None): Unit
Set initial state.
Set initial state. Call this method from the constructor before the #initialize method. If different state is needed after a restart this method, followed by #initialize, can be used in the actor life cycle hooks akka.actor.Actor#preStart and akka.actor.Actor#postRestart.
- stateName
initial state designator
- stateData
initial state data
- timeout
state timeout for the initial state, overriding the default timeout for that state
- Definition Classes
- FSM
-
final
def
stateData: Vector[Message]
Return current state data (i.e.
Return current state data (i.e. object of type D)
- Definition Classes
- FSM
-
final
def
stateName: ReliableProxy.State
Return current state name (i.e.
Return current state name (i.e. object of type S)
- Definition Classes
- FSM
-
final
def
stay(): State
Produce "empty" transition descriptor.
Produce "empty" transition descriptor. Return this from a state function when no state change is to be effected.
No transition event will be triggered by #stay. If you want to trigger an event like
S -> S
foronTransition
to handle usegoto
instead.- returns
descriptor for staying in current state
- Definition Classes
- FSM
-
final
def
stop(reason: Reason, stateData: Vector[Message]): State
Produce change descriptor to stop this FSM actor including specified reason.
Produce change descriptor to stop this FSM actor including specified reason.
- Definition Classes
- FSM
-
final
def
stop(reason: Reason): State
Produce change descriptor to stop this FSM actor including specified reason.
Produce change descriptor to stop this FSM actor including specified reason.
- Definition Classes
- FSM
-
final
def
stop(): State
Produce change descriptor to stop this FSM actor with reason "Normal".
Produce change descriptor to stop this FSM actor with reason "Normal".
- Definition Classes
- FSM
-
def
supervisorStrategy: OneForOneStrategy
User overridable definition the strategy to use for supervising child actors.
User overridable definition the strategy to use for supervising child actors.
- Definition Classes
- ReliableProxy → Actor
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
- def terminated(): State
-
def
toParArray: ParArray[T]
- Implicit
- This member is added by an implicit conversion from ReliableProxy to CollectionsHaveToParArray[ReliableProxy, T] performed by method CollectionsHaveToParArray in scala.collection.parallel. This conversion will take place only if an implicit value of type (ReliableProxy) ⇒ GenTraversableOnce[T] is in scope.
- Definition Classes
- CollectionsHaveToParArray
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
implicit final
def
total2pf(transitionHandler: (ReliableProxy.State, ReliableProxy.State) ⇒ Unit): TransitionHandler
Convenience wrapper for using a total function instead of a partial function literal.
Convenience wrapper for using a total function instead of a partial function literal. To be used with onTransition.
- Definition Classes
- FSM
-
final
def
transform(func: StateFunction): TransformHelper
- Definition Classes
- FSM
- var tunnel: ActorRef
-
def
unhandled(message: Any): Unit
User overridable callback.
User overridable callback.
Is called when a message isn't handled by the current behavior of the actor by default it fails with either a akka.actor.DeathPactException (in case of an unhandled akka.actor.Terminated message) or publishes an akka.actor.UnhandledMessage to the actor's system's akka.event.EventStream
- Definition Classes
- Actor
- def updateSerial(q: Vector[Message]): Vector[Message]
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @throws( ... )
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
when(stateName: ReliableProxy.State, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit
Insert a new StateFunction at the end of the processing chain for the given state.
Insert a new StateFunction at the end of the processing chain for the given state. If the stateTimeout parameter is set, entering this state without a differing explicit timeout setting will trigger a StateTimeout event; the same is true when using #stay.
- stateName
designator for the state
- stateTimeout
default state timeout for this state
- stateFunction
partial function describing response to input
- Definition Classes
- FSM
-
final
def
whenUnhandled(stateFunction: StateFunction): Unit
Set handler which is called upon reception of unhandled messages.
Set handler which is called upon reception of unhandled messages. Calling this method again will overwrite the previous contents.
The current state may be queried using
.stateName
- Definition Classes
- FSM
-
def
→[B](y: B): (ReliableProxy, B)
- Implicit
- This member is added by an implicit conversion from ReliableProxy to ArrowAssoc[ReliableProxy] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
Shadowed Implicit Value Members
-
def
->[B](y: B): (ReliableProxy, B)
- Implicit
- This member is added by an implicit conversion from ReliableProxy to ArrowAssoc[ReliableProxy] performed by method ArrowAssoc in scala.Predef.
- Shadowing
- This implicitly inherited member is shadowed by one or more members in this class.
To access this member you can use a type ascription:(reliableProxy: ArrowAssoc[ReliableProxy]).->(y)
- Definition Classes
- ArrowAssoc
- Annotations
- @inline()