ActorSource.actorRef

Materialize an ActorRef[T] (Scala) / ActorRef<T> (Java); sending messages to it will emit them on the stream only if they are of the same type as the stream.

Actor interop operators

Dependency

This operator is included in:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.1"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-typed_2.13</artifactId>
  <version>2.6.1</version>
</dependency>
Gradle
dependencies {
  // `implementation` replaces the `compile` configuration, which is deprecated and was removed in Gradle 7.
  implementation group: 'com.typesafe.akka', name: 'akka-stream-typed_2.13', version: '2.6.1'
}

Signature

def actorRef[T](completionMatcher: PartialFunction[T, Unit], failureMatcher: PartialFunction[T, Throwable], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]]

Description

Materialize an ActorRef[T] (Scala) / ActorRef<T> (Java) which only accepts messages that are of the same type as the stream.

Examples

Scala
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

/** Messages accepted by the example source's `ActorRef`. Sealed so the set of cases is closed. */
sealed trait Protocol

/** Carries a text payload; the example extracts `msg` with `collect`. */
final case class Message(msg: String) extends Protocol

/** The message the example's `completionMatcher` is defined for. */
case object Complete extends Protocol

/** The message the example's `failureMatcher` is defined for; `ex` is the value it returns. */
final case class Fail(ex: Exception) extends Protocol

// Source whose materialized value is the ActorRef[Protocol] used to feed it.
val source: Source[Protocol, ActorRef[Protocol]] =
  ActorSource.actorRef[Protocol](
    // `Complete` is the only message the completion matcher is defined for.
    completionMatcher = { case Complete => },
    // A `Fail` is mapped to the exception it carries.
    failureMatcher = { case Fail(ex) => ex },
    bufferSize = 8,
    overflowStrategy = OverflowStrategy.fail)

// Keep only `Message` elements, unwrapped to their text.
val messages = source.collect { case Message(msg) => msg }

// Running the graph yields the ActorRef that the source materializes.
val ref = messages.to(Sink.foreach(println)).run()

ref ! Message("msg1")
// ref ! "msg2" Does not compile
Java
import akka.actor.typed.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.stream.ActorMaterializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSource;

import java.util.Optional;

/** Common type of every message the example source's ActorRef accepts. */
interface Protocol {}

/** A message carrying a text payload. */
class Message implements Protocol {
  private final String msg;

  public Message(final String text) {
    msg = text;
  }
}

/** The message the example's completion predicate tests for. */
class Complete implements Protocol {}

/** Carries the exception that the example's failure matcher extracts. */
class Fail implements Protocol {
  private final Exception ex;

  public Fail(final Exception cause) {
    ex = cause;
  }
}

  // Source whose materialized ActorRef accepts Protocol messages.
  final Source<Protocol, ActorRef<Protocol>> source =
      ActorSource.actorRef(
          // Completion matcher: true only for Complete messages.
          message -> message instanceof Complete,
          // Failure matcher: a Fail yields its exception, anything else yields empty.
          message -> {
            if (message instanceof Fail) {
              return Optional.of(((Fail) message).ex);
            }
            return Optional.empty();
          },
          8, // buffer size
          OverflowStrategy.fail());

  // Defined only for Message elements, which it maps to their text; signals noMatch otherwise.
  final JavaPartialFunction<Protocol, String> messageText =
      new JavaPartialFunction<Protocol, String>() {
        @Override
        public String apply(Protocol element, boolean isCheck) {
          if (!(element instanceof Message)) {
            throw noMatch();
          }
          return ((Message) element).msg;
        }
      };

  // Running the graph yields the ActorRef that the source materializes.
  final ActorRef<Protocol> ref =
      source.collect(messageText).to(Sink.foreach(System.out::println)).run(mat);

  ref.tell(new Message("msg1"));
  // ref.tell("msg2"); Does not compile
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.