Circuit-Breaker Actor - Version 2.4.20

This is an alternative implementation of the Akka Circuit Breaker Pattern. The main difference is that it is intended only for request-reply interactions with an actor: the Circuit-Breaker acts as a proxy for the target actor, providing the same fail-fast behaviour and a protocol similar to the circuit-breaker implementation in Akka.
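To make the fail-fast behaviour concrete, here is a minimal sketch of the usual closed / open / half-open state machine in plain Scala. It is not the proxy's actual implementation: `BreakerModel` and its methods are hypothetical names, and only the parameter names `maxFailures` and `resetTimeout` are borrowed from the builder used later on this page. Time is passed in explicitly (in milliseconds) so the model stays deterministic.

```scala
import scala.concurrent.duration._

// Hypothetical, simplified model of circuit-breaker states; not the Akka code.
sealed trait State
case object Closed extends State
case object Open extends State
case object HalfOpen extends State

final class BreakerModel(maxFailures: Int, resetTimeout: FiniteDuration) {
  private var state: State = Closed
  private var failures = 0
  private var openedAt = 0L // time (ms) at which the circuit last opened

  def currentState: State = state

  /** True if a request arriving at time `now` (ms) may be forwarded to the target. */
  def allowRequest(now: Long): Boolean = state match {
    case Closed   => true
    case HalfOpen => false // a probe is already in flight, so fail fast
    case Open =>
      if (now - openedAt >= resetTimeout.toMillis) {
        state = HalfOpen // let exactly one probe request through
        true
      } else false
  }

  /** A successful reply closes the circuit and resets the failure count. */
  def recordSuccess(): Unit = {
    failures = 0
    state = Closed
  }

  /** A failed (or timed-out) reply; opens the circuit once the limit is reached. */
  def recordFailure(now: Long): Unit = {
    failures += 1
    if (state == HalfOpen || failures >= maxFailures) {
      state = Open
      openedAt = now
    }
  }
}
```

In this model, the caller decides what counts as a failure and reports it through `recordFailure`; a call timeout would simply be reported as one more failure.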

### Usage

Let's assume we have an actor that wraps a back-end service and responds to Request calls with a Response object containing an Either[String, String], which distinguishes successful from failed responses. The service may also slow down as its workload increases.

A simple implementation is given by this class:

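import akka.actor.{ Actor, ActorLogging }
import scala.concurrent.duration._
import scala.util.Random

object SimpleService {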
  case class Request(content: String)
  case class Response(content: Either[String, String])
  case object ResetCount
}

/**
 * This is a simple actor simulating a service
 * - Becoming slower with the increase of frequency of input requests
 * - Failing around 30% of the requests
 */
class SimpleService extends Actor with ActorLogging {
  import SimpleService._

  var messageCount = 0

  import context.dispatcher

  context.system.scheduler.schedule(1.second, 1.second, self, ResetCount)

  override def receive: Receive = {
    case ResetCount =>
      messageCount = 0

    case Request(content) =>
      messageCount += 1
      // simulate workload: each request since the last reset sleeps longer
      Thread.sleep(100 * messageCount)
      // fails around 30% of the time
      if (Random.nextInt(100) < 70) {
        sender() ! Response(Right(s"Successfully processed $content"))
      } else {
        sender() ! Response(Left(s"Failure processing $content"))
      }
  }
}

If we want to interface with this service using the Circuit Breaker we can use two approaches:

Using a non-conversational approach:

class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
  import SimpleService._

  val serviceCircuitBreaker =
    context.actorOf(
      CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds)
        .copy(
          failureDetector = {
            _ match {
              case Response(Left(_)) => true
              case _                 => false
            }
          })
        .props(potentiallyFailingService),
      "serviceCircuitBreaker")

  override def receive: Receive = {
    case AskFor(requestToForward) =>
      serviceCircuitBreaker ! Request(requestToForward)

    case Right(Response(content)) =>
      //handle response
      log.info("Got successful response {}", content)

    case Response(Right(content)) =>
      //handle response
      log.info("Got successful response {}", content)

    case Response(Left(content)) =>
      //handle response
      log.info("Got failed response {}", content)

    case CircuitOpenFailure(failedMsg) =>
      log.warning("Unable to send message {}", failedMsg)
  }
}

Using the ask pattern. In this case, it is useful to be able to map circuit-open failures to the same type of failure returned by the service (a Left[String] in our case):

class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
  import SimpleService._
  import akka.pattern._

  implicit val askTimeout: Timeout = 2.seconds

  val serviceCircuitBreaker =
    context.actorOf(
      CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
        .copy(
          failureDetector = {
            _ match {
              case Response(Left(_)) => true
              case _                 => false
            }
          })
        .copy(
          openCircuitFailureConverter = { failure =>
            Left(s"Circuit open when processing ${failure.failedMsg}")
          })
        .props(potentiallyFailingService),
      "serviceCircuitBreaker")

  import context.dispatcher

  override def receive: Receive = {
    case AskFor(requestToForward) =>
      (serviceCircuitBreaker ? Request(requestToForward)).mapTo[Either[String, String]].onComplete {
        case Success(Right(successResponse)) =>
          //handle response
          log.info("Got successful response {}", successResponse)

        case Success(Left(failureResponse)) =>
          //handle failed response
          log.info("Got failed response {}", failureResponse)

        case Failure(exception) =>
          //handle failure of the ask itself (for example a timeout)
          log.warning("Request failed with {}", exception)

      }
  }
}

If it is not possible to define a specific error response, you can map the Open Circuit notification to a failure. This also means that your CircuitBreakerActor will protect you from timeouts caused by extra workload or temporary failures in the target actor. You can do that in two ways:

The first is to use the askWithCircuitBreaker method on the ActorRef or ActorSelection instance pointing to your circuit breaker proxy (enabled by importing akka.contrib.circuitbreaker.Implicits.askWithCircuitBreaker):

class CircuitBreakerAskWithCircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
  import SimpleService._
  import akka.contrib.circuitbreaker.Implicits.askWithCircuitBreaker

  implicit val askTimeout: Timeout = 2.seconds

  val serviceCircuitBreaker =
    context.actorOf(
      CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
        .props(target = potentiallyFailingService),
      "serviceCircuitBreaker")

  import context.dispatcher

  override def receive: Receive = {
    case AskFor(requestToForward) =>
      serviceCircuitBreaker.askWithCircuitBreaker(Request(requestToForward)).mapTo[String].onComplete {
        case Success(successResponse) =>
          //handle response
          log.info("Got successful response {}", successResponse)

        case Failure(exception) =>
          //handle failure (open circuit, timeout, ...)
          log.warning("Request failed with {}", exception)

      }
  }
}

The second is to map the future response of your ask pattern application with the failForOpenCircuit method, enabled by importing akka.contrib.circuitbreaker.Implicits.futureExtensions:

class CircuitBreakerAskWithFailure(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
  import SimpleService._
  import akka.pattern._
  import akka.contrib.circuitbreaker.Implicits.futureExtensions

  implicit val askTimeout: Timeout = 2.seconds

  val serviceCircuitBreaker =
    context.actorOf(
      CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
        .props(target = potentiallyFailingService),
      "serviceCircuitBreaker")

  import context.dispatcher

  override def receive: Receive = {
    case AskFor(requestToForward) =>
      (serviceCircuitBreaker ? Request(requestToForward)).failForOpenCircuit.mapTo[String].onComplete {
        case Success(successResponse) =>
          //handle response
          log.info("Got successful response {}", successResponse)

        case Failure(exception) =>
          //handle failure (open circuit, timeout, ...)
          log.warning("Request failed with {}", exception)

      }
  }
}
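To illustrate what turning an open-circuit notification into a failed future amounts to, here is a minimal sketch in plain Scala. It is not the library's code: `OpenCircuitMarker`, `OpenCircuitError` and `failOnOpenCircuit` are hypothetical stand-ins, defined here only so the example compiles without Akka. The idea is simply to inspect the completed value and fail the future when it is the open-circuit marker.

```scala
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical stand-ins; the real proxy defines its own message and exception types.
final case class OpenCircuitMarker(failedMsg: Any)
final class OpenCircuitError(msg: String) extends RuntimeException(msg)

object FutureSketch {
  // Extension method on Future[Any], similar in spirit to the ask-pattern usage above.
  implicit class FailOnOpenCircuit(val underlying: Future[Any]) extends AnyVal {
    def failOnOpenCircuit(implicit ec: ExecutionContext): Future[Any] =
      underlying.flatMap {
        // The reply says the circuit was open: convert it into a failed future.
        case OpenCircuitMarker(msg) =>
          Future.failed(new OpenCircuitError(s"Circuit open when processing $msg"))
        // Any other reply is passed through unchanged.
        case other =>
          Future.successful(other)
      }
  }
}
```

With such an extension in scope, the caller only has to handle `Success` and `Failure`, as in the two examples above, instead of matching on a separate open-circuit message.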

#### Direct Communication With The Target Actor

To send messages to the target actor without expecting any response, you can wrap your message in a TellOnly or a Passthrough envelope. The difference between the two is that TellOnly forwards the message only when the circuit is closed, while Passthrough forwards it in any state. You can, for example, use the Passthrough envelope to wrap a PoisonPill message to terminate the target actor. That will cause the circuit breaker proxy to be terminated too.
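The forwarding rule for the two envelopes can be modelled in a few lines of plain Scala. This is only a sketch of the rule as described here: the types below are hypothetical stand-ins that reuse the envelope names, and `forwarded` is an illustrative helper, not part of the proxy's API.

```scala
object EnvelopeSketch {
  // Hypothetical stand-ins mirroring the envelope names used in the text.
  sealed trait Envelope { def msg: Any }
  final case class TellOnly(msg: Any) extends Envelope
  final case class Passthrough(msg: Any) extends Envelope

  sealed trait CircuitState
  case object CircuitClosed extends CircuitState
  case object CircuitOpen extends CircuitState
  case object CircuitHalfOpen extends CircuitState

  /** The message that would reach the target actor, if any. */
  def forwarded(envelope: Envelope, state: CircuitState): Option[Any] =
    envelope match {
      case Passthrough(m)                        => Some(m) // forwarded in any state
      case TellOnly(m) if state == CircuitClosed => Some(m) // forwarded only when closed
      case TellOnly(_)                           => None    // dropped otherwise
    }
}
```

With the real proxy, the equivalent is to send the wrapped message to the proxy's ActorRef, for example a Passthrough envelope wrapping PoisonPill to stop the target actor regardless of the circuit state.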
