Server-Sent Events Support

Server-Sent Events (SSE) is a lightweight and standardized protocol for pushing notifications from a HTTP server to a client. In contrast to WebSocket, which offers bi-directional communication, SSE only allows for one-way communication from the server to the client. If that’s all you need, SSE has the advantages to be much simpler, to rely on HTTP only and to offer retry semantics on broken connections by the browser.

According to the SSE specification clients can request an event stream from the server via HTTP. The server responds with the media type text/event-stream which has the fixed character encoding UTF-8 and keeps the response open to send events to the client when available. Events are textual structures which carry fields and are terminated by an empty line, e.g.

data: { "username": "John Doe" }
event: added
id: 42

data: another event

Clients can optionally signal the last seen event to the server via the Last-Event-IDLastEventId header, e.g. after a reconnect.

Model

Akka HTTP represents event streams as Source[ServerSentEvent, NotUsed]Source<ServerSentEvent, NotUsed> where ServerSentEvent is a case class with the following read-only properties:

  • data: StringString data – the actual payload, may span multiple lines
  • eventType: Option[String]Optional<String> type – optional qualifier, e.g. “added”, “removed”, etc.
  • id: Option[String]Optional<String> id – optional identifier
  • retry: Option[Int]OptionalInt retry – optional reconnection delay in milliseconds

In accordance to the SSE specification Akka HTTP also provides the Last-Event-IDLastEventId header and the text/event-streamTEXT_EVENT_STREAM media type.

Server-side usage: marshalling

In order to respond to a HTTP request with an event stream, you have to bring the implicit ToResponseMarshaller[Source[ServerSentEvent, Any]] defined by EventStreamMarshalling into the scope defining the respective routeuse the EventStreamMarshalling.toEventStream marshaller:

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}
Java
final List<ServerSentEvent> events = new ArrayList<>();
events.add(ServerSentEvent.create("1"));
events.add(ServerSentEvent.create("2"));
final Route route = completeOK(Source.from(events), EventStreamMarshalling.toEventStream());

Client-side usage: unmarshalling

In order to unmarshal an event stream as Source[ServerSentEvent, NotUsed]Source<ServerSentEvent, NotUsed>, you have to bring the implicit FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]] defined by EventStreamUnmarshalling into scopeuse the EventStreamUnmarshalling.fromEventStream unmarshaller:

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.http.scaladsl.Http
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.model.sse.ServerSentEvent
import scala.concurrent.duration._

import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

def route: Route = {
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  path("events") {
    get {
      complete {
        Source
          .tick(2.seconds, 2.seconds, NotUsed)
          .map(_ => LocalTime.now())
          .map(time => ServerSentEvent(ISO_LOCAL_TIME.format(time)))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }
}
Java
List<ServerSentEvent> unmarshalledEvents =
        EventStreamUnmarshalling.fromEventStream()
                .unmarshal(entity, system.dispatcher(), mat)
                .thenCompose(source -> source.runWith(Sink.seq(), mat))
                .toCompletableFuture()
                .get(3000, TimeUnit.SECONDS);

Notice that if you are looking for a resilient way to permanently subscribe to an event stream, Alpakka provides the EventSource connector which reconnects automatically with the id of the last seen event.

The source code for this page can be found here.