Server-sent Events (SSE)

The SSE connector provides a continuous source of server-sent events that recovers from connection failures.

Project Info: Alpakka Server-sent events (SSE)
Artifact: com.lightbend.akka    akka-stream-alpakka-sse    9.0.0
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.3
JPMS module name: akka.stream.alpakka.sse
License
Readiness level: since 0.6, 2017-02-13
Home page: https://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them, configure the URL for this repository in your build.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-sse" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-sse_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  AkkaHttpVersion: "10.7.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-sse_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-http_2.13    10.7.0
com.typesafe.akka    akka-stream_2.13    2.10.0
org.scala-lang    scala-library    2.13.12
Dependency tree
com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
    com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
        com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.slf4j    slf4j-api    2.0.16
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Usage

Define an EventSource by giving a URI, a function for sending HTTP requests, and an optional initial value for the Last-Event-ID header:

Scala
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.alpakka.sse.scaladsl.EventSource
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.duration._

val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)

val eventSource: Source[ServerSentEvent, NotUsed] =
  EventSource(
    uri = Uri(s"http://$host:$port"),
    send,
    initialLastEventId = Some("2"),
    retryDelay = 1.second
  )
Java
import java.util.Optional;
import java.util.function.Function;
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.*;
import akka.http.javadsl.model.sse.ServerSentEvent;
import akka.stream.alpakka.sse.javadsl.EventSource;
import akka.stream.javadsl.Source;

final Http http = Http.get(system);
Function<HttpRequest, CompletionStage<HttpResponse>> send =
    (request) -> http.singleRequest(request);

final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port));
final Optional<String> lastEventId = Optional.of("2");
Source<ServerSentEvent, NotUsed> eventSource =
    EventSource.create(targetUri, send, lastEventId, system);

Then happily consume ServerSentEvents:

Scala
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Sink
import scala.collection.immutable

val events: Future[immutable.Seq[ServerSentEvent]] =
  eventSource
    .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
    .take(nrOfSamples)
    .runWith(Sink.seq)
Java
import java.time.Duration;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;

int elements = 1;
Duration per = Duration.ofMillis(500);
int maximumBurst = 1;

eventSource
    .throttle(elements, per, maximumBurst, ThrottleMode.shaping())
    .take(nrOfSamples)
    .runWith(Sink.seq(), system);
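
To illustrate what the initial Last-Event-ID value and the recovery from connection failure mean for a consumer, here is a minimal plain-Java sketch. It is not the connector's implementation and uses no Akka API: the class ResumeSketch, the Event type, the in-memory LOG, and the connect and consume methods are all hypothetical names invented for this illustration. It assumes numeric event ids and a server that replays every event after the id it is given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ResumeSketch {

    /** Minimal stand-in for a server-sent event: an id and a data payload. */
    static final class Event {
        final String id;
        final String data;
        Event(String id, String data) { this.id = id; this.data = data; }
    }

    /** Hypothetical server-side log holding events with ids "1" to "6". */
    static final List<Event> LOG = new ArrayList<>();
    static {
        for (int i = 1; i <= 6; i++) {
            LOG.add(new Event(String.valueOf(i), "payload-" + i));
        }
    }

    /**
     * Simulates one connection: replays the events after lastEventId and
     * "drops" the connection once maxPerConnection events have been sent.
     */
    static List<Event> connect(Optional<String> lastEventId, int maxPerConnection) {
        int after = lastEventId.map(Integer::parseInt).orElse(0);
        List<Event> sent = new ArrayList<>();
        for (Event e : LOG) {
            if (Integer.parseInt(e.id) > after && sent.size() < maxPerConnection) {
                sent.add(e);
            }
        }
        return sent;
    }

    /** Reconnects until the log is exhausted, carrying the last seen id forward. */
    static List<Event> consume(Optional<String> initialLastEventId, int maxPerConnection) {
        List<Event> received = new ArrayList<>();
        Optional<String> lastEventId = initialLastEventId;
        while (true) {
            List<Event> batch = connect(lastEventId, maxPerConnection);
            if (batch.isEmpty()) {
                return received; // a real SSE stream would keep waiting instead
            }
            received.addAll(batch);
            // Resume after the last event seen, as a Last-Event-ID header would request.
            lastEventId = Optional.of(batch.get(batch.size() - 1).id);
        }
    }

    public static void main(String[] args) {
        // Start after event "2", with every simulated connection failing after 2 events.
        for (Event e : consume(Optional.of("2"), 2)) {
            System.out.println(e.id + ": " + e.data);
        }
    }
}
```

Running main prints events 3 through 6 exactly once each, even though every simulated connection drops after two events. With Optional.empty() as the initial value, the consumer starts from the first event in the log.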