class TimerBasedThrottler extends Actor with FSM[State, Data]
A throttler that uses a timer to control the message delivery rate.
Throttling
A throttler is an actor that is defined through a target actor and a rate (of type akka.contrib.throttle.Throttler.Rate). You can set or change the target and rate at any time through the akka.contrib.throttle.Throttler.SetTarget and akka.contrib.throttle.Throttler.SetRate messages, respectively. When you send the throttler any other message msg, it will put msg into an internal queue and eventually send all queued messages to the target, at a speed that respects the given rate. If no target is currently defined, the messages will be queued and delivered as soon as a target is set.
A throttler understands actor messages of type akka.contrib.throttle.Throttler.SetTarget and akka.contrib.throttle.Throttler.SetRate, in addition to any other messages, which the throttler will consider as messages to be sent to the target.
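As a minimal sketch of reconfiguring a running throttler, assuming the akka-contrib throttle module is on the classpath, the two control messages might be used as follows. The object name `ThrottlerControl`, the helper names `retarget` and `detach`, and the chosen rate are illustrative and not part of the API.

```scala
import akka.actor.ActorRef
import akka.contrib.throttle.Throttler._
import scala.concurrent.duration._

object ThrottlerControl {
  // Hypothetical helper: slow an existing throttler down to
  // 1 message every 2 seconds and point it at a new target.
  def retarget(throttler: ActorRef, newTarget: ActorRef): Unit = {
    throttler ! SetRate(1 msgsPer 2.seconds)
    throttler ! SetTarget(Some(newTarget))
  }

  // Hypothetical helper: remove the target. Messages sent to the
  // throttler afterwards are queued until a target is set again.
  def detach(throttler: ActorRef): Unit =
    throttler ! SetTarget(None)
}
```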
Transparency
Notice that the throttler forwards messages, i.e., the target will see the original message sender (and not the throttler) as the sender of the message.
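The following sketch illustrates what this means for replies, assuming the throttler's target has been set to an instance of the hypothetical `Echo` actor. Both `Echo` and `Client` are made-up names for illustration.

```scala
import akka.actor.{ Actor, ActorRef }

// Hypothetical target: replies to whoever it sees as the sender.
class Echo extends Actor {
  def receive = {
    case msg => sender() ! s"echo: $msg"
  }
}

// Hypothetical client: sends through the throttler. Because the
// throttler forwards, Echo sees this Client as the sender, so the
// reply arrives here and not at the throttler.
class Client(throttler: ActorRef) extends Actor {
  override def preStart(): Unit = throttler ! "ping"

  def receive = {
    case reply => println(s"Client got '$reply' from ${sender()}")
  }
}
```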
Persistence
Throttlers usually use an internal queue to keep the messages that need to be sent to the target. You therefore cannot rely on the throttler's inbox size in order to learn how many messages are outstanding.
It is left to the implementation whether the internal queue is persisted over application restarts or actor failure.
Processing messages
The target should process messages as fast as possible. If the target requires substantial time to process messages, it should distribute its work to other actors (using, for example, something like a BalancingDispatcher); otherwise the resulting system will always work below the threshold rate.
Example: Suppose the throttler has a rate of 3 msg/s and the target requires 1 s to process a message. This system will only process messages at a rate of 1 msg/s: the target will receive messages at up to 3 msg/s, but as it handles them synchronously and each of them takes 1 s, its inbox will keep growing. In such a situation, the target should distribute its messages to a set of worker actors so that individual messages can be handled in parallel.
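The arithmetic behind this example can be sketched with a simple steady-state model. The model and the names `ThroughputEstimate` and `throughput` are assumptions made for illustration: the throttler caps arrivals at its rate, and a pool of workers can complete at most `workers / secondsPerMessage` messages per second.

```scala
object ThroughputEstimate {
  // Steady-state throughput in msg/s under the simplified model:
  // the smaller of the throttle rate and the processing capacity.
  def throughput(throttleRate: Double, workers: Int, secondsPerMessage: Double): Double =
    math.min(throttleRate, workers / secondsPerMessage)

  def main(args: Array[String]): Unit = {
    println(throughput(3.0, 1, 1.0)) // single synchronous target: 1.0
    println(throughput(3.0, 3, 1.0)) // three parallel workers: 3.0
  }
}
```

With a single target the system is limited by processing time; with three workers the throttle rate becomes the limit again.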
Example
For example, if you set a rate like "3 messages in 1 second", the throttler will send the first three messages immediately to the target actor but will need to impose a delay before sending out further messages:
    import akka.actor.{ Actor, Props }
    import akka.contrib.throttle.TimerBasedThrottler
    import akka.contrib.throttle.Throttler._
    import scala.concurrent.duration._

    // `system` is assumed to be an existing ActorSystem

    // A simple actor that prints whatever it receives
    class Printer extends Actor {
      def receive = {
        case x => println(x)
      }
    }

    val printer = system.actorOf(Props[Printer], "printer")

    // The throttler for this example, setting the rate
    val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second))

    // Set the target
    throttler ! SetTarget(Some(printer))
    // These three messages will be sent to the printer immediately
    throttler ! "1"
    throttler ! "2"
    throttler ! "3"
    // These two will wait at least until 1 second has passed
    throttler ! "4"
    throttler ! "5"
Implementation notes
This throttler implementation internally installs a timer that repeats every rate.durationInMillis and enables rate.numberOfCalls additional calls to take place. A TimerBasedThrottler uses very few system resources, provided the rate's duration is not too fine-grained (which would cause a lot of timer invocations); for example, it does not store the calling history as other throttlers may need to do.
However, a TimerBasedThrottler only provides weak guarantees on the rate (see also this blog post):
- Only delivery times are taken into account: if, for example, the throttler is used to throttle requests to an external web service then only the start times of the web requests are considered. If a web request takes very long on the server then more than rate.numberOfCalls-many requests may be observed on the server in an interval of duration rate.durationInMillis().
- There may be intervals of duration rate.durationInMillis() that contain more than rate.numberOfCalls message deliveries: a TimerBasedThrottler only makes guarantees for the intervals of its own timer, namely that no more than rate.numberOfCalls-many messages are delivered within such intervals. Other intervals on the timeline may contain more calls.
For some applications, these guarantees may not be sufficient.
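The second point can be made concrete with a small simulation. This is a simplified model written for illustration, not the actual implementation: it assumes the timer ticks at 0, `period`, 2 * `period`, and so on, and that each timer interval allows `calls` deliveries. The names `TimerThrottleModel`, `deliveryTimes`, and `maxInWindow` are hypothetical.

```scala
object TimerThrottleModel {
  // Delivery time (ms) for each arrival time (ms) under the simplified
  // model: a message is delivered on arrival if the current timer
  // interval still has capacity, otherwise at the next tick that has.
  def deliveryTimes(arrivals: Seq[Long], calls: Int, period: Long): Seq[Long] = {
    var used = Map.empty[Long, Int] // tick index -> deliveries in that interval
    var earliest = 0L               // preserves FIFO delivery order
    arrivals.sorted.map { t =>
      val ready = math.max(t, earliest)
      var tick = ready / period
      while (used.getOrElse(tick, 0) >= calls) tick += 1
      used = used.updated(tick, used.getOrElse(tick, 0) + 1)
      val delivered = math.max(ready, tick * period)
      earliest = delivered
      delivered
    }
  }

  // Largest number of deliveries in any window [s, s + period)
  // that starts at one of the delivery times.
  def maxInWindow(times: Seq[Long], period: Long): Int =
    times.map(s => times.count(x => x >= s && x < s + period)).max

  def main(args: Array[String]): Unit = {
    // Rate: 3 messages per 1000 ms. One early message, then a burst.
    val times = deliveryTimes(Seq(0L, 900L, 910L, 920L, 930L, 940L), calls = 3, period = 1000L)
    println(times)
    println(maxInWindow(times, 1000L))
  }
}
```

In this scenario every timer interval contains at most three deliveries, yet the window starting at 900 ms contains five, because the tail of one interval and the start of the next both fall inside it.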
Known issues
- If you change the rate using SetRate(rate), the actual rate may in fact be higher for the overlapping period (i.e., durationInMillis()) of the new and old rate. Therefore, changing the rate frequently is not recommended with the current implementation.
- The queue of messages to be delivered is not persisted in any way; actor or system failure will cause the queued messages to be lost.
- Source
- TimerBasedThrottler.scala
Type Members
- final class TransformHelper extends AnyRef
  Definition Classes: FSM
- type Event = actor.FSM.Event[Data]
  Definition Classes: FSM
- type Receive = PartialFunction[Any, Unit]
  Definition Classes: Actor
- type State = actor.FSM.State[TimerBasedThrottler.State, Data]
  Definition Classes: FSM
- type StateFunction = PartialFunction[Event, State]
  Definition Classes: FSM
- type StopEvent = actor.FSM.StopEvent[TimerBasedThrottler.State, Data]
  Definition Classes: FSM
- type Timeout = Option[FiniteDuration]
  Definition Classes: FSM
- type TransitionHandler = PartialFunction[(TimerBasedThrottler.State, TimerBasedThrottler.State), Unit]
  Definition Classes: FSM
Value Members
- val ->: actor.FSM.->.type
  This extractor is just convenience for matching a (S, S) pair, including a reminder what the new state is.
  Definition Classes: FSM
- val Event: actor.FSM.Event.type
  Definition Classes: FSM
- val StateTimeout: actor.FSM.StateTimeout.type
  This case object is received in case of a state timeout.
  Definition Classes: FSM
- val StopEvent: actor.FSM.StopEvent.type
  Definition Classes: FSM
- final def cancelTimer(name: String): Unit
  Cancel named timer, ensuring that the message is not subsequently delivered (no race).
  name: of the timer to cancel
  Definition Classes: FSM
- implicit val context: ActorContext
  Stores the context for this actor, including self, and sender. It is implicit to support operations such as forward.
  WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!
  akka.actor.ActorContext is the Scala API. getContext returns an akka.actor.UntypedActorContext, which is the Java API of the actor context.
  Definition Classes: Actor
- final def goto(nextStateName: TimerBasedThrottler.State): State
  Produce transition to other state. Return this from a state function in order to effect the transition.
  This method always triggers transition events, even for A -> A transitions. If you want to stay in the same state without triggering a state transition event, use #stay instead.
  nextStateName: state designator for the next state
  returns: state transition descriptor
  Definition Classes: FSM
- final def initialize(): Unit
  Verify existence of initial state and set up timers. This should be the last call within the constructor, or akka.actor.Actor#preStart and akka.actor.Actor#postRestart.
  An initial currentState -> currentState notification will be triggered by calling this method.
  Definition Classes: FSM
- final def isTimerActive(name: String): Boolean
  Inquire whether the named timer is still active. Returns true unless the timer does not exist, has previously been canceled or if it was a single-shot timer whose message was already received.
  Definition Classes: FSM
- def log: LoggingAdapter
  Definition Classes: ActorLogging
- final def nextStateData: Data
  Return next state data (available in onTransition handlers).
  Definition Classes: FSM
- final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit
  Set handler which is called upon termination of this FSM actor. Calling this method again will overwrite the previous contents.
  Definition Classes: FSM
- final def onTransition(transitionHandler: TransitionHandler): Unit
  Set handler which is called upon each state transition, i.e. not when staying in the same state. This may use the pair extractor defined in the FSM companion object like so:
      onTransition { case Old -> New => doSomething }
  It is also possible to supply a 2-ary function object:
      onTransition(handler _)
      private def handler(from: S, to: S) { ... }
  The underscore is unfortunately necessary to enable the nicer syntax shown above (it uses the implicit conversion total2pf under the hood).
  Multiple handlers may be installed, and every one of them will be called, not only the first one matching.
  Definition Classes: FSM
- def postRestart(reason: Throwable): Unit
  User overridable callback: By default it calls preStart(). Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
  reason: the Throwable that caused the restart to happen
  Definition Classes: Actor
  Annotations: @throws( classOf[Exception] )
- def postStop(): Unit
  Call onTermination hook; if you want to retain this behavior when overriding make sure to call super.postStop().
- def preRestart(reason: Throwable, message: Option[Any]): Unit
  User overridable callback: By default it disposes of all children and then calls postStop(). Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
  reason: the Throwable that caused the restart to happen
  message: optionally the current message the actor processed when failing, if applicable
  Definition Classes: Actor
  Annotations: @throws( classOf[Exception] )
- def preStart(): Unit
  User overridable callback. Is called when an Actor is started. Actors are automatically started asynchronously when created. Empty default implementation.
  Definition Classes: Actor
  Annotations: @throws( classOf[Exception] )
- var rate: Rate
- def receive: Receive
  This defines the initial actor behavior, it must return a partial function with the actor logic.
- implicit final val self: ActorRef
  The 'self' field holds the ActorRef for this actor.
  Can be used to send messages to itself:
      self ! message
  Definition Classes: Actor
- final def sender(): ActorRef
  The reference sender Actor of the last received message. Is defined if the message was sent from another Actor, else deadLetters in akka.actor.ActorSystem.
  WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!
  Definition Classes: Actor
- final def setStateTimeout(state: TimerBasedThrottler.State, timeout: Timeout): Unit
  Set state timeout explicitly. This method can safely be used from within a state handler.
  Definition Classes: FSM
- final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false): Unit
  Schedule named timer to deliver message after given delay, possibly repeating. Any existing timer with the same name will automatically be canceled before adding the new timer.
  name: identifier to be used with cancelTimer()
  msg: message to be delivered
  timeout: delay of first message delivery and between subsequent messages
  repeat: send once if false, scheduleAtFixedRate if true
  Definition Classes: FSM
- final def startWith(stateName: TimerBasedThrottler.State, stateData: Data, timeout: Timeout = None): Unit
  Set initial state. Call this method from the constructor before the #initialize method. If different state is needed after a restart this method, followed by #initialize, can be used in the actor life cycle hooks akka.actor.Actor#preStart and akka.actor.Actor#postRestart.
  stateName: initial state designator
  stateData: initial state data
  timeout: state timeout for the initial state, overriding the default timeout for that state
  Definition Classes: FSM
- final def stateData: Data
  Return current state data (i.e. object of type D)
  Definition Classes: FSM
- final def stateName: TimerBasedThrottler.State
  Return current state name (i.e. object of type S)
  Definition Classes: FSM
- final def stay(): State
  Produce "empty" transition descriptor. Return this from a state function when no state change is to be effected.
  No transition event will be triggered by #stay. If you want to trigger an event like S -> S for onTransition to handle, use goto instead.
  returns: descriptor for staying in current state
  Definition Classes: FSM
- final def stop(reason: Reason, stateData: Data): State
  Produce change descriptor to stop this FSM actor including specified reason.
  Definition Classes: FSM
- final def stop(reason: Reason): State
  Produce change descriptor to stop this FSM actor including specified reason.
  Definition Classes: FSM
- final def stop(): State
  Produce change descriptor to stop this FSM actor with reason "Normal".
  Definition Classes: FSM
- def supervisorStrategy: SupervisorStrategy
  User overridable definition of the strategy to use for supervising child actors.
  Definition Classes: Actor
- implicit final def total2pf(transitionHandler: (TimerBasedThrottler.State, TimerBasedThrottler.State) ⇒ Unit): TransitionHandler
  Convenience wrapper for using a total function instead of a partial function literal. To be used with onTransition.
  Definition Classes: FSM
- final def transform(func: StateFunction): TransformHelper
  Definition Classes: FSM
- def unhandled(message: Any): Unit
  User overridable callback. Is called when a message isn't handled by the current behavior of the actor. By default it either fails with an akka.actor.DeathPactException (in case of an unhandled akka.actor.Terminated message) or publishes an akka.actor.UnhandledMessage to the actor's system's akka.event.EventStream.
  Definition Classes: Actor
- final def when(stateName: TimerBasedThrottler.State, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit
  Insert a new StateFunction at the end of the processing chain for the given state. If the stateTimeout parameter is set, entering this state without a differing explicit timeout setting will trigger a StateTimeout event; the same is true when using #stay.
  stateName: designator for the state
  stateTimeout: default state timeout for this state
  stateFunction: partial function describing response to input
  Definition Classes: FSM
- final def whenUnhandled(stateFunction: StateFunction): Unit
  Set handler which is called upon reception of unhandled messages. Calling this method again will overwrite the previous contents.
  The current state may be queried using stateName.
  Definition Classes: FSM
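To show how several of the inherited FSM members listed above fit together, here is a minimal sketch of an unrelated FSM actor. The states `Off` and `On`, the class `Light`, the timer name, and the message `"toggle"` are all hypothetical and chosen only for illustration; this is not how TimerBasedThrottler itself is implemented.

```scala
import akka.actor.{ Actor, FSM }
import scala.concurrent.duration._

// Hypothetical states; the state data is a simple counter.
sealed trait LightState
case object Off extends LightState
case object On extends LightState

class Light extends Actor with FSM[LightState, Int] {
  startWith(Off, 0)

  when(Off) {
    case Event("toggle", count) =>
      // Single-shot timer: deliver "toggle" to this actor after 5 seconds.
      setTimer("auto-off", "toggle", 5.seconds, repeat = false)
      goto(On) using (count + 1)
  }

  when(On) {
    case Event("toggle", _) =>
      cancelTimer("auto-off")
      goto(Off)
  }

  whenUnhandled {
    case Event(msg, _) =>
      log.info("ignoring {} in state {}", msg, stateName)
      stay()
  }

  onTransition {
    case Off -> On => log.info("switched on {} time(s)", nextStateData)
  }

  initialize() // should be the last call within the constructor
}
```

The sketch follows the ordering described above: `startWith` comes first, the `when` handlers register state functions, and `initialize()` is called last.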