Circuit Breaker
Why are they used?
A circuit breaker provides stability and prevents cascading failures in distributed systems. Circuit breakers should be used in conjunction with judicious timeouts at the interfaces between remote systems to prevent the failure of a single component from bringing down all components.
As an example, consider a web application that interacts with a remote third-party web service. Suppose the third party has oversold its capacity and its database melts down under load. Assume the database fails in such a way that it takes a very long time to hand an error back to the third-party web service, so calls to the service fail only after a long delay. Back in our web application, users notice that their form submissions take much longer and seem to hang. The users do what they know: they hit the refresh button, adding more requests on top of those already running. Eventually the web application fails due to resource exhaustion. This affects all users, even those who are not using functionality that depends on the third-party web service.
Introducing circuit breakers on the web service call would cause the requests to fail fast, letting users know that something is wrong and that they need not refresh their request. This also confines the failure to those users who are using functionality dependent on the third party; other users are no longer affected, because there is no resource exhaustion. Circuit breakers also allow developers to mark the portions of the site that use the functionality as unavailable, or to show cached content as appropriate, while the breaker is open.
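The cached-content idea above can be sketched in a few lines of plain Java. This is only an illustration: CachedFallback, BreakerOpenException and fetchOrCached are hypothetical names that do not come from Akka. With the Akka breaker, the exception to catch would be the CircuitBreakerOpenException described in the next section.

```java
import java.util.function.Supplier;

// Hypothetical sketch: none of these names come from Akka.
class CachedFallback {
    // Stand-in for the exception a breaker throws while it is open.
    static class BreakerOpenException extends RuntimeException {
        BreakerOpenException() { super("breaker is open"); }
    }

    // Runs the protected call; if the breaker rejected it, serves the
    // last known good value instead of propagating the failure.
    static String fetchOrCached(Supplier<String> protectedCall, String cached) {
        try {
            return protectedCall.get();
        } catch (BreakerOpenException e) {
            return cached;
        }
    }
}
```

Only the fail-fast rejection is turned into a fallback here. Any other exception still propagates, so genuine errors from the remote call are not hidden behind stale content.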
The Akka library provides an implementation of a circuit breaker called akka.pattern.CircuitBreaker which has the behavior described below.
What do they do?
- During normal operation, a circuit breaker is in the Closed state:
  - Exceptions or calls exceeding the configured callTimeout increment a failure counter
  - Successes reset the failure count to zero
  - When the failure counter reaches a maxFailures count, the breaker is tripped into Open state
- While in Open state:
  - All calls fail-fast with a CircuitBreakerOpenException
  - After the configured resetTimeout, the circuit breaker enters a Half-Open state
- In Half-Open state:
  - The first call attempted is allowed through without failing fast
  - All other calls fail-fast with an exception just as in Open state
  - If the first call succeeds, the breaker is reset back to Closed state and the resetTimeout is reset
  - If the first call fails, the breaker is tripped again into the Open state (for an exponential backoff circuit breaker, the resetTimeout is multiplied by the exponential backoff factor)
- State transition listeners:
  - Callbacks can be provided for every state entry via onOpen, onClose, and onHalfOpen
  - These are executed in the ExecutionContext provided.
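To make the transitions above concrete, here is a minimal, single-threaded sketch of the state machine in plain Java. It is not how akka.pattern.CircuitBreaker is implemented: SketchBreaker and all of its members are hypothetical names. It has no callTimeout and no listeners, and the caller passes in the current time so the behavior is deterministic. Because it is single-threaded, the rule that "all other calls" fail fast in Half-Open never comes into play.

```java
import java.util.function.Supplier;

// Hypothetical, single-threaded sketch of the states listed above.
// Not akka.pattern.CircuitBreaker: no callTimeout, no listeners, no thread safety.
class SketchBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    static class OpenException extends RuntimeException {
        OpenException() { super("circuit breaker is open"); }
    }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private final double backoffFactor;   // 1.0 means no backoff

    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAtMillis = 0L;
    private long currentResetMillis;

    SketchBreaker(int maxFailures, long resetTimeoutMillis, double backoffFactor) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
        this.backoffFactor = backoffFactor;
        this.currentResetMillis = resetTimeoutMillis;
    }

    // Transitions are evaluated lazily, so this reflects the last call made.
    State state() { return state; }

    <T> T call(Supplier<T> body, long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= currentResetMillis) {
            state = State.HALF_OPEN;      // reset timeout elapsed: allow one trial call
        }
        if (state == State.OPEN) {
            throw new OpenException();    // fail fast, the body is never run
        }
        try {
            T result = body.get();
            failures = 0;                 // success resets the failure count
            state = State.CLOSED;         // and closes a Half-Open breaker
            currentResetMillis = resetTimeoutMillis;
            return result;
        } catch (RuntimeException e) {
            if (state == State.HALF_OPEN) {
                // Trial call failed: re-open and stretch the next wait.
                currentResetMillis = (long) (currentResetMillis * backoffFactor);
                trip(nowMillis);
            } else if (++failures >= maxFailures) {
                trip(nowMillis);
            }
            throw e;
        }
    }

    private void trip(long nowMillis) {
        state = State.OPEN;
        openedAtMillis = nowMillis;
    }
}
```

For example, new SketchBreaker(2, 1000L, 2.0) trips after two consecutive failures and rejects calls for the next 1000 ms. If the trial call after that wait also fails, the next wait doubles to 2000 ms. A successful trial call closes the breaker and restores the original reset timeout.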
Examples
Initialization
Here's how a CircuitBreaker would be configured for:
- 5 maximum failures
- a call timeout of 10 seconds
- a reset timeout of 1 minute
Scala
import scala.concurrent.duration._
import akka.pattern.CircuitBreaker
import akka.pattern.pipe
import akka.actor.{ Actor, ActorLogging, ActorRef }
import scala.concurrent.Future

class DangerousActor extends Actor with ActorLogging {
  import context.dispatcher

  val breaker =
    new CircuitBreaker(
      context.system.scheduler,
      maxFailures = 5,
      callTimeout = 10.seconds,
      resetTimeout = 1.minute).onOpen(notifyMeOnOpen())

  def notifyMeOnOpen(): Unit =
    log.warning("My CircuitBreaker is now open, and will not close for one minute")

  // The class body continues in the Call Protection example.
Java
import akka.actor.UntypedActor;
import scala.concurrent.Future;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.Duration;
import akka.pattern.CircuitBreaker;
import akka.event.Logging;
import static akka.pattern.Patterns.pipe;
import static akka.dispatch.Futures.future;
import java.util.concurrent.Callable;

public class DangerousJavaActor extends UntypedActor {

  private final CircuitBreaker breaker;
  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public DangerousJavaActor() {
    this.breaker = new CircuitBreaker(
      getContext().dispatcher(), getContext().system().scheduler(),
      5, Duration.create(10, "s"), Duration.create(1, "m"))
      .onOpen(new Runnable() {
        public void run() {
          notifyMeOnOpen();
        }
      });
  }

  public void notifyMeOnOpen() {
    log.warning("My CircuitBreaker is now open, and will not close for one minute");
  }

  // The class body continues in the Call Protection example.
Call Protection
Here's how the CircuitBreaker would be used to protect an asynchronous call as well as a synchronous one:
Scala
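  def dangerousCall: String = "This really isn't that dangerous of a call after all"

  def receive = {
    // Asynchronous call: the Future is guarded by the breaker and its result piped back
    case "is my middle name" =>
      breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
    // Synchronous call: runs in the calling thread under the breaker
    case "block for me" =>
      sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
  }
}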
Java
  public String dangerousCall() {
    return "This really isn't that dangerous of a call after all";
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof String) {
      String m = (String) message;
      if ("is my middle name".equals(m)) {
        // Asynchronous call: the Future is guarded by the breaker and its result piped back
        pipe(
          breaker.callWithCircuitBreaker(() ->
            future(() -> dangerousCall(), getContext().dispatcher())
          ), getContext().dispatcher()
        ).to(getSender());
      }
      if ("block for me".equals(m)) {
        // Synchronous call: runs in the calling thread under the breaker
        getSender().tell(breaker
          .callWithSyncCircuitBreaker(
            () -> dangerousCall()), getSelf());
      }
    }
  }
}
Note
Using the CircuitBreaker companion object's apply or create methods will return a CircuitBreaker where callbacks are executed in the caller's thread. This can be useful if the asynchronous Future behavior is unnecessary, for example when invoking a synchronous-only API.
Tell Pattern
The Call Protection pattern above works well when the return value of a remote call is wrapped in a Future. However, when a remote call sends a message or a timeout back to the calling Actor, the Call Protection pattern is awkward. CircuitBreaker does not support this natively at the moment, so you need to use the low-level power-user APIs shown below: the succeed and fail methods, as well as isClosed, isOpen, and isHalfOpen.
Note
The examples below do not make a remote call when the state is HalfOpen. When using the power-user APIs, it is your responsibility to judge when to make remote calls in HalfOpen.
Scala
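import akka.actor.ReceiveTimeout

// `recipient` is assumed to be the remote actor's ActorRef, defined elsewhere.
def receive = {
  // Only send while the breaker is Closed
  case "call" if breaker.isClosed => {
    recipient ! "message"
  }
  // A reply counts as a success
  case "response" => {
    breaker.succeed()
  }
  // An error or a receive timeout counts as a failure
  case err: Throwable => {
    breaker.fail()
  }
  case ReceiveTimeout => {
    breaker.fail()
  }
}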
Java
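import akka.actor.ReceiveTimeout;

// `target` is assumed to be the remote actor's ActorRef, defined elsewhere.
@Override
public void onReceive(Object payload) {
  if ( "call".equals(payload) && breaker.isClosed() ) {
    // Only send while the breaker is Closed
    target.tell("message", getSelf());
  } else if ( "response".equals(payload) ) {
    // A reply counts as a success
    breaker.succeed();
  } else if ( payload instanceof Throwable ) {
    // An error or a receive timeout counts as a failure
    breaker.fail();
  } else if ( payload instanceof ReceiveTimeout ) {
    breaker.fail();
  }
}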