Server-sent Events (SSE)
The SSE connector provides a continuous source of server-sent events that recovers from connection failures.
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies below.
- sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-sse" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion
)
- Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-sse_${scala.binary.version}</artifactId>
    <version>9.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
    AkkaVersion: "2.10.0",
    AkkaHttpVersion: "10.7.0",
    ScalaBinary: "2.13"
]
dependencies {
    implementation "com.lightbend.akka:akka-stream-alpakka-sse_${versions.ScalaBinary}:9.0.1"
    implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}
The first tab below lists the direct dependencies of this module; the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
com.typesafe.akka | akka-http_2.13 | 10.7.0 |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
org.scala-lang | scala-library | 2.13.12 |
- Dependency tree
com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.slf4j slf4j-api 2.0.16
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
org.reactivestreams reactive-streams 1.0.4 MIT-0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
Usage
Define an EventSource by giving a URI, a function for sending HTTP requests, and an optional initial value for the Last-Event-ID header:
- Scala
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.alpakka.sse.scaladsl.EventSource
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.duration._

// Assumes an implicit ActorSystem plus `host` and `port` values are in scope.
val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)

val eventSource: Source[ServerSentEvent, NotUsed] =
  EventSource(
    uri = Uri(s"http://$host:$port"),
    send,
    initialLastEventId = Some("2"),
    retryDelay = 1.second
  )
- Java
import java.util.Optional;
import java.util.function.Function;
import java.util.concurrent.CompletionStage;
import akka.NotUsed;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.*;
import akka.http.javadsl.model.sse.ServerSentEvent;
import akka.stream.alpakka.sse.javadsl.EventSource;
import akka.stream.javadsl.Source;

// Assumes an ActorSystem `system` plus `host` and `port` values are in scope.
final Http http = Http.get(system);
Function<HttpRequest, CompletionStage<HttpResponse>> send =
    (request) -> http.singleRequest(request);

final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port));
final Optional<String> lastEventId = Optional.of("2");
Source<ServerSentEvent, NotUsed> eventSource =
    EventSource.create(targetUri, send, lastEventId, system);
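To make the role of the Last-Event-ID value more concrete, here is a minimal, library-free sketch of the resume-after-failure idea. It is not the connector's implementation: the `LastEventIdSketch` class, its `Event` type, the fake `connect` server, and the `consume` loop are all hypothetical names invented for illustration. The fake server drops the "connection" after two events, and the client reconnects with the id of the last event it saw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class LastEventIdSketch {

    /** Hypothetical stand-in for a server-sent event: an id and a payload. */
    static final class Event {
        final String id;
        final String data;
        Event(String id, String data) { this.id = id; this.data = data; }
    }

    static final int TOTAL_EVENTS = 10;         // the fake server knows events 1..10
    static final int EVENTS_PER_CONNECTION = 2; // after that the "connection" drops

    /** Fake server: resumes right after the given Last-Event-ID, then drops the connection. */
    static List<Event> connect(Optional<String> lastEventId) {
        int first = lastEventId.map(Integer::parseInt).orElse(0) + 1;
        List<Event> delivered = new ArrayList<>();
        for (int i = first; i < first + EVENTS_PER_CONNECTION && i <= TOTAL_EVENTS; i++) {
            delivered.add(new Event(String.valueOf(i), "event-" + i));
        }
        return delivered;
    }

    /** Client loop: reconnects until `wanted` events arrived, always sending the latest id seen. */
    static List<Event> consume(Function<Optional<String>, List<Event>> send,
                               Optional<String> initialLastEventId,
                               int wanted) {
        List<Event> received = new ArrayList<>();
        Optional<String> lastEventId = initialLastEventId;
        while (received.size() < wanted) {
            List<Event> batch = send.apply(lastEventId); // one connection attempt
            if (batch.isEmpty()) {
                break; // the fake server has nothing left to deliver
            }
            for (Event event : batch) {
                if (received.size() == wanted) {
                    break;
                }
                received.add(event);
                lastEventId = Optional.of(event.id); // remember where to resume
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Start after event "2", mirroring the initial Last-Event-ID used above.
        List<Event> events = consume(LastEventIdSketch::connect, Optional.of("2"), 5);
        for (Event event : events) {
            System.out.println(event.id + " -> " + event.data);
        }
    }
}
```

With an initial id of "2" and five wanted events, the loop in this sketch collects events 3 through 7 over three simulated connections, without duplicates or gaps.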
Then consume the ServerSentEvents:
- Scala
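import akka.stream.ThrottleMode
import akka.stream.scaladsl.Sink
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._

val events: Future[immutable.Seq[ServerSentEvent]] =
  eventSource
    .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
    .take(nrOfSamples)
    .runWith(Sink.seq)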
- Java
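import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;

int elements = 1;
Duration per = Duration.ofMillis(500);
int maximumBurst = 1;

// Emit at most one event per 500 ms and collect the first nrOfSamples events.
CompletionStage<List<ServerSentEvent>> events =
    eventSource
        .throttle(elements, per, maximumBurst, ThrottleMode.shaping())
        .take(nrOfSamples)
        .runWith(Sink.seq(), system);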
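Each ServerSentEvent consumed above carries a data payload and, optionally, an event type and an id. As a rough illustration of where those fields come from, the following sketch parses a small `text/event-stream` payload by hand. It is a simplified assumption-laden example, not the parser used by Akka HTTP: the `SseWireFormatSketch` and `ParsedEvent` names are hypothetical, the `retry` field is ignored, and the id is reset after every event.

```java
import java.util.ArrayList;
import java.util.List;

class SseWireFormatSketch {

    /** Hypothetical holder for the fields of one parsed event; type and id may be null. */
    static final class ParsedEvent {
        final String data;
        final String eventType;
        final String id;
        ParsedEvent(String data, String eventType, String id) {
            this.data = data;
            this.eventType = eventType;
            this.id = id;
        }
    }

    /** Parses "field: value" lines; a blank line terminates the current event. */
    static List<ParsedEvent> parse(String stream) {
        List<ParsedEvent> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        String eventType = null;
        String id = null;
        boolean hasData = false;

        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: emit the event if it has data, then reset the buffers.
                if (hasData) {
                    events.add(new ParsedEvent(data.toString(), eventType, id));
                }
                data.setLength(0);
                eventType = null;
                id = null;
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line, often used as a keep-alive
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is not part of the value
            }
            switch (field) {
                case "data":
                    if (hasData) {
                        data.append('\n'); // multiple data lines are joined with newlines
                    }
                    data.append(value);
                    hasData = true;
                    break;
                case "event":
                    eventType = value;
                    break;
                case "id":
                    id = value;
                    break;
                default:
                    break; // "retry" and unknown fields are ignored in this sketch
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String stream = "id: 1\ndata: hello\n\n: keep-alive\nevent: update\n"
            + "data: line one\ndata: line two\nid: 2\n\n";
        for (ParsedEvent event : parse(stream)) {
            System.out.println(event.id + " [" + event.eventType + "] " + event.data);
        }
    }
}
```

The id field parsed here is the value a client sends back in the Last-Event-ID header when it reconnects.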