New to Akka? Start with the Akka SDK.
Server-sent Events (SSE)
The SSE connector provides a continuous source of server-sent events recovering from connection failure.
| Project Info: Alpakka Server-sent events (SSE) | |
|---|---|
| Artifact | com.lightbend.akka
akka-stream-alpakka-sse
10.0.0
|
| JDK versions | Eclipse Temurin JDK 11 Eclipse Temurin JDK 17 |
| Scala versions | 2.13.17, 3.3.7 |
| JPMS module name | akka.stream.alpakka.sse |
| License | |
| Readiness level |
Since 0.6, 2017-02-13
|
| Home page | https://doc.akka.io/libraries/alpakka/current |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | Github issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
Note
The Akka dependencies are available from Akka’s secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.10.11" val AkkaHttpVersion = "10.7.3" libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-sse" % "10.0.0", "com.typesafe.akka" %% "akka-stream" % AkkaVersion, "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion )- Maven
<properties> <akka.version>2.10.11</akka.version> <akka.http.version>10.7.3</akka.http.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-sse_${scala.binary.version}</artifactId> <version>10.0.0</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http_${scala.binary.version}</artifactId> <version>${akka.http.version}</version> </dependency> </dependencies>- Gradle
def versions = [ AkkaVersion: "2.10.11", AkkaHttpVersion: "10.7.3", ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-stream-alpakka-sse_${versions.ScalaBinary}:10.0.0" implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}" implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.typesafe.akka akka-http_2.13 10.7.3 com.typesafe.akka akka-stream_2.13 2.10.11 org.scala-lang scala-library 2.13.17 - Dependency tree
com.typesafe.akka akka-http_2.13 10.7.3 BUSL-1.1 com.typesafe.akka akka-http-core_2.13 10.7.3 BUSL-1.1 com.typesafe.akka akka-parsing_2.13 10.7.3 BUSL-1.1 org.scala-lang scala-library 2.13.17 Apache-2.0 org.scala-lang scala-library 2.13.17 Apache-2.0 com.typesafe.akka akka-pki_2.13 2.10.11 BUSL-1.1 com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0 com.typesafe.akka akka-actor_2.13 2.10.11 BUSL-1.1 com.typesafe config 1.4.5 Apache-2.0 org.scala-lang scala-library 2.13.17 Apache-2.0 org.scala-lang scala-library 2.13.17 Apache-2.0 org.slf4j slf4j-api 2.0.17 MIT org.scala-lang scala-library 2.13.17 Apache-2.0 com.typesafe.akka akka-stream_2.13 2.10.11 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.10.11 BUSL-1.1 com.typesafe config 1.4.5 Apache-2.0 org.scala-lang scala-library 2.13.17 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.10.11 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.17 Apache-2.0 org.scala-lang scala-library 2.13.17 Apache-2.0
Usage
Define an EventSource by giving a URI, a function for sending HTTP requests, and an optional initial value for Last-Event-ID header:
- Scala
-
source
import akka.http.scaladsl.Http import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse, Uri} import akka.stream.alpakka.sse.scaladsl.EventSource import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_) val eventSource: Source[ServerSentEvent, NotUsed] = EventSource( uri = Uri(s"http://$host:$port"), send, initialLastEventId = Some("2"), retryDelay = 1.second ) - Java
-
source
import java.util.function.Function; import java.util.concurrent.CompletionStage; import akka.http.javadsl.Http; import akka.http.javadsl.model.*; import akka.http.javadsl.model.sse.ServerSentEvent; import akka.stream.alpakka.sse.javadsl.EventSource; final Http http = Http.get(system); Function<HttpRequest, CompletionStage<HttpResponse>> send = (request) -> http.singleRequest(request); final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port)); final Optional<String> lastEventId = Optional.of("2"); Source<ServerSentEvent, NotUsed> eventSource = EventSource.create(targetUri, send, lastEventId, system);
Then happily consume ServerSentEvents:
- Scala
-
source
val events: Future[immutable.Seq[ServerSentEvent]] = eventSource .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping) .take(nrOfSamples) .runWith(Sink.seq) - Java
-
source
int elements = 1; Duration per = Duration.ofMillis(500); int maximumBurst = 1; eventSource .throttle(elements, per, maximumBurst, ThrottleMode.shaping()) .take(nrOfSamples) .runWith(Sink.seq(), system);