ActorSink.actorRefWithBackpressure

Sends the elements of the stream to the given ActorRef[T] (Scala) / ActorRef<T> (Java) with backpressure, so that the actor can signal demand when it is ready to receive more elements.

Actor interop operators

Dependency

This operator is included in:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.1"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-typed_2.13</artifactId>
  <version>2.6.1</version>
</dependency>
Gradle
dependencies {
  // `implementation` replaces the `compile` configuration, which is deprecated and was removed in Gradle 7.
  implementation group: 'com.typesafe.akka', name: 'akka-stream-typed_2.13', version: '2.6.1'
}

Signature

def actorRefWithBackpressure[T, M, A](ref: ActorRef[M], messageAdapter: (ActorRef[A], T) => M, onInitMessage: ActorRef[A] => M, ackMessage: A, onCompleteMessage: M, onFailureMessage: Throwable => M): Sink[T, NotUsed]

Description

Sends the elements of the stream to the given ActorRef[T] (Scala) / ActorRef<T> (Java) with backpressure, so that the actor can signal demand when it is ready to receive more elements.

Examples

Scala
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

// Acknowledgement type. The `Ack` object is what the sink below is given as its
// `ackMessage`, and `ActorRef[Ack]` is the reply-to type carried by `Init` and `Message`.
trait Ack
object Ack extends Ack

/** Messages accepted by the sink's target actor.
  *
  * Sealed so that a pattern match over the protocol is checked for exhaustiveness.
  */
sealed trait Protocol

/** Built by the sink's `onInitMessage`; `ackTo` is the reference supplied by the sink for acknowledgements. */
final case class Init(ackTo: ActorRef[Ack]) extends Protocol

/** Built by the sink's `messageAdapter`: one stream element (`msg`) plus the reference to acknowledge to. */
final case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol

/** Passed to the sink as its `onCompleteMessage`. */
case object Complete extends Protocol

/** Built by the sink's `onFailureMessage` from the `Throwable` it is given. */
final case class Fail(ex: Throwable) extends Protocol

// Target of the sink; `targetActor()` is defined outside this snippet.
val actor: ActorRef[Protocol] = targetActor()

// Named arguments are listed in the same order as the parameters of
// `actorRefWithBackpressure`.
val sink: Sink[String, NotUsed] =
  ActorSink.actorRefWithBackpressure(
    ref = actor,
    messageAdapter = Message.apply,
    onInitMessage = Init.apply,
    ackMessage = Ack,
    onCompleteMessage = Complete,
    onFailureMessage = Fail.apply)

// Run a single-element stream into the sink.
Source.single("msg1").runWith(sink)
Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

/** Acknowledgement message type; an instance is passed to the sink as its ack message. */
class Ack {}

/** Common type of the messages sent to the target actor ({@code ActorRef<Protocol>}). */
interface Protocol {}

/**
 * Protocol message created through {@code Init::new} when building the sink.
 *
 * <p>The field is named {@code ackTo} to match {@code Message} and the Scala version of this
 * example.
 */
class Init implements Protocol {
  /** Reference that acknowledgements are addressed to. */
  private final ActorRef<Ack> ackTo;

  /**
   * @param ackTo reference that acknowledgements are addressed to
   */
  public Init(ActorRef<Ack> ackTo) {
    this.ackTo = ackTo;
  }
}

/** Protocol message created through {@code Message::new}: one stream element plus the ack reference. */
class Message implements Protocol {
  /** The stream element carried by this message. */
  private final String msg;

  /** Reference that the acknowledgement for this element is addressed to. */
  private final ActorRef<Ack> ackTo;

  /**
   * @param ackTo reference that the acknowledgement is addressed to
   * @param msg the stream element
   */
  public Message(final ActorRef<Ack> ackTo, final String msg) {
    this.msg = msg;
    this.ackTo = ackTo;
  }
}

/** Protocol message; an instance is passed to the sink as its completion message. */
class Complete implements Protocol {}

/** Protocol message created through {@code Fail::new} from the failure cause. */
class Fail implements Protocol {
  /** The cause handed to the constructor. */
  private final Throwable ex;

  /**
   * @param ex the failure cause
   */
  public Fail(final Throwable ex) {
    this.ex = ex;
  }
}

  // Placeholder only: this example does not create a real target actor.
  final ActorRef<Protocol> actor = null;

  // Argument roles, in order (presumably mirroring the Scala signature on this page):
  // target ref, message adapter, init-message factory, ack message,
  // completion message, failure-message factory.
  final Sink<String, NotUsed> sink =
      ActorSink.actorRefWithBackpressure(
          actor,
          Message::new,
          Init::new,
          new Ack(),
          new Complete(),
          Fail::new);

  // NOTE(review): `mat` is not declared in this snippet; presumably a materializer
  // provided by the surrounding code.
  Source.single("msg1").runWith(sink, mat);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.