Sink.actorRefWithBackpressure

Send the elements from the stream to an ActorRef (of the classic actors API), which must acknowledge each message once it has finished processing it. These acknowledgements provide backpressure to the stream.

Actor interop operators

Signature

Sink.actorRefWithBackpressure

Description

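Send the elements from the stream to an ActorRef, which must acknowledge each message once it has finished processing it. The next element is sent only after that acknowledgement arrives, which is how the actor applies backpressure to the stream. There is also a variant that takes no specific acknowledgement message and instead treats any message from the actor as an acknowledgement.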

Example

Actor to be interacted with:

Scala
import akka.actor.{ Actor, ActorLogging, ActorRef }

object AckingReceiver {
  case object Ack

  case object StreamInitialized
  case object StreamCompleted
  final case class StreamFailure(ex: Throwable)
}

class AckingReceiver(probe: ActorRef) extends Actor with ActorLogging {
  import AckingReceiver._

  def receive: Receive = {
    case StreamInitialized =>
      log.info("Stream initialized!")
      probe ! "Stream initialized!"
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case el: String =>
      log.info("Received element: {}", el)
      probe ! el
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case StreamCompleted =>
      log.info("Stream completed!")
      probe ! "Stream completed!"
    case StreamFailure(ex) =>
      log.error(ex, "Stream failed!")
  }
}
Java
// needs: akka.actor.AbstractLoggingActor, akka.actor.ActorRef
enum Ack {
  INSTANCE;
}

static class StreamInitialized {}

static class StreamCompleted {}

static class StreamFailure {
  private final Throwable cause;

  public StreamFailure(Throwable cause) {
    this.cause = cause;
  }

  public Throwable getCause() {
    return cause;
  }
}

static class AckingReceiver extends AbstractLoggingActor {

  private final ActorRef probe;

  public AckingReceiver(ActorRef probe) {
    this.probe = probe;
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            StreamInitialized.class,
            init -> {
              log().info("Stream initialized");
              probe.tell("Stream initialized", getSelf());
              sender().tell(Ack.INSTANCE, self());
            })
        .match(
            String.class,
            element -> {
              log().info("Received element: {}", element);
              probe.tell(element, getSelf());
              sender().tell(Ack.INSTANCE, self());
            })
        .match(
            StreamCompleted.class,
            completed -> {
              log().info("Stream completed");
              probe.tell("Stream completed", getSelf());
            })
        .match(
            StreamFailure.class,
            failed -> {
              log().error(failed.getCause(), "Stream failed!");
              probe.tell("Stream failed!", getSelf());
            })
        .build();
  }
}

Using the actorRefWithBackpressure operator with the above actor:

Scala
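import akka.NotUsed
import akka.actor.Props
import akka.stream.scaladsl.{ Sink, Source }
import akka.testkit.TestProbe

// assumes an implicit ActorSystem named `system` is in scope
val words: Source[String, NotUsed] =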
  Source(List("hello", "hi"))

// sent from actor to stream to "ack" processing of given element
val AckMessage = AckingReceiver.Ack

// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

val probe = TestProbe()
val receiver = system.actorOf(Props(new AckingReceiver(probe.ref)))
val sink = Sink.actorRefWithBackpressure(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage)

words.map(_.toLowerCase).runWith(sink)

probe.expectMsg("Stream initialized!")
probe.expectMsg("hello")
probe.expectMsg("hi")
probe.expectMsg("Stream completed!")
Java
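// needs: java.util.Arrays, akka.NotUsed, akka.actor.ActorRef, akka.actor.Props,
// akka.stream.javadsl.Sink, akka.stream.javadsl.Source, akka.testkit.javadsl.TestKit
// assumes an ActorSystem named `system` is in scope
Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));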

final TestKit probe = new TestKit(system);

ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));

Sink<String, NotUsed> sink =
    Sink.<String>actorRefWithBackpressure(
        receiver,
        new StreamInitialized(),
        Ack.INSTANCE,
        new StreamCompleted(),
        ex -> new StreamFailure(ex));

words.map(el -> el.toLowerCase()).runWith(sink, system);

probe.expectMsg("Stream initialized");
probe.expectMsg("hello");
probe.expectMsg("hi");
probe.expectMsg("Stream completed");

Reactive Streams semantics

cancels when the actor terminates

backpressures while the actor's acknowledgement has not yet arrived
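
The acknowledgement handshake behind this backpressure can be illustrated without Akka. The following is a minimal plain-Java sketch, not Akka's implementation: the class `AckGatedSinkSketch`, its methods, and the `capacity` parameter are hypothetical names introduced here for illustration only. It assumes a simplified model in which at most one message is unacknowledged at a time and further elements wait in a small bounded buffer. Completion and failure signalling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of an ack-gated sink. Hypothetical names, not Akka code. */
class AckGatedSinkSketch {
    private final int capacity;                               // assumed buffer size
    private final Deque<String> buffer = new ArrayDeque<>();  // elements waiting to be sent
    private final List<String> delivered = new ArrayList<>(); // messages sent to the "actor"
    private boolean awaitingAck = false;

    AckGatedSinkSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Sends the init message; nothing else is sent until it is acknowledged. */
    void start() {
        send("init");
    }

    /**
     * Upstream offers an element. Returns false when the element cannot be
     * accepted yet, which models backpressure: upstream has to wait.
     */
    boolean offer(String element) {
        if (!awaitingAck) {
            send(element);
            return true;
        }
        if (buffer.size() >= capacity) {
            return false;
        }
        buffer.addLast(element);
        return true;
    }

    /** The receiver acknowledged the last message: release the next buffered element, if any. */
    void onAck() {
        awaitingAck = false;
        if (!buffer.isEmpty()) {
            send(buffer.removeFirst());
        }
    }

    List<String> delivered() {
        return delivered;
    }

    private void send(String message) {
        delivered.add(message);
        awaitingAck = true;
    }

    public static void main(String[] args) {
        AckGatedSinkSketch sink = new AckGatedSinkSketch(1);
        sink.start();
        System.out.println(sink.offer("hello")); // buffered while the init ack is outstanding
        System.out.println(sink.offer("hi"));    // rejected: buffer full, no ack yet
        sink.onAck();                            // init acknowledged, "hello" is released
        System.out.println(sink.offer("hi"));    // accepted now that there is room
        sink.onAck();                            // "hello" acknowledged, "hi" is released
        System.out.println(sink.delivered());
    }
}
```

In this model the receiver controls the pace: until it calls `onAck()`, the sink sends nothing further, and once the buffer is full upstream is refused. The real operator is asynchronous and signals demand upstream rather than rejecting elements, so treat this only as a picture of the ordering guarantees.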
