Low-Level Server-Side API

Apart from the HTTP Client, Akka HTTP also provides an embedded, Reactive-Streams-based, fully asynchronous HTTP/1.1 server implemented on top of Akka Stream.

It sports the following features:

  • Full support for HTTP persistent connections
  • Full support for HTTP pipelining
  • Full support for asynchronous HTTP streaming including "chunked" transfer encoding accessible through an idiomatic API
  • Optional SSL/TLS encryption
  • WebSocket support

The server-side components of Akka HTTP are split into two layers:

  1. The basic low-level server implementation in the akka-http-core module
  2. Higher-level functionality in the akka-http module

The low-level server (1) is scoped with a clear focus on the essential functionality of an HTTP/1.1 server:

  • Connection management
  • Parsing and rendering of messages and headers
  • Timeout management (for requests and connections)
  • Response ordering (for transparent pipelining support)

All non-core features of typical HTTP servers (like request routing, file serving, compression, etc.) are left to the higher layers; they are not implemented by the akka-http-core-level server itself. Besides providing a clear focus, this design keeps the server core small and light-weight as well as easy to understand and maintain.

Depending on your needs you can either use the low-level API directly or rely on the high-level Routing DSL, which can make the definition of more complex service logic much easier.

Streams and HTTP

The Akka HTTP server is implemented on top of Akka Stream and makes heavy use of it - in its implementation as well as on all levels of its API.

On the connection level Akka HTTP offers basically the same kind of interface as Akka Stream IO: A socket binding is represented as a stream of incoming connections. The application pulls connections from this stream source and, for each of them, provides a Flow<HttpRequest, HttpResponse, ?> to "translate" requests into responses.

Apart from regarding a socket bound on the server-side as a Source<IncomingConnection> and each connection as a Source<HttpRequest> with a Sink<HttpResponse>, the stream abstraction is also present inside a single HTTP message: the entities of HTTP requests and responses are generally modeled as a Source<ByteString>. See also the HTTP Model for more information on how HTTP messages are represented in Akka HTTP.

Starting and Stopping

On the most basic level an Akka HTTP server is bound by invoking the bind method of the akka.http.javadsl.Http extension:

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, Future<ServerBinding>> serverSource =
    Http.get(system).bind("localhost", 8080, materializer);

Future<ServerBinding> serverBindingFuture =
    serverSource.to(Sink.foreach(
        new Procedure<IncomingConnection>() {
            @Override
            public void apply(IncomingConnection connection) throws Exception {
                System.out.println("Accepted new connection from " + connection.remoteAddress());
                // ... and then actually handle the connection
            }
        })).run(materializer);

Arguments to the Http.get(system).bind method specify the interface and port to bind to and register interest in handling incoming HTTP connections. Additionally, the method allows for the definition of socket options as well as a larger number of settings for configuring the server according to your needs.

The result of the bind method is a Source<Http.IncomingConnection> which must be drained by the application in order to accept incoming connections. The actual binding is not performed before this source is materialized as part of a processing pipeline. In case the bind fails (e.g. because the port is already in use) the materialized stream will immediately be terminated with a corresponding exception. The binding is released (i.e. the underlying socket is unbound) when the subscriber of the incoming connection source has cancelled its subscription. Alternatively, one can use the unbind() method of the Http.ServerBinding instance that is created as part of the connection source's materialization process. The Http.ServerBinding also provides a way to get hold of the actual local address of the bound socket, which is useful, for example, when binding to port zero (and thus letting the OS pick an available port).
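
The port-zero case can be illustrated without Akka at all. The following is a minimal sketch that uses only the JDK's java.net.ServerSocket; the class and method names are made up for this illustration and are not part of Akka HTTP. It shows why the actual local address has to be queried after binding: the port is only known once the OS has assigned it.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Plain-JDK illustration of "bind to port zero"; this is not the Akka HTTP API.
final class EphemeralPortDemo {

    /** Binds a listening socket to port 0 on the loopback interface and returns the port the OS picked. */
    static int bindToEphemeralPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            // The requested port was 0; the real port is only available after binding.
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("OS picked port " + bindToEphemeralPort());
    }
}
```

With Akka HTTP the same information would be read from the Http.ServerBinding once the binding future has completed.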

Request-Response Cycle

When a new connection has been accepted it will be published as an Http.IncomingConnection which consists of the remote address and methods to provide a Flow<HttpRequest, HttpResponse, ?> to handle requests coming in over this connection.

Requests are handled by calling one of the handleWithXXX methods with a handler, which can either be

  • a Flow<HttpRequest, HttpResponse, ?> for handleWith,
  • a function Function<HttpRequest, HttpResponse> for handleWithSyncHandler,
  • a function Function<HttpRequest, Future<HttpResponse>> for handleWithAsyncHandler.

Here is a complete example:

ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, Future<ServerBinding>> serverSource =
        Http.get(system).bind("localhost", 8080, materializer);

final Function<HttpRequest, HttpResponse> requestHandler =
    new Function<HttpRequest, HttpResponse>() {
        private final HttpResponse NOT_FOUND =
            HttpResponse.create()
                .withStatus(404)
                .withEntity("Unknown resource!");


        @Override
        public HttpResponse apply(HttpRequest request) throws Exception {
            Uri uri = request.getUri();
            if (request.method() == HttpMethods.GET) {
                if (uri.path().equals("/"))
                    return
                        HttpResponse.create()
                            .withEntity(ContentTypes.TEXT_HTML_UTF8,
                                "<html><body>Hello world!</body></html>");
                else if (uri.path().equals("/hello")) {
                    String name = Util.getOrElse(uri.query().get("name"), "Mister X");

                    return
                        HttpResponse.create()
                            .withEntity("Hello " + name + "!");
                }
                else if (uri.path().equals("/ping"))
                    return HttpResponse.create().withEntity("PONG!");
                else
                    return NOT_FOUND;
            }
            else return NOT_FOUND;
        }
    };

Future<ServerBinding> serverBindingFuture =
    serverSource.to(Sink.foreach(
        new Procedure<IncomingConnection>() {
            @Override
            public void apply(IncomingConnection connection) throws Exception {
                System.out.println("Accepted new connection from " + connection.remoteAddress());

                connection.handleWithSyncHandler(requestHandler, materializer);
                // this is equivalent to
                //connection.handleWith(Flow.of(HttpRequest.class).map(requestHandler), materializer);
            }
        })).run(materializer);

In this example, a request is handled by transforming the request stream with a function Function<HttpRequest, HttpResponse> using handleWithSyncHandler (or equivalently, Akka Stream's map operator). Depending on the use case many other ways of providing a request handler are conceivable using Akka Stream's combinators.

If the application provides a Flow, it is also the responsibility of the application to generate exactly one response for every request and to ensure that the ordering of responses matches the ordering of the associated requests (which is relevant if HTTP pipelining is enabled, where processing of multiple incoming requests may overlap). When relying on handleWithSyncHandler or handleWithAsyncHandler, or the map or mapAsync stream operators, this requirement will be automatically fulfilled.
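
To make the ordering requirement concrete, here is a minimal sketch that uses only java.util.concurrent.CompletableFuture. It is not how Akka Stream implements mapAsync, and the names OrderedResponsesDemo, handleInOrder and slowEcho are hypothetical. All handlers are started up front so their processing may overlap, but the results are collected strictly in request order, even though the first request is deliberately the slowest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-JDK illustration of ordered asynchronous handling; not the Akka Stream implementation.
final class OrderedResponsesDemo {

    /** Starts all handlers first, then collects exactly one result per request, in request order. */
    static <Req, Res> List<Res> handleInOrder(List<Req> requests,
                                              Function<Req, CompletableFuture<Res>> handler) {
        List<CompletableFuture<Res>> inFlight = new ArrayList<>();
        for (Req request : requests) {
            inFlight.add(handler.apply(request)); // processing of requests may overlap
        }
        List<Res> responses = new ArrayList<>();
        for (CompletableFuture<Res> future : inFlight) {
            responses.add(future.join()); // wait for each response in request order
        }
        return responses;
    }

    /** Hypothetical asynchronous handler: the first request takes the longest. */
    static CompletableFuture<String> slowEcho(String path) {
        long delayMillis = path.equals("/first") ? 200 : 10;
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "response for " + path;
        });
    }

    public static void main(String[] args) {
        List<String> requests = Arrays.asList("/first", "/second", "/third");
        System.out.println(handleInOrder(requests, OrderedResponsesDemo::slowEcho));
    }
}
```

A hand-written Flow that emits responses as soon as they are ready, rather than in request order, would violate the requirement described above.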

See Routing DSL Overview for a more convenient high-level DSL to create request handlers.

Streaming Request/Response Entities

Streaming of HTTP message entities is supported through subclasses of HttpEntity. The application needs to be able to deal with streamed entities when receiving a request as well as, in many cases, when constructing responses. See HttpEntity for a description of the alternatives.

Closing a connection

The HTTP connection will be closed when the handling Flow cancels its upstream subscription or the peer closes the connection. An often more convenient alternative is to explicitly add a Connection: close header to an HttpResponse. This response will then be the last one on the connection and the server will actively close the connection when it has been sent out.
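
As a rough illustration of the header rule, the sketch below checks whether a Connection header value carries the close token. It is plain JDK code written for this explanation, not Akka HTTP's implementation, and the names ConnectionCloseDemo and requestsClose are hypothetical. It assumes the usual HTTP/1.1 conventions: the header value is a comma-separated list of case-insensitive tokens, and connections are persistent unless close is present.

```java
import java.util.Locale;

// Plain-JDK illustration of the "Connection: close" rule; not Akka HTTP's implementation.
final class ConnectionCloseDemo {

    /**
     * Returns true if the Connection header value contains the "close" token.
     * A null value stands for an absent header.
     */
    static boolean requestsClose(String connectionHeaderValue) {
        if (connectionHeaderValue == null) {
            return false; // HTTP/1.1 connections are persistent by default
        }
        for (String token : connectionHeaderValue.split(",")) {
            if (token.trim().toLowerCase(Locale.ROOT).equals("close")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(requestsClose("close"));
        System.out.println(requestsClose("keep-alive"));
    }
}
```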

Server-Side HTTPS Support

Akka HTTP supports TLS encryption on the server-side as well as on the client-side.

The central vehicle for configuring encryption is the HttpsContext, which can be created using the static method HttpsContext.create, defined like this:

public static HttpsContext create(SSLContext sslContext,
                                  Option<Collection<String>> enabledCipherSuites,
                                  Option<Collection<String>> enabledProtocols,
                                  Option<ClientAuth> clientAuth,
                                  Option<SSLParameters> sslParameters)

On the server-side the bind and bindAndHandleXXX methods of the akka.http.javadsl.Http extension define an optional httpsContext parameter, which can receive the HTTPS configuration in the form of an HttpsContext instance. If it is defined, encryption is enabled on all accepted connections. Otherwise it is disabled (which is the default).
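
The sslContext argument of HttpsContext.create is a regular javax.net.ssl.SSLContext. The following JDK-only sketch shows one way such a context might be built from a KeyStore; the class name SslContextSketch and the method fromKeyStore are hypothetical. The demonstration in main uses an empty in-memory key store purely so the code runs on its own; a real server would load a key store that contains its certificate and private key.

```java
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

// JDK-only sketch: builds an SSLContext of the kind HttpsContext.create expects as its first argument.
final class SslContextSketch {

    /** Creates a TLS context whose key material comes from the given key store. */
    static SSLContext fromKeyStore(KeyStore keyStore, char[] keyPassword)
            throws GeneralSecurityException {
        KeyManagerFactory keyManagers =
            KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        keyManagers.init(keyStore, keyPassword);

        SSLContext context = SSLContext.getInstance("TLS");
        // Passing null for trust managers and SecureRandom selects the JDK defaults.
        context.init(keyManagers.getKeyManagers(), null, null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        KeyStore empty = KeyStore.getInstance(KeyStore.getDefaultType());
        empty.load(null, null); // empty in-memory store, for demonstration only
        System.out.println(fromKeyStore(empty, new char[0]).getProtocol());
    }
}
```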

Stand-Alone HTTP Layer Usage

Due to its Reactive-Streams-based nature the Akka HTTP layer is fully detachable from the underlying TCP interface. While in most applications this "feature" will not be crucial, it can be useful in certain cases to be able to "run" the HTTP layer (and, potentially, higher layers) against data that do not come from the network but rather from some other source. Potential scenarios where this might be useful include tests, debugging or low-level event-sourcing (e.g. by replaying network traffic).

On the server-side the stand-alone HTTP layer forms a BidiFlow<HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, BoxedUnit>, that is a stage that "upgrades" a potentially encrypted raw connection to the HTTP level.

You create an instance of the layer by calling one of the two overloads of the Http.get(system).serverLayer method, which also allows for varying degrees of configuration. Note that the returned instance is not reusable and can only be materialized once.

Handling HTTP Server failures in the Low-Level API

There are various situations in which failure may occur while initialising or running an Akka HTTP server. Akka by default will log all these failures. Sometimes, however, one may want to react to failures in addition to having them logged, for example by shutting down the actor system or notifying some external monitoring end-point explicitly.

Multiple things can fail when creating and materializing an HTTP server (the same applies to a plain streaming Tcp server). Failures can happen on different layers of the stack, ranging from being unable to start the server to failing to unmarshal an HttpRequest. Examples of failures include (from outermost to innermost):

  • Failure to bind to the specified address/port,
  • Failure while accepting new IncomingConnections, for example when the OS has run out of file descriptors or memory,
  • Failure while handling a connection, for example if the incoming HttpRequest is malformed.

This section describes how to handle each failure situation, and in which situations these failures may occur.

The first type of failure occurs when the server is unable to bind to the given port, for example when the port is already taken by another application or when the port is privileged (i.e. only usable by root). In this case the "binding future" will fail immediately, and we can react to it by listening on the Future's completion:

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, Future<ServerBinding>> serverSource =
    Http.get(system).bind("localhost", 80, materializer);

Future<ServerBinding> serverBindingFuture =
    serverSource.to(Sink.foreach(
        new Procedure<IncomingConnection>() {
            @Override
            public void apply(IncomingConnection connection) throws Exception {
                System.out.println("Accepted new connection from " + connection.remoteAddress());
                // ... and then actually handle the connection
            }
        })).run(materializer);

serverBindingFuture.onFailure(new OnFailure() {
    @Override
    public void onFailure(Throwable failure) throws Throwable {
        // possibly report the failure somewhere...
    }
}, system.dispatcher());
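
The "port already taken" case can be reproduced with the JDK alone. The sketch below is an analogy, not Akka HTTP code: it uses java.net.ServerSocket and CompletableFuture in place of the binding future, and the names BindFailureDemo, bindAsync and bindFailsWhilePortIsTaken are made up. It binds one socket, then tries to bind a second one to the same loopback port, which is expected to fail on common platforms while the first socket is still open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy for a failing "binding future"; this is not the Akka HTTP API.
final class BindFailureDemo {

    /** Asynchronously binds a listening socket; the future fails if the port is unavailable. */
    static CompletableFuture<ServerSocket> bindAsync(int port) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Returns true if a second bind to an occupied port completed with a failure. */
    static boolean bindFailsWhilePortIsTaken() throws IOException {
        try (ServerSocket first = bindAsync(0).join()) {
            return bindAsync(first.getLocalPort())
                .handle((socket, failure) -> {
                    if (failure != null) {
                        // possibly report the failure somewhere...
                        System.err.println("Binding failed: " + failure.getCause());
                        return true;
                    }
                    closeQuietly(socket); // unexpected success: release the socket again
                    return false;
                })
                .join();
        }
    }

    private static void closeQuietly(ServerSocket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // nothing sensible to do in this demo
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second bind failed: " + bindFailsWhilePortIsTaken());
    }
}
```

As with the onFailure callback above, the reaction to the failure is attached to the future itself, so it runs as soon as the bind attempt has completed.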

Once the server has successfully bound to a port, the Source<IncomingConnection, ?> starts running and emitting new incoming connections. Technically this source can signal a failure as well; however, this should only happen in very dramatic situations, such as running out of file descriptors or memory available to the system, so that it is not able to accept a new incoming connection. Handling failures in Akka Streams is pretty straightforward, as failures are signaled through the stream starting from the stage which failed, all the way downstream to the final stages.

In the example below we add a custom PushStage (see Custom stream processing) in order to react to the stream's failure. In its onUpstreamFailure callback we can signal, for instance, a failureMonitor actor with the cause why the stream is going down, and let the Actor handle the rest. Maybe it will decide to restart the server or shut down the ActorSystem; that, however, is not our concern anymore.

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, Future<ServerBinding>> serverSource =
    Http.get(system).bind("localhost", 8080, materializer);

Flow<IncomingConnection, IncomingConnection, BoxedUnit> failureDetection =
    Flow.of(IncomingConnection.class).transform(() ->
        new PushStage<IncomingConnection, IncomingConnection>() {
            @Override
            public SyncDirective onPush(IncomingConnection elem, Context<IncomingConnection> ctx) {
                return ctx.push(elem);
            }

            @Override
            public TerminationDirective onUpstreamFailure(Throwable cause, Context<IncomingConnection> ctx) {
                // signal the failure to external monitoring service!
                return super.onUpstreamFailure(cause, ctx);
            }
        });

Future<ServerBinding> serverBindingFuture =
        serverSource
                .via(failureDetection) // feed signals through our custom stage
                .to(Sink.foreach(
                        new Procedure<IncomingConnection>() {
                            @Override
                            public void apply(IncomingConnection connection) throws Exception {
                                System.out.println("Accepted new connection from " + connection.remoteAddress());
                                // ... and then actually handle the connection
                            }
                        })).run(materializer);

The third type of failure occurs when the connection has been properly established but is afterwards terminated abruptly, for example by the client aborting the underlying TCP connection. To handle this failure we can use the same pattern as in the previous snippet, but apply it to the connection's Flow:

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, Future<ServerBinding>> serverSource =
    Http.get(system).bind("localhost", 8080, materializer);

Flow<HttpRequest, HttpRequest, BoxedUnit> failureDetection =
        Flow.of(HttpRequest.class).transform(() ->
        new PushStage<HttpRequest, HttpRequest>() {
            @Override
            public SyncDirective onPush(HttpRequest elem, Context<HttpRequest> ctx) {
                return ctx.push(elem);
            }

            @Override
            public TerminationDirective onUpstreamFailure(Throwable cause, Context<HttpRequest> ctx) {
                // signal the failure to external monitoring service!
                return super.onUpstreamFailure(cause, ctx);
            }
        });

Flow<HttpRequest, HttpResponse, BoxedUnit> httpEcho =
        Flow.of(HttpRequest.class)
            .via(failureDetection)
            .map(request -> {
              Source<ByteString, Object> bytes = request.entity().getDataBytes();
              HttpEntity.Chunked entity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, bytes);

              return HttpResponse.create()
                .withEntity(entity);
            });

Future<ServerBinding> serverBindingFuture =
    serverSource.to(Sink.foreach(con -> {
                System.out.println("Accepted new connection from " + con.remoteAddress());
                con.handleWith(httpEcho, materializer);
            }
        )).run(materializer);

These failures are more or less infrastructure-related: they concern failing bindings or connections. Most of the time you won't need to dive into them very deeply, as Akka will simply log errors of this kind anyway, which is a reasonable default for such problems.
