Routing DSL

In addition to the Core Server API Akka HTTP provides a very flexible “Routing DSL” for elegantly defining RESTful web services. It picks up where the low-level API leaves off and offers much of the higher-level functionality of typical web servers or frameworks, like deconstruction of URIs, content negotiation or static content serving.

Note

It is recommended to read the Implications of the streaming nature of Request/Response Entities section, as it explains the underlying full-stack streaming concepts, which may be unexpected when coming from a background with non-“streaming first” HTTP Servers.

Minimal Example

This is a complete, very basic Akka HTTP application relying on the Routing DSL:

Scala
sourceimport akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {
  def main(args: Array[String]) {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}
Java
sourceimport akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;

import java.util.concurrent.CompletionStage;

public class HttpServerMinimalExampleTest extends AllDirectives {

  public static void main(String[] args) throws Exception {
    // boot up server using the route as defined below
    ActorSystem system = ActorSystem.create("routes");

    final Http http = Http.get(system);
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    //In order to access all directives we need an instance where the routes are define.
    HttpServerMinimalExampleTest app = new HttpServerMinimalExampleTest();

    final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.createRoute().flow(system, materializer);
    final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
        ConnectHttp.toHost("localhost", 8080), materializer);

    System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
    System.in.read(); // let it run until user presses return

    binding
        .thenCompose(ServerBinding::unbind) // trigger unbinding from the port
        .thenAccept(unbound -> system.terminate()); // and shutdown when done
  }

  private Route createRoute() {
    return concat(
        path("hello", () ->
            get(() ->
                complete("<h1>Say hello to akka-http</h1>"))));
  }
}

It starts an HTTP Server on localhost and replies to GET requests to /hello with a simple response.

API may change

The following example uses an experimental feature and its API is subjected to change in future releases of Akka HTTP. For further information about this marker, see The @DoNotInherit and @ApiMayChange markers in the Akka documentation.

To help start a server Akka HTTP provides an experimental helper class called HttpAppHttpApp. This is the same example as before rewritten using HttpAppHttpApp:

Scala
sourceimport akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
import akka.http.scaladsl.server.HttpApp
import akka.http.scaladsl.server.Route

// Server definition
object WebServer extends HttpApp {
  override def routes: Route =
    path("hello") {
      get {
        complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
      }
    }
}

// Starting the server
WebServer.startServer("localhost", 8080)
Java
source
// Server definition class MinimalHttpApp extends HttpApp { @Override protected Route routes() { return path("hello", () -> get(() -> complete("<h1>Say hello to akka-http</h1>") ) ); } } // Starting the server final MinimalHttpApp myServer = new MinimalHttpApp(); myServer.startServer("localhost", 8080);

See HttpApp Bootstrap for more details about setting up a server using this approach.

Longer Example

The following is an Akka HTTP route definition that tries to show off a few features. The resulting service does not really do anything useful but its definition should give you a feel for what an actual API definition with the Routing DSL will look like:

sourceimport akka.actor.{ ActorRef, ActorSystem }
import akka.http.scaladsl.coding.Deflate
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout

// types used by the API routes
type Money = Double // only for demo purposes, don't try this at home!
type TransactionResult = String
case class User(name: String)
case class Order(email: String, amount: Money)
case class Update(order: Order)
case class OrderItem(i: Int, os: Option[String], s: String)

// marshalling would usually be derived automatically using libraries
implicit val orderUM: FromRequestUnmarshaller[Order] = ???
implicit val orderM: ToResponseMarshaller[Order] = ???
implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???
implicit val timeout: Timeout = ??? // for actor asks
implicit val ec: ExecutionContext = ???
implicit val mat: ActorMaterializer = ???
implicit val sys: ActorSystem = ???

// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???

