Implications of the streaming nature of Request/Response Entities
Akka HTTP is streaming all the way through, which means that the back-pressure mechanisms enabled by Akka Streams are exposed through all layers: from the TCP layer, through the HTTP server, all the way up to the user-facing HttpRequest and HttpResponse and their HttpEntity APIs.
This has surprising implications if you are used to non-streaming / non-reactive HTTP clients. Specifically, it means that “lack of consumption of the HTTP Entity is signaled as back-pressure to the other side of the connection”. This is a feature: it allows the application to consume the entity only as fast as it can handle it, and to back-pressure servers/clients so that they cannot overwhelm it, which would otherwise cause unnecessary buffering of the entity in memory.
Consuming (or discarding) the Entity of a request is mandatory! If it is accidentally left neither consumed nor discarded, Akka HTTP will assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. A client should consume the Entity regardless of the status of the HttpResponse.
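To make the stalling behaviour concrete, here is a minimal JDK-only sketch. It is not Akka HTTP code: the class `BackPressureSketch` and its methods are hypothetical, and a small bounded queue merely stands in for the buffers between the remote peer and the application. It illustrates that a sender can only make progress while the receiving side keeps taking data out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of back-pressure; a bounded queue plays the role of the connection buffers.
final class BackPressureSketch {

    // Tries to push `count` one-byte chunks into a buffer that nobody is reading from.
    // Returns how many chunks were accepted before the sender had to stop.
    static int sendWithoutConsumer(BlockingQueue<byte[]> buffer, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false when the buffer is full; a real sender would wait here
            if (!buffer.offer(new byte[] {(byte) i})) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    // Emptying the buffer models consuming (or discarding) the entity.
    static int drain(BlockingQueue<byte[]> buffer) {
        int drained = 0;
        while (buffer.poll() != null) {
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(2);
        System.out.println("accepted before stall: " + sendWithoutConsumer(buffer, 5)); // 2
        System.out.println("drained: " + drain(buffer));                                // 2
        System.out.println("accepted after drain: " + sendWithoutConsumer(buffer, 1));  // 1
    }
}
```

In this model the sender stops after two chunks because nothing reads from the buffer, and resumes only once the buffer has been drained. This is the situation an unconsumed entity creates on a real connection.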
Client-Side handling of streaming HTTP Entities
Consuming the HTTP Response Entity (Client)
The most common use case is consuming the response entity, which is done by running the underlying dataBytes Source (or, on the server side, by using directives such as BasicDirectives.extractDataBytes).
It is encouraged to use various streaming techniques to utilise the underlying infrastructure to its fullest, for example by framing the incoming chunks, parsing them line-by-line and then connecting the flow into another destination Sink, such as a File or other Akka Streams connector:
- Scala
    import java.io.File

    import akka.actor.ActorSystem
    import akka.http.scaladsl.model._
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ FileIO, Framing }
    import akka.util.ByteString

    implicit val system = ActorSystem()
    implicit val dispatcher = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val response: HttpResponse = ???

    response.entity.dataBytes
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
      .map(transformEachLine)
      .runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

    def transformEachLine(line: ByteString): ByteString = ???
- Java
    import java.io.File;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;

    import akka.actor.ActorSystem;
    import akka.stream.ActorMaterializer;
    import akka.stream.javadsl.FileIO;
    import akka.stream.javadsl.Framing;
    import akka.stream.javadsl.FramingTruncation;
    import akka.http.javadsl.model.*;
    import akka.util.ByteString;
    import scala.concurrent.ExecutionContextExecutor;
    import scala.concurrent.duration.FiniteDuration;

    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final HttpResponse response = responseFromSomewhere();

    final Function<ByteString, ByteString> transformEachLine = line -> line /* some transformation here */;

    final int maximumFrameLength = 256;

    response.entity().getDataBytes()
      .via(Framing.delimiter(ByteString.fromString("\n"), maximumFrameLength, FramingTruncation.ALLOW))
      .map(transformEachLine::apply)
      .runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer);
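The framing step is needed because chunk boundaries on the wire do not line up with line boundaries. The following JDK-only sketch illustrates the idea of delimiter framing. It is not how Akka Streams implements `Framing.delimiter`, and the class `DelimiterFramingSketch` and its `frameAll` method are hypothetical names. It assumes a `\n` delimiter and emits any trailing bytes without a delimiter as a final frame, comparable in spirit to the `FramingTruncation.ALLOW` setting used in the Java snippet.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of delimiter-based framing over arbitrary chunks.
final class DelimiterFramingSketch {

    // Re-assembles lines from chunks whose boundaries may fall anywhere.
    static List<String> frameAll(List<byte[]> chunks, int maximumFrameLength) {
        List<String> frames = new ArrayList<>();
        ByteArrayOutputStream pending = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            for (byte b : chunk) {
                if (b == '\n') {
                    // delimiter found: everything buffered so far is one complete frame
                    frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                    pending.reset();
                } else {
                    if (pending.size() >= maximumFrameLength) {
                        throw new IllegalStateException("frame exceeds " + maximumFrameLength + " bytes");
                    }
                    pending.write(b);
                }
            }
        }
        if (pending.size() > 0) {
            // end of input without a trailing delimiter: emit the remainder as the last frame
            frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
        }
        return frames;
    }

    public static void main(String[] args) {
        List<byte[]> chunks = new ArrayList<>();
        chunks.add("ab\ncd".getBytes(StandardCharsets.UTF_8));
        chunks.add("e\nf".getBytes(StandardCharsets.UTF_8));
        System.out.println(frameAll(chunks, 256)); // [ab, cde, f]
    }
}
```

Note how the line `cde` is split across two chunks and is only emitted once its delimiter arrives. The frame-length limit bounds how much has to be buffered while waiting for a delimiter.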
Sometimes, however, the need may arise to consume the entire entity as a Strict entity (which means that it is completely loaded into memory). Akka HTTP provides a special toStrict(timeout) method (toStrict(timeout, materializer) in the Java API) which can be used to eagerly consume the entity and make it available in memory:
- Scala
    import scala.concurrent.Future
    import scala.concurrent.duration._

    import akka.actor.ActorSystem
    import akka.http.scaladsl.model._
    import akka.stream.ActorMaterializer
    import akka.util.ByteString

    implicit val system = ActorSystem()
    implicit val dispatcher = system.dispatcher
    implicit val materializer = ActorMaterializer()

    case class ExamplePerson(name: String)
    def parse(line: ByteString): ExamplePerson = ???

    val response: HttpResponse = ???

    // toStrict to enforce all data be loaded into memory from the connection
    val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)

    // while API remains the same to consume dataBytes, now they're in memory already:
    val transformedData: Future[ExamplePerson] =
      strictEntity flatMap { e =>
        e.dataBytes
          .runFold(ByteString.empty) { case (acc, b) => acc ++ b }
          .map(parse)
      }
- Java
    final class ExamplePerson {
      final String name;

      public ExamplePerson(String name) {
        this.name = name;
      }
    }

    public ExamplePerson parse(ByteString line) {
      return new ExamplePerson(line.utf8String());
    }

    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final HttpResponse response = responseFromSomewhere();

    // toStrict to enforce all data be loaded into memory from the connection
    final CompletionStage<HttpEntity.Strict> strictEntity = response.entity()
        .toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), materializer);

    // while API remains the same to consume dataBytes, now they're in memory already:
    final CompletionStage<ExamplePerson> person =
      strictEntity
        .thenCompose(strict ->
          strict.getDataBytes()
            .runFold(ByteString.empty(), (acc, b) -> acc.concat(b), materializer)
            .thenApply(this::parse)
        );
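What "strict" means can be illustrated without Akka at all: every chunk is gathered into one in-memory byte array, and the whole operation fails if it does not finish before a deadline. The sketch below is a JDK-only analogy rather than the Akka HTTP implementation. `StrictSketch` and its `toStrict` method are hypothetical, and it assumes Java 9 or later for `CompletableFuture.orTimeout`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for eagerly loading a chunked body into memory with a deadline.
final class StrictSketch {

    // Concatenates all chunks asynchronously; the returned future fails with a
    // TimeoutException if collecting takes longer than timeoutMillis.
    static CompletableFuture<byte[]> toStrict(Iterable<byte[]> chunks, long timeoutMillis) {
        return CompletableFuture
            .supplyAsync(() -> {
                ByteArrayOutputStream all = new ByteArrayOutputStream();
                for (byte[] chunk : chunks) {
                    all.write(chunk, 0, chunk.length); // the whole body ends up on the heap
                }
                return all.toByteArray();
            })
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        byte[] body = toStrict(
            Arrays.asList(
                "hello ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8)),
            3000).join();
        System.out.println(new String(body, StandardCharsets.UTF_8)); // hello world
    }
}
```

The trade-off is visible in the sketch: the caller gets the complete body in one piece, at the price of holding all of it in memory at once, so this approach suits entities of known, modest size.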
Discarding the HTTP Response Entity (Client)
Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code), yet as explained above the entity still has to be consumed in some way, otherwise we’ll be exerting back-pressure on the underlying TCP connection.
The discardEntityBytes convenience method serves the purpose of easily discarding the entity if it has no purpose for us. It does so by piping the incoming bytes directly into a Sink.ignore.
The two snippets below are equivalent, and work the same way on the server-side for incoming HTTP Requests:
- Scala
    import akka.actor.ActorSystem
    import akka.http.scaladsl.model.HttpMessage.DiscardedEntity
    import akka.http.scaladsl.model._
    import akka.stream.ActorMaterializer

    implicit val system = ActorSystem()
    implicit val dispatcher = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)

    val discarded: DiscardedEntity = response1.discardEntityBytes()
    discarded.future.onComplete { done => println("Entity discarded completely!") }
- Java
    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final HttpResponse response = responseFromSomewhere();

    final HttpMessage.DiscardedEntity discarded = response.discardEntityBytes(materializer);

    discarded.completionStage().whenComplete((done, ex) -> {
      System.out.println("Entity discarded completely!");
    });
Or the equivalent low-level code achieving the same result:
- Scala
    import scala.concurrent.Future

    import akka.Done
    import akka.stream.scaladsl.Sink

    val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)

    val discardingComplete: Future[Done] = response1.entity.dataBytes.runWith(Sink.ignore)
    discardingComplete.onComplete(done => println("Entity discarded completely!"))
- Java
    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final HttpResponse response = responseFromSomewhere();

    final CompletionStage<Done> discardingComplete =
      response.entity().getDataBytes().runWith(Sink.ignore(), materializer);

    discardingComplete.whenComplete((done, ex) -> {
      System.out.println("Entity discarded completely!");
    });
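Discarding still means reading every byte; the data is simply thrown away instead of being processed. The following JDK-only sketch shows the same idea on a plain `InputStream`. It is an analogy rather than Akka HTTP code, and `DiscardSketch` and `discardAll` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical analogy: "discarding" a body means draining it to the end.
final class DiscardSketch {

    // Reads the stream until end-of-input and returns the number of bytes thrown away.
    static long discardAll(InputStream body) {
        byte[] scratch = new byte[8192]; // reused buffer; its content is never inspected
        long total = 0;
        try {
            int n;
            while ((n = body.read(scratch)) != -1) {
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream(new byte[1024]);
        System.out.println("discarded bytes: " + discardAll(body)); // discarded bytes: 1024
    }
}
```

Because the stream is read to its end, the source is left fully drained, which in the Akka HTTP case is what allows the connection to carry further requests or responses.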
Server-Side handling of streaming HTTP Entities
As on the client side, HTTP Entities are directly linked to Streams which are fed by the underlying TCP connection. Thus, if request entities are left unconsumed, the server will back-pressure the connection, expecting that the user code will eventually decide what to do with the incoming data.
Note that some directives force an implicit toStrict operation, such as entity(as[String]) in the Scala API or entity(exampleUnmarshaller, example -> {}) in the Java API, and similar ones.
Consuming the HTTP Request Entity (Server)
The simplest way of consuming the incoming request entity is to transform it into an actual domain object, for example by using the entity directive:
- Scala
    import akka.actor.ActorSystem
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import akka.stream.ActorMaterializer
    import spray.json.DefaultJsonProtocol._

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    final case class Bid(userId: String, bid: Int)

    // these are from spray-json
    implicit val bidFormat = jsonFormat2(Bid)

    val route =
      path("bid") {
        put {
          entity(as[Bid]) { bid =>
            // incoming entity is fully consumed and converted into a Bid
            complete("The bid was: " + bid)
          }
        }
      }
- Java
    class Bid {
      final String userId;
      final int bid;

      Bid(String userId, int bid) {
        this.userId = userId;
        this.bid = bid;
      }
    }

    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final Unmarshaller<HttpEntity, Bid> asBid = Jackson.unmarshaller(Bid.class);

    final Route s =
      path("bid", () ->
        put(() ->
          entity(asBid, bid ->
            // incoming entity is fully consumed and converted into a Bid
            complete("The bid was: " + bid)
          )
        )
      );
Of course you can access the raw dataBytes as well and run the underlying stream, for example by piping it into a FileIO Sink that signals completion via a Future[IOResult] (CompletionStage<IOResult> in the Java API) once all the data has been written into the file:
- Scala
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.FileIO
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import java.io.File

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      (put & path("lines")) {
        withoutSizeLimit {
          extractDataBytes { bytes =>
            val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))

            // we only want to respond once the incoming data has been handled:
            onComplete(finishedWriting) { ioResult =>
              complete("Finished writing data: " + ioResult)
            }
          }
        }
      }
- Java
    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final Route s =
      put(() ->
        path("lines", () ->
          withoutSizeLimit(() ->
            extractDataBytes(bytes -> {
              final CompletionStage<IOResult> res =
                bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer);

              return onComplete(() -> res, ioResult ->
                // we only want to respond once the incoming data has been handled:
                complete("Finished writing data: " + ioResult));
            })
          )
        )
      );
Discarding the HTTP Request Entity (Server)
Sometimes, depending on some validation (e.g. checking whether the given user is allowed to perform uploads), you may decide to discard the uploaded entity.
Please note that discarding means that the entire upload will proceed, even though you are not interested in the data being streamed to the server. This may be useful if you are simply not interested in the given entity but do not want to abort the entire connection (which we’ll demonstrate as well), since there may still be more requests pending on the same connection.
To discard the data bytes explicitly, invoke discardEntityBytes on the incoming HttpRequest:
- Scala
    import akka.actor.ActorSystem
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.model.HttpRequest

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      (put & path("lines")) {
        withoutSizeLimit {
          extractRequest { r: HttpRequest =>
            val finishedWriting = r.discardEntityBytes().future

            // we only want to respond once the incoming data has been handled:
            onComplete(finishedWriting) { done =>
              complete("Drained all data from connection... (" + done + ")")
            }
          }
        }
      }
- Java
    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final Route s =
      put(() ->
        path("lines", () ->
          withoutSizeLimit(() ->
            extractRequest(r -> {
              final CompletionStage<Done> res = r.discardEntityBytes(materializer).completionStage();

              return onComplete(() -> res, done ->
                // we only want to respond once the incoming data has been handled:
                complete("Drained all data from connection... (" + done + ")"));
            })
          )
        )
      );
A related concept is cancelling the incoming entity.dataBytes stream (entity.getDataBytes() in the Java API), which results in Akka HTTP abruptly closing the connection from the client. This may be useful when you detect that the given user should not be allowed to make any uploads at all, and you want to drop the connection (instead of reading and ignoring the incoming data). This can be done by attaching the incoming entity.dataBytes to a Sink.cancelled(), which will cancel the entity stream, which in turn will cause the underlying connection to be shut down by the server, effectively hard-aborting the incoming request:
- Scala
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Sink
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.model.headers.Connection
    import akka.stream.ActorMaterializer

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      (put & path("lines")) {
        withoutSizeLimit {
          extractDataBytes { data =>
            // Closing connections, method 1 (eager):
            // we deem this request as illegal, and close the connection right away:
            data.runWith(Sink.cancelled) // "brutally" closes the connection

            // Closing connections, method 2 (graceful):
            // consider draining connection and replying with `Connection: Close` header
            // if you want the client to close after this request/reply cycle instead:
            respondWithHeader(Connection("close")) {
              complete(StatusCodes.Forbidden -> "Not allowed!")
            }
          }
        }
      }
- Java
    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final Route s =
      put(() ->
        path("lines", () ->
          withoutSizeLimit(() ->
            extractDataBytes(bytes -> {
              // Closing connections, method 1 (eager):
              // we deem this request as illegal, and close the connection right away:
              bytes.runWith(Sink.cancelled(), materializer); // "brutally" closes the connection

              // Closing connections, method 2 (graceful):
              // consider draining connection and replying with `Connection: Close` header
              // if you want the client to close after this request/reply cycle instead:
              return respondWithHeader(Connection.create("close"), () ->
                complete(StatusCodes.FORBIDDEN, "Not allowed!")
              );
            })
          )
        )
      );
Closing connections is also explained in depth in the Closing a connection section of the docs.
Pending: Automatic discarding of not used entities
Under certain conditions it is possible to detect an entity is very unlikely to be used by the user for a given request, and issue warnings or discard the entity automatically. This advanced feature has not been implemented yet, see the below note and issues for further discussion and ideas.
An advanced feature code named “auto draining” has been discussed and proposed for Akka HTTP, and we’re hoping to implement or help the community implement it.
You can read more about it in issue #183 as well as issue #117; as always, contributions are very welcome!