Streams
Dependency
To use Akka Streams Typed, add the module to your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.32"
- Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-typed_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
- Gradle
dependencies {
  implementation "com.typesafe.akka:akka-stream-typed_2.12:2.5.32"
}
Introduction
Akka Streams make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without losing the type information.
This module contains typed alternatives to the existing ActorRef sources and sinks, together with factory methods for `ActorMaterializer` (Scala) and `ActorMaterializerFactory` (Java) that take a typed ActorSystem.
The materializer created by these factory methods, and the sources and sinks contained in this module, can be mixed and matched with the building blocks of the original Akka Streams module.
This module is ready to be used in production, but it is still marked as "may change". This means that the API or semantics can change without warning or a deprecation period, but such changes will be collected and made in Akka 2.6.0 rather than in 2.5.x patch releases.
Actor Source
A stream that is driven by messages sent to a particular actor can be started with `ActorSource.actorRef`. This source materializes to a typed ActorRef which only accepts messages that are of the same type as the stream.
- Scala
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] =
  ActorSource.actorRef[Protocol](
    completionMatcher = { case Complete => },
    failureMatcher = { case Fail(ex) => ex },
    bufferSize = 8,
    overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect { case Message(msg) => msg }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
- Java
import akka.actor.typed.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;

interface Protocol {}

class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Exception ex;

  public Fail(Exception ex) {
    this.ex = ex;
  }
}

final JavaPartialFunction<Protocol, Throwable> failureMatcher =
    new JavaPartialFunction<Protocol, Throwable>() {
      public Throwable apply(Protocol p, boolean isCheck) {
        if (p instanceof Fail) {
          return ((Fail) p).ex;
        } else {
          throw noMatch();
        }
      }
    };

final Source<Protocol, ActorRef<Protocol>> source =
    ActorSource.actorRef(
        (m) -> m instanceof Complete, failureMatcher, 8, OverflowStrategy.fail());

final ActorRef<Protocol> ref =
    source
        .collect(
            new JavaPartialFunction<Protocol, String>() {
              public String apply(Protocol p, boolean isCheck) {
                if (p instanceof Message) {
                  return ((Message) p).msg;
                } else {
                  throw noMatch();
                }
              }
            })
        .to(Sink.foreach(System.out::println))
        .run(mat);

ref.tell(new Message("msg1"));
// ref.tell("msg2"); Does not compile
Actor Sink
There are two sinks available that accept typed ActorRefs. To send all of the messages from a stream to an actor without considering backpressure, use `ActorSink.actorRef`.
- Scala
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[Protocol, NotUsed] =
  ActorSink.actorRef[Protocol](
    ref = actor,
    onCompleteMessage = Complete,
    onFailureMessage = Fail.apply)

Source.single(Message("msg1")).runWith(sink)
- Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

interface Protocol {}

class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

final ActorRef<Protocol> actor = null;

final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(actor, new Complete(), Fail::new);

Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);
For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use `ActorSink.actorRefWithAck` to signal demand when the actor is ready to receive more elements.
- Scala
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Ack
object Ack extends Ack

trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[String, NotUsed] =
  ActorSink.actorRefWithAck(
    ref = actor,
    onCompleteMessage = Complete,
    onFailureMessage = Fail.apply,
    messageAdapter = Message.apply,
    onInitMessage = Init.apply,
    ackMessage = Ack)

Source.single("msg1").runWith(sink)
- Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

class Ack {}

interface Protocol {}

class Init implements Protocol {
  private final ActorRef<Ack> ack;

  public Init(ActorRef<Ack> ack) {
    this.ack = ack;
  }
}

class Message implements Protocol {
  private final ActorRef<Ack> ackTo;
  private final String msg;

  public Message(ActorRef<Ack> ackTo, String msg) {
    this.ackTo = ackTo;
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

final ActorRef<Protocol> actor = null;

final Sink<String, NotUsed> sink =
    ActorSink.actorRefWithAck(
        actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new);

Source.single("msg1").runWith(sink, mat);
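The handshake behind `ActorSink.actorRefWithAck` can also be illustrated without Akka. The following is a minimal, library-free sketch in plain Java, assuming a single-threaded model in which strings stand in for the protocol messages. `AckHandshakeSketch`, `StreamSide` and `Result` are hypothetical names used for illustration only and are not part of the Akka API. The stream side sends an init signal first and then emits one element per acknowledgement, so the actor never has more than one unacknowledged message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Library-free model of the ack handshake; none of these names are Akka API.
final class AckHandshakeSketch {

  // What the simulated actor saw, plus the peak number of unacknowledged messages.
  static final class Result {
    final List<String> received;
    final int maxUnacknowledged;

    Result(List<String> received, int maxUnacknowledged) {
      this.received = received;
      this.maxUnacknowledged = maxUnacknowledged;
    }
  }

  // Stream side: emits the next signal only after the previous one was acknowledged.
  static final class StreamSide {
    private final Deque<String> pending;
    private final Deque<String> actorMailbox;
    private int unacknowledged = 0;
    private int maxUnacknowledged = 0;

    StreamSide(List<String> elements, Deque<String> actorMailbox) {
      this.pending = new ArrayDeque<>(elements);
      this.actorMailbox = actorMailbox;
    }

    // The first signal is always the init message, which also awaits an ack.
    void start() {
      send("Init");
    }

    // An ack signals demand for exactly one more element.
    void onAck() {
      unacknowledged--;
      if (!pending.isEmpty()) {
        send("Message(" + pending.poll() + ")");
      } else {
        // Simplification: completion is sent after the last ack and is not acked itself.
        actorMailbox.add("Complete");
      }
    }

    private void send(String signal) {
      unacknowledged++;
      maxUnacknowledged = Math.max(maxUnacknowledged, unacknowledged);
      actorMailbox.add(signal);
    }
  }

  // Drives the simulated actor: it handles one message, then acks to ask for the next.
  static Result run(List<String> elements) {
    Deque<String> mailbox = new ArrayDeque<>();
    StreamSide stream = new StreamSide(elements, mailbox);
    List<String> received = new ArrayList<>();
    stream.start();
    while (!mailbox.isEmpty()) {
      String message = mailbox.poll();
      received.add(message);
      if (!message.equals("Complete")) {
        stream.onAck();
      }
    }
    return new Result(received, stream.maxUnacknowledged);
  }

  public static void main(String[] args) {
    Result result = run(Arrays.asList("msg1", "msg2"));
    System.out.println(result.received); // [Init, Message(msg1), Message(msg2), Complete]
    System.out.println(result.maxUnacknowledged); // 1
  }
}
```

In this model the actor controls the pace: a slow actor simply delays its ack, and nothing else is sent in the meantime. The sketch sends the completion signal only after the last element has been acknowledged, which is a simplification; the exact timing of the completion message in Akka may differ.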