Streams

Dependency

To use Akka Streams Typed, add the module to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.14"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-typed_2.12</artifactId>
  <version>2.5.14</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-stream-typed_2.12', version: '2.5.14'
}

Introduction

Akka Streams make it easy to model type-safe message processing pipelines. With typed actors it is possible to connect streams to actors without losing the type information.

This module contains typed alternatives to the already existing ActorRef sources and sinks together with a factory methods for ActorMaterializerActorMaterializerFactory which takes a typed ActorSystem.

The materializer created from these factory methods and sources together with sinks contained in this module can be mixed and matched with the original Akka Streams building blocks from the original module.

Warning

This module is currently marked as may change in the sense of being the subject of active research. This means that API or semantics can change without warning or deprecation period and it is not recommended to use this module in production just yet—you have been warned.

Actor Source

A stream that is driven by messages sent to a particular actor can be started with ActorSource.actorRefActorSource.actorRef. This source materializes to a typed ActorRef which only accepts messages that are of the same type as the stream.

Scala
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](
  completionMatcher = {
    case Complete ⇒
  },
  failureMatcher = {
    case Fail(ex) ⇒ ex
  },
  bufferSize = 8,
  overflowStrategy = OverflowStrategy.fail
)

val ref = source.collect {
  case Message(msg) ⇒ msg
}.to(Sink.foreach(println)).run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
Java
import akka.actor.typed.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;

  interface Protocol {}
  class Message implements Protocol {
    private final String msg;
    public Message(String msg) {
      this.msg = msg;
    }
  }
  class Complete implements Protocol {}
  class Fail implements Protocol {
    private final Exception ex;
    public Fail(Exception ex) {
      this.ex = ex;
    }
  }

    final JavaPartialFunction<Protocol, Throwable> failureMatcher =
      new JavaPartialFunction<Protocol, Throwable>() {
        public Throwable apply(Protocol p, boolean isCheck) {
          if (p instanceof Fail) {
            return ((Fail)p).ex;
          } else {
            throw noMatch();
          }
        }
      };

    final Source<Protocol, ActorRef<Protocol>> source = ActorSource.actorRef(
      (m) -> m instanceof Complete,
      failureMatcher,
      8,
      OverflowStrategy.fail()
    );

    final ActorRef<Protocol> ref = source.collect(new JavaPartialFunction<Protocol, String>() {
      public String apply(Protocol p, boolean isCheck) {
        if (p instanceof Message) {
          return ((Message)p).msg;
       } else {
          throw noMatch();
       }
     }
    }).to(Sink.foreach(System.out::println)).run(mat);

    ref.tell(new Message("msg1"));
    // ref.tell("msg2"); Does not compile

Actor Sink

There are two sinks availabe that accept typed ActorRefs. To send all of the messages from a stream to an actor without considering backpressure, use ActorSink.actorRefActorSink.actorRef.

Scala
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = ???

val sink: Sink[Protocol, NotUsed] = ActorSink.actorRef[Protocol](
  ref = actor,
  onCompleteMessage = Complete,
  onFailureMessage = Fail.apply
)

Source.single(Message("msg1")).runWith(sink)
Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

  interface Protocol {}
  class Message implements Protocol {
    private final String msg;
    public Message(String msg) {
      this.msg = msg;
    }
  }
  class Complete implements Protocol {}
  class Fail implements Protocol {
    private final Throwable ex;
    public Fail(Throwable ex) {
      this.ex = ex;
    }
  }

    final ActorRef<Protocol> actor = null;

    final Sink<Protocol, NotUsed> sink = ActorSink.actorRef(
      actor,
      new Complete(),
      Fail::new
    );

    Source.<Protocol>single(new Message("msg1")).runWith(sink, mat);

For an actor to be able to react to backpressure, a protocol needs to be introduced between the actor and the stream. Use ActorSink.actorRefWithAckActorSink.actorRefWithAck to be able to signal demand when the actor is ready to receive more elements.

Scala
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Ack
object Ack extends Ack

trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = ???

val sink: Sink[String, NotUsed] = ActorSink.actorRefWithAck(
  ref = actor,
  onCompleteMessage = Complete,
  onFailureMessage = Fail.apply,
  messageAdapter = Message.apply,
  onInitMessage = Init.apply,
  ackMessage = Ack
)

Source.single("msg1").runWith(sink)
Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

  class Ack {}

  interface Protocol {}
  class Init implements Protocol {
    private final ActorRef<Ack> ack;
    public Init(ActorRef<Ack> ack) {
      this.ack = ack;
    }
  }
  class Message implements Protocol {
    private final ActorRef<Ack> ackTo;
    private final String msg;
    public Message(ActorRef<Ack> ackTo, String msg) {
      this.ackTo = ackTo;
      this.msg = msg;
    }
  }
  class Complete implements Protocol {}
  class Fail implements Protocol {
    private final Throwable ex;
    public Fail(Throwable ex) {
      this.ex = ex;
    }
  }

    final ActorRef<Protocol> actor = null;

    final Sink<String, NotUsed> sink = ActorSink.actorRefWithAck(
      actor,
      Message::new,
      Init::new,
      new Ack(),
      new Complete(),
      Fail::new
    );

    Source.single("msg1").runWith(sink, mat);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.