Client-Side WebSocket Support

Client-Side WebSocket Support

Client side WebSocket support is available through Http.singleWebSocketRequest , Http.webSocketClientFlow and Http.webSocketClientLayer.

A WebSocket consists of two streams of messages, incoming messages (a Sink) and outgoing messages (a Source) where either may be signalled first; or even be the only direction in which messages flow during the lifetime of the connection. Therefore a WebSocket connection is modelled as either something you connect a Flow[Message, Message, Mat] to or a Flow[Message, Message, Mat] that you connect a Source[Message, Mat] and a Sink[Message, Mat] to.

A WebSocket request starts with a regular HTTP request which contains an Upgrade header (and possibly other regular HTTP request properties), so in addition to the flow of messages there also is an initial response from the server, this is modelled with WebSocketUpgradeResponse.

The methods of the WebSocket client API handle the upgrade to WebSocket on connection success and materializes the connected WebSocket stream. If the connection fails, for example with a 404 NotFound error, this regular HTTP result can be found in WebSocketUpgradeResponse.response

Message

Messages sent and received over a WebSocket can be either TextMessage s or BinaryMessage s and each of those has two subtypes Strict or Streaming. In typical applications messages will be Strict as WebSockets are usually deployed to communicate using small messages not stream data, the protocol does however allow this (by not marking the first fragment as final, as described in rfc 6455 section 5.2).

For such streaming messages BinaryMessage.Streaming and TextMessage.Streaming will be used. In these cases the data is provided as a Source[ByteString, NotUsed] for binary and Source[String, NotUsed] for text messages.

singleWebSocketRequest

singleWebSocketRequest takes a WebSocketRequest and a flow it will connect to the source and sink of the WebSocket connection. It will trigger the request right away and returns a tuple containing the Future[WebSocketUpgradeResponse] and the materialized value from the flow passed to the method.

The future will succeed when the WebSocket connection has been established or the server returned a regular HTTP response, or fail if the connection fails with an exception.

Simple example sending a message and printing any incoming message:

import akka.{ Done, NotUsed }
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
  Sink.foreach {
    case message: TextMessage.Strict =>
      println(message.text)
  }

val helloSource: Source[Message, NotUsed] =
  Source.single(TextMessage("hello world!"))

// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
  Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] representing the stream completion from above
val (upgradeResponse, closed) =
  Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

val connected = upgradeResponse.map { upgrade =>
  // just like a regular http request we can get 404 NotFound,
  // with a response body, that will be available from upgrade.response
  if (upgrade.response.status == StatusCodes.OK) {
    Done
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

// in a real application you would not side effect here
// and handle errors more carefully
connected.onComplete(println)
closed.foreach(_ => println("closed"))

The websocket request may also include additional headers, like in this example, HTTP Basic Auth:

val (upgradeResponse, _) =
  Http().singleWebSocketRequest(
    WebSocketRequest(
      "ws://example.com:8080/some/path",
      extraHeaders = Seq(Authorization(
        BasicHttpCredentials("johan", "correcthorsebatterystaple")))),
    flow)

webSocketClientFlow

webSocketClientFlow takes a request, and returns a Flow[Message, Message, Future[WebSocketUpgradeResponse]].

The future that is materialized from the flow will succeed when the WebSocket connection has been established or the server returned a regular HTTP response, or fail if the connection fails with an exception.

Note

The Flow that is returned by this method can only be materialized once. For each request a new flow must be acquired by calling the method again.

Simple example sending a message and printing any incoming message:

import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println(message.text)
  }

// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))

// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

// just like a regular http request we can get 404 NotFound etc.
// that will be available from upgrade.response
val connected = upgradeResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.OK) {
    Future.successful(Done)
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))

webSocketClientLayer

Just like the Stand-Alone HTTP Layer Usage for regular HTTP requests, the WebSocket layer can be used fully detached from the underlying TCP interface. The same scenarios as described for regular HTTP requests apply here.

The returned layer forms a BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebSocketUpgradeResponse]].

Contents