ActorSink.actorRefWithBackpressure

Sends the elements of the stream to the given ActorRef<T>ActorRef[T] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.

Actor interop operators

Dependency

This operator is included in:

sbt
val AkkaVersion = "2.6.10+90-8b71fac8"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
Maven
<properties>
  <akka.version>2.6.10+90-8b71fac8</akka.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-typed_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
</dependency>
Gradle
versions += [
  AkkaVersion: "2.6.10+90-8b71fac8",
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.typesafe.akka', name: "akka-stream-typed_${versions.ScalaBinary}", version: versions.AkkaVersion
}

Signature

ActorSink.actorRefWithBackpressureActorSink.actorRefWithBackpressure

Description

Sends the elements of the stream to the given ActorRef<T>ActorRef[T] with backpressure, to be able to signal demand when the actor is ready to receive more elements.

See also:

Examples

Scala
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

trait Ack
object Ack extends Ack

trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
  ref = actor,
  messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
  onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
  ackMessage = Ack,
  onCompleteMessage = Complete,
  onFailureMessage = (exception) => Fail(exception))

Source.single("msg1").runWith(sink)
Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

class Ack {}

interface Protocol {}

class Init implements Protocol {
  private final ActorRef<Ack> ack;

  public Init(ActorRef<Ack> ack) {
    this.ack = ack;
  }
}

class Message implements Protocol {
  private final ActorRef<Ack> ackTo;
  private final String msg;

  public Message(ActorRef<Ack> ackTo, String msg) {
    this.ackTo = ackTo;
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

  final ActorRef<Protocol> actorRef = // spawned actor

  final Ack ackMessage = new Ack();
  final Complete completeMessage = new Complete();

  final Sink<String, NotUsed> sink =
      ActorSink.actorRefWithBackpressure(
          actorRef,
          (responseActorRef, element) -> new Message(responseActorRef, element),
          (responseActorRef) -> new Init(responseActorRef),
          ackMessage,
          completeMessage,
          (exception) -> new Fail(exception));

  Source.single("msg1").runWith(sink, system);

Reactive Streams semantics

cancels when the actor terminates

backpressures when the actor acknowledgement has not arrived

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.