Evaluates its parameter of type Future[T]CompletionStage<T> protecting it with the specified CircuitBreakerCircuitBreaker. Refer to Circuit Breaker for a detailed description of this pattern.
sourceimport static akka.http.javadsl.server.Directives.onCompleteWithBreaker;importstatic akka.http.javadsl.server.Directives.path;// import static scala.compat.java8.JFunction.func;// import static akka.http.javadsl.server.PathMatchers.*;finalint maxFailures =1;finalDuration callTimeout =Duration.ofSeconds(5);finalDuration resetTimeout =Duration.ofSeconds(1);finalCircuitBreaker breaker =CircuitBreaker.create(system().scheduler(), maxFailures, callTimeout, resetTimeout);finalRoute route = path(segment("divide").slash(integerSegment()).slash(integerSegment()),(a, b)-> onCompleteWithBreaker(breaker,()->CompletableFuture.supplyAsync(()-> a / b),
maybeResult -> maybeResult
.map(result -> complete("The result was "+ result)).recover(newPFBuilder<Throwable,Route>().matchAny(ex -> complete(StatusCodes.InternalServerError(),"An error occurred: "+ ex.toString())).build()).get()));
testRoute(route).run(HttpRequest.GET("/divide/10/2")).assertEntity("The result was 5");
testRoute(route).run(HttpRequest.GET("/divide/10/0")).assertStatusCode(StatusCodes.InternalServerError()).assertEntity("An error occurred: java.lang.ArithmeticException: / by zero");// The circuit-breaker will eventually be openednewTestKit(system()){{
awaitAssert(Duration.ofSeconds(500),()->{
testRoute(route).run(HttpRequest.GET("/divide/10/0")).assertEntity("The server is currently unavailable (because it is overloaded or down for maintenance).").assertStatusCode(StatusCodes.ServiceUnavailable());returnnull;});Thread.sleep(resetTimeout.toMillis());// circuit breaker resets after this time, but observing it// is timing sensitive so retry a few times within a timeout
awaitAssert(Duration.ofSeconds(500),()->{
testRoute(route).run(HttpRequest.GET("/divide/8/2")).assertEntity("The result was 4");returnnull;});}};