Circuit Breaker

Why are they used?

A circuit breaker is used to provide stability and prevent cascading failures in distributed systems. These should be used in conjunction with judicious timeouts at the interfaces between remote systems to prevent the failure of a single component from bringing down all components.

As an example, we have a web application interacting with a remote third party web service.
Let’s say the third party has oversold their capacity and their database melts down under load.
Assume that the database fails in such a way that it takes a very long time to hand back an error to the third party web service. This in turn makes calls fail after a long period of time. Back to our web application, the users have noticed that their form submissions take much longer seeming to hang. Well the users do what they know to do which is use the refresh button, adding more requests to their already running requests. This eventually causes the failure of the web application due to resource exhaustion. This will affect all users, even those who are not using functionality dependent on this third party web service.

Introducing circuit breakers on the web service call would cause the requests to begin to fail-fast, letting the user know that something is wrong and that they need not refresh their request. This also confines the failure behavior to only those users that are using functionality dependent on the third party, other users are no longer affected as there is no resource exhaustion. Circuit breakers can also allow savvy developers to mark portions of the site that use the functionality unavailable, or perhaps show some cached content as appropriate while the breaker is open.

The Akka library provides an implementation of a circuit breaker called CircuitBreakerCircuitBreaker which has the behavior described below.

What do they do?

  • During normal operation, a circuit breaker is in the Closed state:

    • Exceptions or calls exceeding the configured callTimeout increment a failure counter
    • Successes reset the failure count to zero
    • When the failure counter reaches a maxFailures count, the breaker is tripped into Open state
  • While in Open state:

  • In Half-Open state:

    • The first call attempted is allowed through without failing fast
    • All other calls fail-fast with an exception just as in Open state
    • If the first call succeeds, the breaker is reset back to Closed state and the resetTimeout is reset
    • If the first call fails, the breaker is tripped again into the Open state (as for exponential backoff circuit breaker, the resetTimeout is multiplied by the exponential backoff factor)
  • State transition listeners:

  • Calls result listeners:

circuit-breaker-states.png

Examples

Initialization

Here’s how a named CircuitBreakerCircuitBreaker is configured with the name data-access:

  • 5 maximum failures
  • a call timeout of 10 seconds
  • a reset timeout of 1 minute
sourceakka.circuit-breaker.data-access {
  max-failures = 5
  call-timeout = 10s
  reset-timeout = 1m
}

The circuit breaker is created on first access with the same name, subsequent lookups will return the same circuit breaker instance. Looking up the circuit breaker and using it looks like this:

Scala
sourceval circuitBreaker = CircuitBreaker("data-access")(context.system)
Java
sourceCircuitBreaker circuitBreaker =
    CircuitBreaker.lookup("data-access", context.getSystem());

Future & Synchronous based API

Once a circuit breaker actor has been initialized, interacting with that actor is done by either using the Future based API or the synchronous API. Both of these APIs are considered Call Protection because whether synchronously or asynchronously, the purpose of the circuit breaker is to protect your system from cascading failures while making a call to another service.

In the future based API, we use the withCircuitBreakercallWithCircuitBreakerCS which takes an asynchronous method (some method wrapped in a FutureCompletionState), for instance a call to retrieve data from a service, and we pipe the result back to the sender. If for some reason the service in this example isn’t responding, or there is another issue, the circuit breaker will open and stop trying to hit the service again and again until the timeout is reached.

