Sink.actorRefWithAck

Send the elements from the stream to an ActorRef which must then acknowledge reception after completing a message, to provide back pressure onto the sink.

Sink stages

Description

Send the elements from the stream to an ActorRef which must then acknowledge reception after completing a message, to provide back pressure onto the sink.

Example

Actor to be interacted with:

Scala
object AckingReceiver {
  case object Ack

  case object StreamInitialized
  case object StreamCompleted
  final case class StreamFailure(ex: Throwable)
}

class AckingReceiver(probe: ActorRef, ackWith: Any) extends Actor with ActorLogging {
  import AckingReceiver._

  def receive: Receive = {
    case StreamInitialized ⇒
      log.info("Stream initialized!")
      probe ! "Stream initialized!"
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case el: String ⇒
      log.info("Received element: {}", el)
      probe ! el
      sender() ! Ack // ack to allow the stream to proceed sending more elements

    case StreamCompleted ⇒
      log.info("Stream completed!")
      probe ! "Stream completed!"
    case StreamFailure(ex) ⇒
      log.error(ex, "Stream failed!")
  }
}
Java
enum Ack {
  INSTANCE;
}

static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
  private final Throwable cause;
  public StreamFailure(Throwable cause) { this.cause = cause; }

  public Throwable getCause() { return cause; }
}

static class AckingReceiver extends AbstractLoggingActor {

  private final ActorRef probe;

  public AckingReceiver(ActorRef probe) {
    this.probe = probe;
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(StreamInitialized.class, init -> {
        log().info("Stream initialized");
        probe.tell("Stream initialized", getSelf());
        sender().tell(Ack.INSTANCE, self());
      })
      .match(String.class, element -> {
        log().info("Received element: {}", element);
        probe.tell(element, getSelf());
        sender().tell(Ack.INSTANCE, self());
      })
      .match(StreamCompleted.class, completed -> {
        log().info("Stream completed");
        probe.tell("Stream completed", getSelf());
      })
      .match(StreamFailure.class, failed -> {
        log().error(failed.getCause(),"Stream failed!");
        probe.tell("Stream failed!", getSelf());
      })
      .build();
  }
}

Using the actorRefWithAck operator with the above actor:

Scala
val words: Source[String, NotUsed] =
  Source(List("hello", "hi"))

// sent from actor to stream to "ack" processing of given element
val AckMessage = AckingReceiver.Ack

// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage = (ex: Throwable) ⇒ AckingReceiver.StreamFailure(ex)

val probe = TestProbe()
val receiver = system.actorOf(
  Props(new AckingReceiver(probe.ref, ackWith = AckMessage)))
val sink = Sink.actorRefWithAck(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

words
  .map(_.toLowerCase)
  .runWith(sink)

probe.expectMsg("Stream initialized!")
probe.expectMsg("hello")
probe.expectMsg("hi")
probe.expectMsg("Stream completed!")
Java
Source<String, NotUsed> words =
  Source.from(Arrays.asList("hello", "hi"));

final TestKit probe = new TestKit(system);

ActorRef receiver =
    system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));

Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(receiver,
    new StreamInitialized(),
    Ack.INSTANCE,
    new StreamCompleted(),
    ex -> new StreamFailure(ex)
);

words
    .map(el -> el.toLowerCase())
    .runWith(sink, mat);

probe.expectMsg("Stream initialized");
probe.expectMsg("hello");
probe.expectMsg("hi");
probe.expectMsg("Stream completed");

Reactive Streams semantics

cancels when the actor terminates

backpressures when the actor acknowledgement has not arrived

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.