Streams

Dependency

To use Akka Streams Typed, add the module to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-typed_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-stream-typed_2.12:2.5.32"
}

Introduction

Akka Streams make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without losing the type information.

This module contains typed alternatives to the already existing ActorRef sources and sinks together with a factory methods for `ActorMaterializer``ActorMaterializerFactory` which takes a typed ActorSystem.

The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module.

Note

This module is ready to be used in production, but it is still marked as may change. This means that API or semantics can change without warning or deprecation period, but such changes will be collected and be performed in Akka 2.6.0 rather than in 2.5.x patch releases.

Actor Source

A stream that is driven by messages sent to a particular actor can be started with `ActorSource.actorRef``ActorSource.actorRef`. This source materializes to a typed ActorRef which only accepts messages that are of the same type as the stream.

Scala
sourceimport akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
  case Complete =>
}, failureMatcher = {
  case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
Java
sourceimport akka.actor.typed.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;

interface Protocol {}

class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Exception ex;

  public Fail(Exception ex) {
    this.ex = ex;
  }
}

  final JavaPartialFunction<Protocol, Throwable> failureMatcher =
      new JavaPartialFunction<Protocol, Throwable>() {
        public Throwable apply(Protocol p, boolean isCheck) {
          if (p instanceof Fail) {
            return ((Fail) p).ex;
          } else {
            throw noMatch();
          }
        }
      };

  final Source<Protocol, ActorRef<Protocol>> source =
      ActorSource.actorRef(
          (m) -> m instanceof Complete, failureMatcher, 8, OverflowStrategy.fail());

  final ActorRef<Protocol> ref =
      source
          .collect(
              new JavaPartialFunction<Protocol, String>() {
                public String apply(Protocol p, boolean isCheck) {
                  if (p instanceof Message) {
                    return ((Message) p).msg;
                  } else {
                    throw noMatch();
                  }
                }
              })
          .to(Sink.foreach(System.out::println))
          .run(mat);

  ref.tell(new Message("msg1"));
  // ref.tell("msg2"); Does not compile

Actor Sink

There are two sinks available that accept typed ActorRefs. To send all of the messages from a stream to an actor without considering backpressure, use `ActorSink.actorRef``ActorSink.actorRef`.

Scala
sourceimport akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[Protocol, NotUsed] =
  ActorSink.actorRef[Protocol](ref = actor, onCompleteMessage = Complete, onFailureMessage = Fail.apply)

Source.single(Message("msg1")).runWith(sink)
Java
sourceimport akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

interface Protocol {}

class Message implements Protocol {
  private final String msg;

  public Message(String msg) {
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

  final ActorRef<Protocol> actor = null;

  final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(actor, new Complete(), Fail::new);

  Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);

For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use `ActorSink.actorRefWithAck``ActorSink.actorRefWithAck` to be able to signal demand when the actor is ready to receive more elements.

Scala
sourceimport akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Ack
object Ack extends Ack

trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck(
  ref = actor,
  onCompleteMessage = Complete,
  onFailureMessage = Fail.apply,
  messageAdapter = Message.apply,
  onInitMessage = Init.apply,
  ackMessage = Ack)

Source.single("msg1").runWith(sink)
Java
sourceimport akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

class Ack {}

interface Protocol {}

class Init implements Protocol {
  private final ActorRef<Ack> ack;

  public Init(ActorRef<Ack> ack) {
    this.ack = ack;
  }
}

class Message implements Protocol {
  private final ActorRef<Ack> ackTo;
  private final String msg;

  public Message(ActorRef<Ack> ackTo, String msg) {
    this.ackTo = ackTo;
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

  final ActorRef<Protocol> actor = null;

  final Sink<String, NotUsed> sink =
      ActorSink.actorRefWithAck(
          actor, Message::new, Init::new, new Ack(), new Complete(), Fail::new);

  Source.single("msg1").runWith(sink, mat);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.