Scala
sourceclass DataAccess(
    context: ActorContext[DataAccess.Command],
    id: String,
    service: ThirdPartyWebService,
    circuitBreaker: CircuitBreaker) {
  import DataAccess._

  private def active(): Behavior[Command] = {
    Behaviors.receiveMessagePartial {
      case Handle(value, replyTo) =>
        val futureResult: Future[Done] = circuitBreaker.withCircuitBreaker {
          service.call(id, value)
        }
        context.pipeToSelf(futureResult) {
          case Success(_)         => HandleSuceeded(replyTo)
          case Failure(exception) => HandleFailed(replyTo, exception)
        }
        Behaviors.same
      case HandleSuceeded(replyTo) =>
        replyTo ! StatusReply.Ack
        Behaviors.same
      case HandleFailed(replyTo, exception) =>
        context.log.warn("Failed to call web service", exception)
        replyTo ! StatusReply.error("Dependency service not available")
        Behaviors.same
    }

  }
}
Java
sourceclass DataAccess extends AbstractBehavior<DataAccess.Command> {

  public interface Command {}

  public static class Handle implements Command {
    final String value;
    final ActorRef<StatusReply<Done>> replyTo;

    public Handle(String value, ActorRef<StatusReply<Done>> replyTo) {
      this.value = value;
      this.replyTo = replyTo;
    }
  }

  private final class HandleFailed implements Command {
    final Throwable failure;
    final ActorRef<StatusReply<Done>> replyTo;

    public HandleFailed(Throwable failure, ActorRef<StatusReply<Done>> replyTo) {
      this.failure = failure;
      this.replyTo = replyTo;
    }
  }

  private final class HandleSuceeded implements Command {
    final ActorRef<StatusReply<Done>> replyTo;

    public HandleSuceeded(ActorRef<StatusReply<Done>> replyTo) {
      this.replyTo = replyTo;
    }
  }

  private final class CircuitBreakerStateChange implements Command {
    final String newState;

    public CircuitBreakerStateChange(String newState) {
      this.newState = newState;
    }
  }

  public static Behavior<Command> create(String id, ThirdPartyWebService service) {
    return Behaviors.setup(
        context -> {
          CircuitBreaker circuitBreaker =
              CircuitBreaker.lookup("data-access", context.getSystem());
          return new DataAccess(context, id, service, circuitBreaker);
        });
  }

  private final String id;
  private final ThirdPartyWebService service;
  private final CircuitBreaker circuitBreaker;

  public DataAccess(
      ActorContext<Command> context,
      String id,
      ThirdPartyWebService service,
      CircuitBreaker circuitBreaker) {
    super(context);
    this.id = id;
    this.service = service;
    this.circuitBreaker = circuitBreaker;
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(Handle.class, this::onHandle)
        .onMessage(HandleSuceeded.class, this::onHandleSucceeded)
        .onMessage(HandleFailed.class, this::onHandleFailed)
        .build();
  }

  private Behavior<Command> onHandle(Handle handle) {
    CompletionStage<Done> futureResult =
        circuitBreaker.callWithCircuitBreakerCS(() -> service.call(id, handle.value));
    getContext()
        .pipeToSelf(
            futureResult,
            (done, throwable) -> {
              if (throwable != null) {
                return new HandleFailed(throwable, handle.replyTo);
              } else {
                return new HandleSuceeded(handle.replyTo);
              }
            });
    return this;
  }

  private Behavior<Command> onHandleSucceeded(HandleSuceeded handleSuceeded) {
    handleSuceeded.replyTo.tell(StatusReply.ack());
    return this;
  }

  private Behavior<Command> onHandleFailed(HandleFailed handleFailed) {
    getContext().getLog().warn("Failed to call web service", handleFailed.failure);
    handleFailed.replyTo.tell(StatusReply.error("Dependency service not available"));
    return this;
  }

}

The Synchronous API would also wrap your call with the circuit breaker logic, however, it uses the withSyncCircuitBreakercallWithSyncCircuitBreaker and receives a method that is not wrapped in a FutureCompletionState.

The CircuitBreaker will execute all callbacks on the default system dispatcher.

Control failure count explicitly

