handleWebSocketMessages
Signature
def handleWebSocketMessages ( handler : Flow [ Message , Message , Any ]): Route
Description
The directive first checks if the request was a valid WebSocket handshake request and if yes, it completes the request with the passed handler. Otherwise, the request is rejected with an ExpectedWebSocketRequestRejection
ExpectedWebSocketRequestRejection
.
WebSocket subprotocols offered in the Sec-WebSocket-Protocol
header of the request are ignored. If you want to support several protocols use the handleWebSocketMessagesForProtocol directive, instead.
For more information about the WebSocket support, see Server-Side WebSocket Support .
Example
Scala
copy source def greeter : Flow [ Message , Message , Any ] =
Flow [ Message ]. mapConcat {
case tm : TextMessage =>
TextMessage ( Source . single ( "Hello " ) ++ tm . textStream ++ Source . single ( "!" )) :: Nil
case bm : BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm . dataStream . runWith ( Sink . ignore )
Nil
}
val websocketRoute =
path ( "greeter" ) {
handleWebSocketMessages ( greeter )
}
// tests:
// create a testing probe representing the client-side
val wsClient = WSProbe ()
// WS creates a WebSocket request for testing
WS ( "/greeter" , wsClient . flow ) ~> websocketRoute ~>
check {
// check response for WS Upgrade headers
isWebSocketUpgrade shouldEqual true
// manually run a WS conversation
wsClient . sendMessage ( "Peter" )
wsClient . expectMessage ( "Hello Peter!" )
wsClient . sendMessage ( BinaryMessage ( ByteString ( "abcdef" )))
wsClient . expectNoMessage ( 100.millis )
wsClient . sendMessage ( "John" )
wsClient . expectMessage ( "Hello John!" )
wsClient . sendCompletion ()
wsClient . expectCompletion ()
}
Java
copy source import static akka . http . javadsl . server . Directives . path ;
import static akka . http . javadsl . server . Directives . handleWebSocketMessages ;
final Flow < Message , Message , NotUsed > greeter = Flow . of ( Message . class ). mapConcat ( msg -> {
if ( msg instanceof TextMessage ) {
final TextMessage tm = ( TextMessage ) msg ;
final TextMessage ret = TextMessage . create ( Source . single ( "Hello " ). concat ( tm . getStreamedText ()). concat ( Source . single ( "!" )));
return Collections . singletonList ( ret );
} else if ( msg instanceof BinaryMessage ) {
final BinaryMessage bm = ( BinaryMessage ) msg ;
bm . getStreamedData (). runWith ( Sink . ignore (), materializer ());
return Collections . emptyList ();
} else {
throw new IllegalArgumentException ( "Unsupported message type!" );
}
});
final Route websocketRoute = path ( "greeter" , () ->
handleWebSocketMessages ( greeter )
);
// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe . create ( system (), materializer ());
// WS creates a WebSocket request for testing
testRoute ( websocketRoute ). run ( WS ( Uri . create ( "/greeter" ), wsClient . flow (), materializer ()))
. assertStatusCode ( StatusCodes . SWITCHING_PROTOCOLS );
// manually run a WS conversation
wsClient . sendMessage ( "Peter" );
wsClient . expectMessage ( "Hello Peter!" );
wsClient . sendMessage ( BinaryMessage . create ( ByteString . fromString ( "abcdef" )));
wsClient . expectNoMessage ( FiniteDuration . create ( 100 , TimeUnit . MILLISECONDS ));
wsClient . sendMessage ( "John" );
wsClient . expectMessage ( "Hello John!" );
wsClient . sendCompletion ();
wsClient . expectCompletion ();