To use Akka Streams Typed, add the module to your project:
Akka Streams make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without losing the type information.
This module contains typed alternatives to the already existing ActorRef
sources and sinks together with a factory methods for `ActorMaterializer`
which takes a typed ActorSystem
The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module.
This module is ready to be used in production, but it is still marked as may change. This means that API or semantics can change without warning or deprecation period, but such changes will be collected and be performed in Akka 2.6.0 rather than in 2.5.x patch releases.
Actor Source
A stream that is driven by messages sent to a particular actor can be started with `ActorSource.actorRef`
. This source materializes to a typed ActorRef
which only accepts messages that are of the same type as the stream.
- Scala
import import import{ Sink, Source } import trait Protocol case class Message(msg: String) extends Protocol case object Complete extends Protocol case class Fail(ex: Exception) extends Protocol val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = { case Complete => }, failureMatcher = { case Fail(ex) => ex }, bufferSize = 8, overflowStrategy = val ref = source .collect { case Message(msg) => msg } .to(Sink.foreach(println)) .run() ref ! Message("msg1") // ref ! "msg2" Does not compile
- Java
Actor Sink
There are two sinks available that accept typed ActorRef
s. To send all of the messages from a stream to an actor without considering backpressure, use `ActorSink.actorRef`
- Scala
import import{ Sink, Source } import trait Protocol case class Message(msg: String) extends Protocol case object Complete extends Protocol case class Fail(ex: Throwable) extends Protocol val actor: ActorRef[Protocol] = targetActor() val sink: Sink[Protocol, NotUsed] = ActorSink.actorRef[Protocol](ref = actor, onCompleteMessage = Complete, onFailureMessage = Fail.apply) Source.single(Message("msg1")).runWith(sink)
- Java
For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use `ActorSink.actorRefWithAck`
to be able to signal demand when the actor is ready to receive more elements.
- Scala
import import{ Sink, Source } import trait Ack object Ack extends Ack trait Protocol case class Init(ackTo: ActorRef[Ack]) extends Protocol case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol case object Complete extends Protocol case class Fail(ex: Throwable) extends Protocol val actor: ActorRef[Protocol] = targetActor() val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck( ref = actor, onCompleteMessage = Complete, onFailureMessage = Fail.apply, messageAdapter = Message.apply, onInitMessage = Init.apply, ackMessage = Ack) Source.single("msg1").runWith(sink)
- Java