By default, the circuit breaker treats Exception as failure in synchronized API, or failed FutureCompletionState as failure in future based API. On failure, the failure count will increment. If the failure count reaches the maxFailures, the circuit breaker will be opened. However, some applications may require certain exceptions to not increase the failure count. In other cases one may want to increase the failure count even if the call succeeded. Akka circuit breaker provides a way to achieve such use cases: withCircuitBreaker and withSyncCircuitBreakercallWithCircuitBreaker, callWithSyncCircuitBreaker and callWithCircuitBreakerCS.

All methods above accept an argument defineFailureFn

Type of defineFailureFn: Try[T] => BooleanBiFunction[Optional[T], Optional[Throwable], Boolean]

This is a function which takes in a Try[T] and returns a Boolean. The Try[T] correspond to the Future[T] of the protected call. The response of a protected call is modelled using Optional[T] for a successful return value and Optional[Throwable] for exceptions. This function should return true if the result of a call should increase the failure count, or false to not affect the count.

Scala
source
val evenNumberAsFailure: Try[Int] => Boolean = { case Success(n) => n % 2 == 0 case Failure(_) => true } val breaker = CircuitBreaker("dangerous-breaker") // this call will return 8888 and increase failure count at the same time breaker.withCircuitBreaker(Future(8888), evenNumberAsFailure)
Java
sourceBiFunction<Optional<Integer>, Optional<Throwable>, Boolean> evenNoAsFailure =
    (result, err) -> (result.isPresent() && result.get() % 2 == 0);

// this will return 8888 and increase failure count at the same time
return circuitBreaker.callWithSyncCircuitBreaker(() -> 8888, evenNoAsFailure);

Low level API

Instead of looking up a configured circuit breaker by name, it is also possible to construct it in the source code:

Scala
sourceimport akka.actor.typed.scaladsl.adapter._
val breaker =
  new CircuitBreaker(
    context.system.scheduler.toClassic,
    maxFailures = 5,
    callTimeout = 10.seconds,
    resetTimeout = 1.minute).onOpen(context.self ! BreakerOpen)
Java
sourcebreaker =
    CircuitBreaker.create(
            getContext().getSystem().classicSystem().getScheduler(),
            // maxFailures
            5,
            // callTimeout
            Duration.ofSeconds(10),
            // resetTimeout
            Duration.ofMinutes(1))
        .addOnOpenListener(() -> context.getSelf().tell(new BreakerOpen()));

This also allows for creating the circuit breaker with a specific execution context to run its callbacks on.

The low-level API allows you to describe the behavior of the CircuitBreakerCircuitBreaker in detail, including deciding what to return to the calling ActorActor in case of success or failure. This is especially useful when expecting the remote call to send a reply. CircuitBreakerCircuitBreaker doesn’t support Tell Protection (protecting against calls that expect a reply) natively at the moment. Thus, you need to use the low-level power-user APIs, succeedsucceed and failfail methods, as well as isClosedisClosed, isOpenisOpen, isHalfOpenisHalfOpen to implement it.

As can be seen in the examples below, a Tell Protection pattern could be implemented by using the succeedsucceed and failfail methods, which would count towards the CircuitBreakerCircuitBreaker counts. In the example, a call is made to the remote service if the breaker is closed or half open. Once a response is received, the succeedsucceed method is invoked, which tells the CircuitBreakerCircuitBreaker to keep the breaker closed. On the other hand, if an error or timeout is received we trigger a failfail, and the breaker accrues this failure towards its count for opening the breaker.

