java.lang.Object akka.contrib.throttle.TimerBasedThrottler
public class TimerBasedThrottler
A throttler that uses a timer to control the message delivery rate.
== Throttling ==
A throttler is an actor that is defined through a target actor and a rate (of type Throttler.Rate). You can set or change the target and rate at any time through the Throttler.SetTarget and Throttler.SetRate messages, respectively. When you send the throttler any other message msg, it puts msg into an internal queue and eventually sends all queued messages to the target, at a speed that respects the given rate. If no target is currently defined, the messages are queued and delivered as soon as a target is set.
A throttler understands actor messages of type Throttler.SetTarget and Throttler.SetRate, in addition to any other messages, which the throttler treats as messages to be sent to the target.
== Transparency ==
Notice that the throttler forwards messages, i.e., the target will see the original message sender (and not the throttler) as the sender of the message.
== Persistence ==
Throttlers usually use an internal queue to keep the messages that need to be sent to the target. You therefore cannot rely on the throttler's inbox size to learn how many messages are outstanding.
It is left to the implementation whether the internal queue is persisted over application restarts or actor failure.
== Processing messages ==
The target should process messages as fast as possible. If the target requires substantial time to process messages, it should distribute its work to other actors (using, for example, something like a BalancingDispatcher); otherwise the resulting system will always work below the threshold rate.
Example: Suppose the throttler has a rate of 3 msg/s and the target requires 1 s to process a message. This system will only process messages at a rate of 1 msg/s: the target will receive messages at a rate of at most 3 msg/s, but as it handles them synchronously and each of them takes 1 s, its inbox will keep growing. In such a situation, the target should distribute its messages to a set of worker actors so that individual messages can be handled in parallel.
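The arithmetic behind this example can be sketched in plain Scala. This is only a back-of-the-envelope model, not part of the throttler API: the object `BacklogExample` and its function names are hypothetical, and it assumes a steady supply of messages and whole-number rates.

```scala
object BacklogExample {
  /** Messages waiting in the target's inbox after `seconds` seconds,
    * assuming a steady inflow of `inPerSec` and synchronous handling
    * of `outPerSec` messages per second. Never negative. */
  def backlog(inPerSec: Int, outPerSec: Int, seconds: Int): Int =
    math.max(0, (inPerSec - outPerSec) * seconds)

  /** End-to-end throughput is bounded by the slower of the two stages. */
  def throughput(inPerSec: Int, outPerSec: Int): Int =
    math.min(inPerSec, outPerSec)

  /** Throughput when the work is spread over `workers` parallel workers,
    * each handling `perWorkerPerSec` messages per second. */
  def throughputWithWorkers(inPerSec: Int, perWorkerPerSec: Int, workers: Int): Int =
    math.min(inPerSec, perWorkerPerSec * workers)
}
```

Under these assumptions, a 3 msg/s throttler in front of a 1 msg/s target leaves 20 messages waiting after 10 seconds, while three parallel workers would be enough to keep up with the throttled rate.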
==Example==
For example, if you set a rate like "3 messages in 1 second", the throttler will send the first three messages immediately to the target actor but will need to impose a delay before sending out further messages:
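    import scala.concurrent.duration._
    import akka.actor.{ Actor, ActorSystem, Props }
    import akka.contrib.throttle.Throttler._
    import akka.contrib.throttle.TimerBasedThrottler
    // The actor system hosting the actors below (assumed here; the name is arbitrary)
    val system = ActorSystem("throttler-example")
    // A simple actor that prints whatever it receives
    class Printer extends Actor {
      def receive = {
        case x => println(x)
      }
    }
    val printer = system.actorOf(Props[Printer], "printer")
    // The throttler for this example, setting the rate
    val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second))
    // Set the target
    throttler ! SetTarget(Some(printer))
    // These three messages will be sent to the printer immediately
    throttler ! "1"
    throttler ! "2"
    throttler ! "3"
    // These two will wait at least until 1 second has passed
    throttler ! "4"
    throttler ! "5"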
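The timing described in this example can be reproduced with a small model in plain Scala that needs no actor system. It is a simplified sketch, not the actual implementation: `ThrottleSchedule` and `deliveryTimes` are hypothetical names, and the model assumes timer intervals aligned at time 0, arrivals sorted in ascending order, and a full allowance of calls at the start of every interval.

```scala
object ThrottleSchedule {
  /** Delivery time in ms for each arrival, allowing at most `calls`
    * deliveries per timer interval [k * periodMs, (k + 1) * periodMs).
    * Messages are delivered in arrival (FIFO) order. */
  def deliveryTimes(arrivalsMs: Seq[Long], calls: Int, periodMs: Long): Seq[Long] = {
    val out = Vector.newBuilder[Long]
    var period = 0L        // index of the timer interval currently in use
    var usedInPeriod = 0   // deliveries already made in that interval
    var lastDelivery = 0L  // time of the previous delivery
    for (arrival <- arrivalsMs) {
      // A message cannot leave before it arrives or before its predecessor.
      val earliest = math.max(arrival, lastDelivery)
      val p = earliest / periodMs
      // A later interval starts with a fresh allowance.
      if (p > period) { period = p; usedInPeriod = 0 }
      // Allowance exhausted: wait for the next timer tick.
      if (usedInPeriod >= calls) { period += 1; usedInPeriod = 0 }
      val t = math.max(earliest, period * periodMs)
      usedInPeriod += 1
      lastDelivery = t
      out += t
    }
    out.result()
  }
}
```

For five messages that all arrive at time 0 with a rate of 3 messages per 1000 ms, the model delivers the first three at 0 ms and the remaining two at 1000 ms, matching the comments in the example.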
==Implementation notes==
This throttler implementation internally installs a timer that repeats every rate.durationInMillis and enables rate.numberOfCalls additional calls to take place. A TimerBasedThrottler uses very few system resources, provided the rate's duration is not too fine-grained (which would cause a lot of timer invocations); for example, it does not store the calling history as other throttlers may need to do.
However, a TimerBasedThrottler only provides ''weak guarantees'' on the rate (see also this blog post):
- Only ''delivery'' times are taken into account: if, for example, the throttler is used to throttle requests to an external web service, then only the start times of the web requests are considered. If a web request takes very long on the server, then more than rate.numberOfCalls requests may be observed on the server in an interval of duration rate.durationInMillis().
- There may be intervals of duration rate.durationInMillis() that contain more than rate.numberOfCalls message deliveries: a TimerBasedThrottler only makes guarantees for the intervals of its ''own'' timer, namely that no more than rate.numberOfCalls messages are delivered within such intervals. Other intervals on the timeline may contain more calls.
For some applications, these guarantees may not be sufficient.
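The second point can be illustrated with a short plain-Scala sketch. The delivery times below are a hypothetical scenario chosen for illustration (three deliveries late in one timer interval, three early in the next), and `WeakGuarantee` with its helpers is not part of the library; timer intervals are assumed to be aligned at time 0.

```scala
object WeakGuarantee {
  /** Number of deliveries in each timer interval
    * [k * periodMs, (k + 1) * periodMs), keyed by k. */
  def perTimerInterval(deliveriesMs: Seq[Long], periodMs: Long): Map[Long, Int] =
    deliveriesMs.groupBy(_ / periodMs).map { case (k, ts) => k -> ts.size }

  /** Largest number of deliveries in any half-open window
    * [t, t + windowMs) that starts at one of the delivery times. */
  def maxInAnyWindow(deliveriesMs: Seq[Long], windowMs: Long): Int = {
    val sorted = deliveriesMs.sorted
    if (sorted.isEmpty) 0
    else sorted.map(start => sorted.count(t => t >= start && t < start + windowMs)).max
  }

  // Hypothetical delivery times (ms) for a rate of 3 messages per 1000 ms.
  val deliveries: Seq[Long] = Seq(900L, 950L, 990L, 1000L, 1010L, 1020L)
}
```

In this scenario each timer interval contains exactly three deliveries, so the throttler's own guarantee holds, yet the window starting at 900 ms contains all six. A receiver that measures the rate over its own intervals could therefore observe up to twice rate.numberOfCalls deliveries.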
==Known issues==
- If you change the rate using SetRate(rate), the actual rate may in fact be higher for the overlapping period (i.e., durationInMillis()) of the new and old rate. Therefore, changing the rate frequently is not recommended with the current implementation.
- The queue of messages to be delivered is not persisted in any way; actor or system failure will cause the queued messages to be lost.
See also: Throttler
Nested Class Summary | |
---|---|
static class | TimerBasedThrottler.Active$ |
static class | TimerBasedThrottler.Data |
static class | TimerBasedThrottler.Data$ |
static class | TimerBasedThrottler.Idle$ |
static class | TimerBasedThrottler.Message |
static class | TimerBasedThrottler.Message$ |
static interface | TimerBasedThrottler.State |
static class | TimerBasedThrottler.Tick$ |
Nested classes/interfaces inherited from interface akka.actor.FSM |
---|
FSM.$minus$greater$, FSM.CurrentState<S>, FSM.CurrentState$, FSM.Event<D>, FSM.Event$, FSM.Failure, FSM.Failure$, FSM.LogEntry<S,D>, FSM.LogEntry$, FSM.Normal$, FSM.NullFunction$, FSM.Reason, FSM.Shutdown$, FSM.State$, FSM.StateTimeout$, FSM.StopEvent<S,D>, FSM.StopEvent$, FSM.SubscribeTransitionCallBack, FSM.SubscribeTransitionCallBack$, FSM.TimeoutMarker, FSM.TimeoutMarker$, FSM.Timer, FSM.Timer$, FSM.TransformHelper, FSM.Transition<S>, FSM.Transition$, FSM.UnsubscribeTransitionCallBack, FSM.UnsubscribeTransitionCallBack$ |
Nested classes/interfaces inherited from interface akka.actor.Actor |
---|
Actor.emptyBehavior$ |
Constructor Summary | |
---|---|
TimerBasedThrottler(Throttler.Rate rate) | |
Method Summary | |
---|---|
Throttler.Rate | rate() |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Methods inherited from interface akka.actor.FSM |
---|
applyState, cancelTimer, currentState, debugEvent, Event, generation, handleEvent, handleEventDefault, handleTransition, initialize, isStateTimerActive, isTimerActive, logTermination, makeTransition, nextState, nextStateData, onTermination, onTransition, postStop, processEvent, processMsg, receive, register, setStateTimeout, setTimer, startWith, stateData, stateFunctions, stateName, StateTimeout, stateTimeouts, stay, stop, stop, stop, StopEvent, terminate, terminateEvent, timeoutFuture, timerGen, timers, total2pf, transform, transitionEvent, when, whenUnhandled |
Methods inherited from interface akka.actor.Actor |
---|
aroundPostRestart, aroundPostStop, aroundPreRestart, aroundPreStart, aroundReceive, context, postRestart, preRestart, preStart, self, sender, supervisorStrategy, unhandled |
Methods inherited from interface akka.routing.Listeners |
---|
gossip, listenerManagement, listeners |
Methods inherited from interface akka.actor.ActorLogging |
---|
_log, log |
Constructor Detail |
---|
public TimerBasedThrottler(Throttler.Rate rate)
Method Detail |
---|
public Throttler.Rate rate()