val route = concat(
  path("orders") {
    authenticateBasic(realm = "admin area", myAuthenticator) { user =>
      concat(
        get {
          encodeResponseWith(Deflate) {
            complete {
              // marshal custom object with in-scope marshaller
              retrieveOrdersFromDB
            }
          }
        },
        post {
          // decompress gzipped or deflated requests if required
          decodeRequest {
            // unmarshal with in-scope unmarshaller
            entity(as[Order]) { order =>
              complete {
                // ... write order to DB
                "Order received"
              }
            }
          }
        })
    }
  },
  // extract URI path element as Int
  pathPrefix("order" / IntNumber) { orderId =>
    concat(
      pathEnd {
        concat(
          (put | parameter('method ! "put")) {
            // form extraction from multipart or www-url-encoded forms
            formFields(('email, 'total.as[Money])).as(Order) { order =>
              complete {
                // complete with serialized Future result
                (myDbActor ? Update(order)).mapTo[TransactionResult]
              }
            }
          },
          get {
            // debugging helper
            logRequest("GET-ORDER") {
              // use in-scope marshaller to create completer function
              completeWith(instanceOf[Order]) { completer =>
                // custom
                processOrderRequest(orderId, completer)
              }
            }
          })
      },
      path("items") {
        get {
          // parameters to case class extraction
          parameters(('size.as[Int], 'color ?, 'dangerous ? "no"))
            .as(OrderItem) { orderItem =>
              // ... route using case class instance created from
              // required and optional query parameters
            }
        }
      })
  },
  pathPrefix("documentation") {
    // optionally compresses the response with Gzip or Deflate
    // if the client accepts compressed responses
    encodeResponse {
      // serve up static content from a JAR resource
      getFromResourceDirectory("docs")
    }
  },
  path("oldApi" / Remaining) { pathRest =>
    redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)
  }
)

Interaction with Akka Typed

Since Akka version 2.5.22, Akka typed became ready for production, Akka HTTP, however, is still using the untyped ActorSystem. This following example will demonstrate how to use Akka HTTP and Akka Typed together within the same application.

We will create a small web server responsible to record build jobs with its state and duration, query jobs by id and status, and clear the job history.

First let’s start by defining the Behavior that will act as a repository for the build job information, this isn’t strictly needed for our sample but just to have an actual actor to interact with:

Scala
sourceimport akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

object JobRepository {

  // Definition of the a build job and its possible status values
  sealed trait Status
  object Successful extends Status
  object Failed extends Status

  final case class Job(id: Long, projectName: String, status: Status, duration: Long)
  final case class Jobs(jobs: Seq[Job])

  // Trait defining successful and failure responses
  sealed trait Response
  case object OK extends Response
  final case class KO(reason: String) extends Response

  // Trait and its implementations representing all possible messages that can be sent to this Behavior
  sealed trait Command
  final case class AddJob(job: Job, replyTo: ActorRef[Response]) extends Command
  final case class GetJobById(id: Long, replyTo: ActorRef[Option[Job]]) extends Command
  final case class GetJobByStatus(status: Status, replyTo: ActorRef[Seq[Job]]) extends Command
  final case class ClearJobs(replyTo: ActorRef[Response]) extends Command

  // This behavior handles all possible incoming messages and keeps the state in the function parameter
  def apply(jobs: Map[Long, Job] = Map.empty): Behavior[Command] = Behaviors.receiveMessage {
    case AddJob(job, replyTo) if jobs.contains(job.id) =>
      replyTo ! KO("Job already exists")
      Behaviors.same
    case AddJob(job, replyTo) =>
      replyTo ! OK
      JobRepository(jobs.+(job.id -> job))
    case GetJobById(id, replyTo) =>
      replyTo ! jobs.get(id)
      Behaviors.same
    case ClearJobs(replyTo) =>
      replyTo ! OK
      JobRepository(Map.empty)
  }

}

Then, let’s define the JSON marshaller and unmarshallers for the HTTP routes:

Scala
sourceimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
import spray.json.DeserializationException
import spray.json.JsString
import spray.json.JsValue
import spray.json.RootJsonFormat

trait JsonSupport extends SprayJsonSupport {
  // import the default encoders for primitive types (Int, String, Lists etc)
  import DefaultJsonProtocol._
  import JobRepository._

  implicit object StatusFormat extends RootJsonFormat[Status] {
    def write(status: Status): JsValue = status match {
      case Failed     => JsString("Failed")
      case Successful => JsString("Successful")
    }

    def read(json: JsValue): Status = json match {
      case JsString("Failed")     => Failed
      case JsString("Successful") => Successful
      case _                      => throw new DeserializationException("Status unexpected")
    }
  }

  implicit val jobFormat = jsonFormat4(Job)

  implicit val jobsFormat = jsonFormat1(Jobs)
}

Next step is to define the RouteRoute that will communicate with the previously defined behavior and handle all its possible responses:

Scala
sourceimport akka.actor.typed.ActorSystem
import akka.util.Timeout

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route

import scala.concurrent.duration._
import scala.concurrent.Future

class JobRoutes(buildJobRepository: ActorRef[JobRepository.Command])(implicit system: ActorSystem[_]) extends JsonSupport {

  import akka.actor.typed.scaladsl.AskPattern.Askable

  // asking someone requires a timeout and a scheduler, if the timeout hits without response
  // the ask is failed with a TimeoutException
  implicit val timeout: Timeout = 3.seconds
  // implicit scheduler only needed in 2.5
  // in 2.6 having an implicit typed ActorSystem in scope is enough if you import AskPattern.schedulerFromActorSystem
  implicit val scheduler = system.scheduler

  lazy val theJobRoutes: Route =
    pathPrefix("jobs") {
      concat(
        pathEnd {
          concat(
            post {
              entity(as[JobRepository.Job]) { job =>
                val operationPerformed: Future[JobRepository.Response] =
                  buildJobRepository.ask(JobRepository.AddJob(job, _))
                onSuccess(operationPerformed) {
                  case JobRepository.OK         => complete("Job added")
                  case JobRepository.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
                }
              }
            },
            delete {
              val operationPerformed: Future[JobRepository.Response] =
                buildJobRepository.ask(JobRepository.ClearJobs(_))
              onSuccess(operationPerformed) {
                case JobRepository.OK         => complete("Jobs cleared")
                case JobRepository.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
              }
            }
          )
        },
        (get & path(LongNumber)) { id =>
          val maybeJob: Future[Option[JobRepository.Job]] =
            buildJobRepository.ask(JobRepository.GetJobById(id, _))
          rejectEmptyResponse {
            complete(maybeJob)
          }
        }
      )
    }
}

Finally, we create a Behavior that bootstraps the web server and use it as the root behavior of our actor system:

Scala
sourceimport akka.actor.typed.PostStop
import akka.actor.typed.scaladsl.adapter._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.Http

import scala.concurrent.ExecutionContextExecutor
import scala.util.{ Success, Failure }

object Server {

  sealed trait Message
  private final case class StartFailed(cause: Throwable) extends Message
  private final case class Started(binding: ServerBinding) extends Message
  case object Stop extends Message

  def apply(host: String, port: Int): Behavior[Message] = Behaviors.setup { ctx =>

    implicit val system = ctx.system
    // http doesn't know about akka typed so provide untyped system
    implicit val untypedSystem: akka.actor.ActorSystem = ctx.system.toClassic
    // implicit materializer only required in Akka 2.5
    // in 2.6 having an implicit classic or typed ActorSystem in scope is enough
    implicit val materializer: ActorMaterializer = ActorMaterializer()(ctx.system.toClassic)
    implicit val ec: ExecutionContextExecutor = ctx.system.executionContext

    val buildJobRepository = ctx.spawn(JobRepository(), "JobRepository")
    val routes = new JobRoutes(buildJobRepository)

    val serverBinding: Future[Http.ServerBinding] =
      Http.apply().bindAndHandle(routes.theJobRoutes, host, port)
    ctx.pipeToSelf(serverBinding) {
      case Success(binding) => Started(binding)
      case Failure(ex)      => StartFailed(ex)
    }

    def running(binding: ServerBinding): Behavior[Message] =
      Behaviors.receiveMessagePartial[Message] {
        case Stop =>
          ctx.log.info(
            "Stopping server http://{}:{}/",
            binding.localAddress.getHostString,
            binding.localAddress.getPort)
          Behaviors.stopped
      }.receiveSignal {
        case (_, PostStop) =>
          binding.unbind()
          Behaviors.same
      }

    def starting(wasStopped: Boolean): Behaviors.Receive[Message] =
      Behaviors.receiveMessage[Message] {
        case StartFailed(cause) =>
          throw new RuntimeException("Server failed to start", cause)
        case Started(binding) =>
          ctx.log.info(
            "Server online at http://{}:{}/",
            binding.localAddress.getHostString,
            binding.localAddress.getPort)
          if (wasStopped) ctx.self ! Stop
          running(binding)
        case Stop =>
          // we got a stop message but haven't completed starting yet,
          // we cannot stop until starting has completed
          starting(wasStopped = true)
      }

    starting(wasStopped = false)
  }
}

def main(args: Array[String]) {
  val system: ActorSystem[Server.Message] =
    ActorSystem(Server("localhost", 8080), "BuildJobsServer")
}

Note that the akka.actor.typed.ActorSystem is converted with toClassic, which comes from import akka.actor.typed.scaladsl.adapter._. If you are using an earlier version than Akka 2.5.26 this conversion method is named toUntyped.

Dynamic Routing Example

As the routes are evaluated for each request, it is possible to make changes at runtime. Please note that every access may happen on a separated thread, so any shared mutable state must be thread safe.

The following is an Akka HTTP route definition that allows dynamically adding new or updating mock endpoints with associated request-response pairs at runtime.

Scala
sourcecase class MockDefinition(path: String, requests: Seq[JsValue], responses: Seq[JsValue])
implicit val format = jsonFormat3(MockDefinition)

@volatile var state = Map.empty[String, Map[JsValue, JsValue]]

// fixed route to update state
val fixedRoute: Route = post {
  pathSingleSlash {
    entity(as[MockDefinition]) { mock =>
      val mapping = mock.requests.zip(mock.responses).toMap
      state = state + (mock.path -> mapping)
      complete("ok")
    }
  }
}

// dynamic routing based on current state
val dynamicRoute: Route = ctx => {
  val routes = state.map {
    case (segment, responses) =>
      post {
        path(segment) {
          entity(as[JsValue]) { input =>
            complete(responses.get(input))
          }
        }
      }
  }
  concat(routes.toList: _*)(ctx)
}

val route = fixedRoute ~ dynamicRoute
Java
sourcefinal private Map<String, Map<JsonNode, JsonNode>> state = new ConcurrentHashMap<>();

private Route createRoute() {
  // fixed route to update state
  Route fixedRoute = post(() ->
    pathSingleSlash(() ->
      entity(Jackson.unmarshaller(MockDefinition.class), mock -> {
        Map<JsonNode, JsonNode> mappings = new HashMap<>();
        int size = Math.min(mock.getRequests().size(), mock.getResponses().size());
        for (int i = 0; i < size; i++) {
          mappings.put(mock.getRequests().get(i), mock.getResponses().get(i));
        }
        state.put(mock.getPath(), mappings);
        return complete("ok");
      })
    )
  );

  // dynamic routing based on current state
  Route dynamicRoute = post(() ->
    state.entrySet().stream().map(mock ->
      path(mock.getKey(), () ->
        entity(Jackson.unmarshaller(JsonNode.class), input ->
          complete(StatusCodes.OK, mock.getValue().get(input), Jackson.marshaller())
        )
      )
    ).reduce(reject(), Route::orElse)
  );

  return concat(fixedRoute, dynamicRoute);
}

private static class MockDefinition {
  private final String path;
  private final List<JsonNode> requests;
  private final List<JsonNode> responses;

  public MockDefinition(@JsonProperty("path") String path,
                        @JsonProperty("requests") List<JsonNode> requests,
                        @JsonProperty("responses") List<JsonNode> responses) {
    this.path = path;
    this.requests = requests;
    this.responses = responses;
  }

  public String getPath() {
    return path;
  }

  public List<JsonNode> getRequests() {
    return requests;
  }

  public List<JsonNode> getResponses() {
    return responses;
  }
}

For example, let’s say we do a POST request with body:

{
  "path": "test",
  "requests": [
    {"id": 1},
    {"id": 2}
  ],
  "responses": [
    {"amount": 1000},
    {"amount": 2000}
  ]
}

Subsequent POST request to /test with body {"id": 1} will be responded with {"amount": 1000}.

Handling HTTP Server failures in the High-Level API

There are various situations when failure may occur while initialising or running an Akka HTTP server. Akka by default will log all these failures, however sometimes one may want to react to failures in addition to them just being logged, for example by shutting down the actor system, or notifying some external monitoring end-point explicitly.

Bind failures

For example the server might be unable to bind to the given port. For example when the port is already taken by another application, or if the port is privileged (i.e. only usable by root). In this case the “binding future” will fail immediately, and we can react to it by listening on the FutureCompletionStage’s completion:

Scala
sourceimport akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.concurrent.Future

object WebServer {
  def main(args: Array[String]) {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future foreach in the end
    implicit val executionContext = system.dispatcher

    val handler = get {
      complete("Hello world!")
    }

    // let's say the OS won't allow us to bind to 80.
    val (host, port) = ("localhost", 80)
    val bindingFuture: Future[ServerBinding] =
      Http().bindAndHandle(handler, host, port)

    bindingFuture.failed.foreach { ex =>
      log.error(ex, "Failed to bind to {}:{}!", host, port)
    }
  }
}
Java
source
import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.server.Route; import akka.http.javadsl.Http; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Flow; import java.io.IOException; import java.util.concurrent.CompletionStage; public class HighLevelServerBindFailureExample { public static void main(String[] args) throws IOException { // boot up server using the route as defined below final ActorSystem system = ActorSystem.create(); final ActorMaterializer materializer = ActorMaterializer.create(system); final HighLevelServerExample app = new HighLevelServerExample(); final Route route = app.createRoute(); final Flow<HttpRequest, HttpResponse, NotUsed> handler = route.flow(system, materializer); final CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(handler, ConnectHttp.toHost("127.0.0.1", 8080), materializer); binding.exceptionally(failure -> { System.err.println("Something very bad happened! " + failure.getMessage()); system.terminate(); return null; }); system.terminate(); } }
Note

For a more low-level overview of the kinds of failures that can happen and also more fine-grained control over them refer to the Handling HTTP Server failures in the Low-Level API documentation.

Failures and exceptions inside the Routing DSL

Exception handling within the Routing DSL is done by providing ExceptionHandlerExceptionHandler s which are documented in-depth in the Exception Handling section of the documentation. You can use them to transform exceptions into HttpResponseHttpResponse s with appropriate error codes and human-readable failure descriptions.

File uploads

For high level directives to handle uploads see the FileUploadDirectives.

Handling a simple file upload from for example a browser form with a file input can be done by accepting a Multipart.FormData entity, note that the body parts are Source rather than all available right away, and so is the individual body part payload so you will need to consume those streams both for the file and for the form fields.

Here is a simple example which just dumps the uploaded file into a temporary file on disk, collects some form fields and saves an entry to a fictive database:

Scala
sourceval uploadVideo =
  path("video") {
    entity(as[Multipart.FormData]) { formData =>

      // collect all parts of the multipart as it arrives into a map
      val allPartsF: Future[Map[String, Any]] = formData.parts.mapAsync[(String, Any)](1) {

        case b: BodyPart if b.name == "file" =>
          // stream into a file as the chunks of it arrives and return a future
          // file to where it got stored
          val file = File.createTempFile("upload", "tmp")
          b.entity.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ =>
            (b.name -> file))

        case b: BodyPart =>
          // collect form field values
          b.toStrict(2.seconds).map(strict =>
            (b.name -> strict.entity.data.utf8String))

      }.runFold(Map.empty[String, Any])((map, tuple) => map + tuple)

      val done = allPartsF.map { allParts =>
        // You would have some better validation/unmarshalling here
        db.create(Video(
          file = allParts("file").asInstanceOf[File],
          title = allParts("title").asInstanceOf[String],
          author = allParts("author").asInstanceOf[String]))
      }

      // when processing have finished create a response for the user
      onSuccess(allPartsF) { allParts =>
        complete {
          "ok!"
        }
      }
    }
  }
Java
sourceimport static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.entity;
import static akka.http.javadsl.server.Directives.onSuccess;
import static akka.http.javadsl.server.Directives.path;

  path("video", () ->
  entity(Unmarshaller.entityToMultipartFormData(), formData -> {
    // collect all parts of the multipart as it arrives into a map
    final CompletionStage<Map<String, Object>> allParts =
      formData.getParts().mapAsync(1, bodyPart -> {
        if ("file".equals(bodyPart.getName())) {
          // stream into a file as the chunks of it arrives and return a CompletionStage
          // file to where it got stored
          final File file = File.createTempFile("upload", "tmp");
          return bodyPart.getEntity().getDataBytes()
            .runWith(FileIO.toPath(file.toPath()), materializer)
            .thenApply(ignore ->
              new Pair<String, Object>(bodyPart.getName(), file)
            );
        } else {
          // collect form field values
          return bodyPart.toStrict(2 * 1000, materializer)
            .thenApply(strict ->
              new Pair<String, Object>(bodyPart.getName(),
                strict.getEntity().getData().utf8String())
            );
        }
      }).runFold(new HashMap<String, Object>(), (acc, pair) -> {
        acc.put(pair.first(), pair.second());
        return acc;
      }, materializer);

    // simulate a DB call
    final CompletionStage<Void> done = allParts.thenCompose(map ->
      // You would have some better validation/unmarshalling here
      DB.create((File) map.get("file"),
        (String) map.get("title"),
        (String) map.get("author")
      ));

    // when processing have finished create a response for the user
    return onSuccess(allParts, x -> complete("ok!"));
  })
);

You can transform the uploaded files as they arrive rather than storing them in a temporary file as in the previous example. In this example we accept any number of .csv files, parse those into lines and split each line before we send it to an actor for further processing:

Scala
sourceval splitLines = Framing.delimiter(ByteString("\n"), 256)

val csvUploads =
  path("metadata" / LongNumber) { id =>
    entity(as[Multipart.FormData]) { formData =>
      val done: Future[Done] = formData.parts.mapAsync(1) {
        case b: BodyPart if b.filename.exists(_.endsWith(".csv")) =>
          b.entity.dataBytes
            .via(splitLines)
            .map(_.utf8String.split(",").toVector)
            .runForeach(csv =>
              metadataActor ! MetadataActor.Entry(id, csv))
        case _ => Future.successful(Done)
      }.runWith(Sink.ignore)

      // when processing have finished create a response for the user
      onSuccess(done) { _ =>
        complete {
          "ok!"
        }
      }
    }
  }
Java
sourceimport static akka.http.javadsl.server.Directives.complete;
import static akka.http.javadsl.server.Directives.entity;
import static akka.http.javadsl.server.Directives.onComplete;
import static akka.http.javadsl.server.Directives.path;

Route csvUploads() {
  final Flow<ByteString, ByteString, NotUsed> splitLines =
    Framing.delimiter(ByteString.fromString("\n"), 256);

  return path(segment("metadata").slash(longSegment()), id ->
    entity(Unmarshaller.entityToMultipartFormData(), formData -> {

      final CompletionStage<Done> done = formData.getParts().mapAsync(1, bodyPart ->
        bodyPart.getFilename().filter(name -> name.endsWith(".csv")).map(ignored ->
          bodyPart.getEntity().getDataBytes()
            .via(splitLines)
            .map(bs -> bs.utf8String().split(","))
            .runForeach(csv ->
                metadataActor.tell(new Entry(id, csv), ActorRef.noSender()),
              materializer)
        ).orElseGet(() ->
          // in case the uploaded file is not a CSV
          CompletableFuture.completedFuture(Done.getInstance()))
      ).runWith(Sink.ignore(), materializer);

      // when processing have finished create a response for the user
      return onComplete(() -> done, ignored -> complete("ok!"));
    })
  );
}

Configuring Server-side HTTPS

For detailed documentation about configuring and using HTTPS on the server-side refer to Server-Side HTTPS Support.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.