onCompleteWithBreaker
Signature
def onCompleteWithBreaker[T](breaker: CircuitBreaker)(future: ⇒ Future[T]): Directive1[Try[T]]
Description
Evaluates its parameter of type Future[T] (CompletionStage<T> in the Java DSL), protecting it with the specified CircuitBreaker. Refer to Circuit Breaker for a detailed description of this pattern.
If the CircuitBreaker is open, the request is rejected with a CircuitBreakerOpenRejection. Note that in this case the request’s entity databytes stream is cancelled, and the connection is closed as a consequence.
Otherwise, the same behaviour provided by onComplete is to be expected.
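The open/closed decision described above can be sketched with a small, self-contained toy. This is not Akka's CircuitBreaker and not how the directive is implemented; ToyBreaker and all of its members are hypothetical names introduced only for illustration, the clock is passed in explicitly so the behaviour is deterministic, and the half-open state is simplified to "allow calls again once the reset timeout has elapsed". The point it tries to show is that the future-producing code is only evaluated while the breaker is closed, which is why the directive takes it lazily.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical toy class for illustration only; this is NOT Akka's CircuitBreaker.
final class ToyBreaker {
    private final int maxFailures;
    private final long resetTimeoutMillis;
    private int failures = 0;
    private long openedAt = -1L; // -1 means "closed"

    ToyBreaker(int maxFailures, long resetTimeoutMillis) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
    }

    // Open while less than resetTimeoutMillis has passed since the breaker tripped.
    synchronized boolean isOpen(long nowMillis) {
        return openedAt >= 0 && nowMillis - openedAt < resetTimeoutMillis;
    }

    // Runs the supplier only if the breaker is not open at nowMillis.
    <T> CompletionStage<T> call(Supplier<CompletionStage<T>> body, long nowMillis) {
        if (isOpen(nowMillis)) {
            // Fail fast: the supplier is never invoked while the breaker is open.
            return failed(new IllegalStateException("circuit breaker is open"));
        }
        CompletionStage<T> result;
        try {
            result = body.get();
        } catch (RuntimeException e) {
            result = failed(e); // a supplier that throws counts as a failure
        }
        return result.whenComplete((value, error) -> record(error == null, nowMillis));
    }

    private static <T> CompletionStage<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    // A success closes the breaker; reaching maxFailures opens it.
    private synchronized void record(boolean succeeded, long nowMillis) {
        if (succeeded) {
            failures = 0;
            openedAt = -1L;
        } else if (++failures >= maxFailures) {
            openedAt = nowMillis;
        }
    }
}
```

With new ToyBreaker(1, 1000L), a failing call at time 100 opens the breaker, a call at time 200 is rejected without running its supplier, and a call at time 1200 is allowed through again. In the real directive the rejected case surfaces as a CircuitBreakerOpenRejection rather than a failed stage.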
Example
- Scala
    def divide(a: Int, b: Int): Future[Int] = Future {
      a / b
    }
    val resetTimeout = 1.second
    val breaker = new CircuitBreaker(
      system.scheduler,
      maxFailures = 1,
      callTimeout = 5.seconds,
      resetTimeout)

    val route =
      path("divide" / IntNumber / IntNumber) { (a, b) =>
        onCompleteWithBreaker(breaker)(divide(a, b)) {
          case Success(value) => complete(s"The result was $value")
          case Failure(ex)    => complete((InternalServerError, s"An error occurred: ${ex.getMessage}"))
        }
      }

    // tests:
    Get("/divide/10/2") ~> route ~> check {
      responseAs[String] shouldEqual "The result was 5"
    }

    Get("/divide/10/0") ~> Route.seal(route) ~> check {
      status shouldEqual InternalServerError
      responseAs[String] shouldEqual "An error occurred: / by zero"
    } // opens the circuit breaker

    Get("/divide/10/2") ~> route ~> check {
      rejection shouldBe a[CircuitBreakerOpenRejection]
    }

    Thread.sleep(resetTimeout.toMillis + 200)

    Get("/divide/10/2") ~> route ~> check {
      responseAs[String] shouldEqual "The result was 5"
    }
- Java
    // import static scala.compat.java8.JFunction.func;
    // import static akka.http.javadsl.server.PathMatchers.*;

    final int maxFailures = 1;
    final FiniteDuration callTimeout = FiniteDuration.create(5, TimeUnit.SECONDS);
    final FiniteDuration resetTimeout = FiniteDuration.create(1, TimeUnit.SECONDS);
    final CircuitBreaker breaker =
      CircuitBreaker.create(system().scheduler(), maxFailures, callTimeout, resetTimeout);

    final Route route = path(segment("divide").slash(integerSegment()).slash(integerSegment()),
      (a, b) -> onCompleteWithBreaker(breaker,
        () -> CompletableFuture.supplyAsync(() -> a / b),
        maybeResult -> maybeResult
          .map(func(result -> complete("The result was " + result)))
          .recover(new PFBuilder<Throwable, Route>()
            .matchAny(ex -> complete(StatusCodes.InternalServerError(),
              "An error occurred: " + ex.toString())
            )
            .build())
          .get()
      )
    );

    testRoute(route).run(HttpRequest.GET("/divide/10/2"))
      .assertEntity("The result was 5");

    testRoute(route).run(HttpRequest.GET("/divide/10/0"))
      .assertStatusCode(StatusCodes.InternalServerError())
      .assertEntity("An error occurred: java.lang.ArithmeticException: / by zero");
    // opened the circuit-breaker

    testRoute(route).run(HttpRequest.GET("/divide/10/0"))
      .assertEntity("The server is currently unavailable (because it is overloaded or down for maintenance).")
      .assertStatusCode(StatusCodes.ServiceUnavailable());

    Thread.sleep(resetTimeout.toMillis());

    // circuit breaker resets after this time, but observing it
    // is timing sensitive so retry a few times within a timeout
    /* This test only works when compiling against Akka 2.5 which has javadsl.Testkit
    new akka.testkit.javadsl.TestKit(system()) {
      {
        awaitAssert(
          FiniteDuration.create(500, TimeUnit.MILLISECONDS),
          () -> {
            testRoute(route).run(HttpRequest.GET("/divide/8/2"))
              .assertEntity("The result was 4");
            return null;
          });
      }
    };*/