Scala
sourceobject CircuitBreakingIntermediateActor {
  sealed trait Command
  case class Call(payload: String, replyTo: ActorRef[StatusReply[Done]]) extends Command
  private case class OtherActorReply(reply: Try[Done], originalReplyTo: ActorRef[StatusReply[Done]]) extends Command
  private case object BreakerOpen extends Command

  def apply(recipient: ActorRef[OtherActor.Command]): Behavior[Command] =
    Behaviors.setup { context =>
      implicit val askTimeout: Timeout = 11.seconds
      import context.executionContext
      import akka.actor.typed.scaladsl.adapter._
      val breaker =
        new CircuitBreaker(
          context.system.scheduler.toClassic,
          maxFailures = 5,
          callTimeout = 10.seconds,
          resetTimeout = 1.minute).onOpen(context.self ! BreakerOpen)

      Behaviors.receiveMessage {
        case Call(payload, replyTo) =>
          if (breaker.isClosed || breaker.isHalfOpen) {
            context.askWithStatus(recipient, OtherActor.Call(payload, _))(OtherActorReply(_, replyTo))
          } else {
            replyTo ! StatusReply.error("Service unavailable")
          }
          Behaviors.same
        case OtherActorReply(reply, originalReplyTo) =>
          if (reply.isSuccess) breaker.succeed()
          else breaker.fail()
          originalReplyTo ! StatusReply.fromTry(reply)
          Behaviors.same
        case BreakerOpen =>
          context.log.warn("Circuit breaker open")
          Behaviors.same
      }
    }
}
Java
sourcestatic class CircuitBreakingIntermediateActor
    extends AbstractBehavior<CircuitBreakingIntermediateActor.Command> {

  public interface Command {}

  public static class Call implements Command {
    final String payload;
    final ActorRef<StatusReply<Done>> replyTo;

    public Call(String payload, ActorRef<StatusReply<Done>> replyTo) {
      this.payload = payload;
      this.replyTo = replyTo;
    }
  }

  private class OtherActorReply implements Command {
    final Optional<Throwable> failure;
    final ActorRef<StatusReply<Done>> originalReplyTo;

    public OtherActorReply(
        Optional<Throwable> failure, ActorRef<StatusReply<Done>> originalReplyTo) {
      this.failure = failure;
      this.originalReplyTo = originalReplyTo;
    }
  }

  private class BreakerOpen implements Command {}

  private final ActorRef<OtherActor.Command> target;
  private final CircuitBreaker breaker;

  public CircuitBreakingIntermediateActor(
      ActorContext<Command> context, ActorRef<OtherActor.Command> targetActor) {
    super(context);
    this.target = targetActor;
    breaker =
        CircuitBreaker.create(
                getContext().getSystem().classicSystem().getScheduler(),
                // maxFailures
                5,
                // callTimeout
                Duration.ofSeconds(10),
                // resetTimeout
                Duration.ofMinutes(1))
            .addOnOpenListener(() -> context.getSelf().tell(new BreakerOpen()));
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(Call.class, this::onCall)
        .onMessage(OtherActorReply.class, this::onOtherActorReply)
        .onMessage(BreakerOpen.class, this::breakerOpened)
        .build();
  }

  private Behavior<Command> onCall(Call call) {
    if (breaker.isClosed() || breaker.isHalfOpen()) {
      getContext()
          .askWithStatus(
              Done.class,
              target,
              Duration.ofSeconds(11),
              (replyTo) -> new OtherActor.Call(call.payload, replyTo),
              (done, failure) -> new OtherActorReply(Optional.ofNullable(failure), call.replyTo));
    } else {
      call.replyTo.tell(StatusReply.error("Service unavailable"));
    }
    return this;
  }

  private Behavior<Command> onOtherActorReply(OtherActorReply otherActorReply) {
    if (otherActorReply.failure.isPresent()) {
      breaker.fail();
      getContext().getLog().warn("Service failure", otherActorReply.failure.get());
      otherActorReply.originalReplyTo.tell(StatusReply.error("Service unavailable"));
    } else {
      breaker.succeed();
      otherActorReply.originalReplyTo.tell(StatusReply.ack());
    }
    return this;
  }

  private Behavior<Command> breakerOpened(BreakerOpen breakerOpen) {
    getContext().getLog().warn("Circuit breaker open");
    return this;
  }
}
Note

This example always makes remote calls when the state is HalfOpen. Using the power-user APIs, it is your responsibility to judge when to make remote calls in HalfOpen.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.