actorRefWithBackpressure

Materialize an ActorRef; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, which lets the sender apply backpressure.

Signature

def actorRefWithBackpressure[T](ackMessage: Any, completionMatcher: PartialFunction[Any, CompletionStrategy], failureMatcher: PartialFunction[Any, Throwable]): Source[T, ActorRef]

Description

Materialize an ActorRef; messages sent to it are emitted on the stream. Once an element has been emitted, the source replies to the sender with the provided ack message, so the sender can apply backpressure by waiting for that acknowledgement before sending the next message. Sending another message before the previous one has been acknowledged will fail the stream.
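The handshake described above can be illustrated without Akka. The following is a minimal plain-Java sketch of the protocol, not Akka's implementation: the class `AckHandshakeModel` and its methods `send` and `onDemand` are hypothetical names introduced only for this illustration, and throwing an exception stands in for failing the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Plain-Java model of the ack handshake; hypothetical, not Akka's implementation. */
final class AckHandshakeModel {
    private final Object ackMessage;
    private final List<Object> emitted = new ArrayList<>();
    private Object pending;  // element received but not yet emitted (so not yet acknowledged)
    private boolean demand;  // downstream asked for an element before one arrived
    private boolean failed;

    AckHandshakeModel(Object ackMessage) {
        this.ackMessage = ackMessage;
    }

    /** Models a message sent to the ActorRef; returns the ack if the element was emitted right away. */
    Optional<Object> send(Object message) {
        if (failed) {
            throw new IllegalStateException("stream has already failed");
        }
        if (pending != null) {
            // A second message before the first was acknowledged fails the stream.
            failed = true;
            throw new IllegalStateException("new element sent before the previous one was acknowledged");
        }
        if (demand) {
            demand = false;
            emitted.add(message);
            return Optional.of(ackMessage);
        }
        pending = message;
        return Optional.empty();
    }

    /** Models downstream demand; returns the ack if a pending element was emitted. */
    Optional<Object> onDemand() {
        if (failed) {
            return Optional.empty();
        }
        if (pending == null) {
            demand = true; // remember the demand for the next message
            return Optional.empty();
        }
        emitted.add(pending);
        pending = null;
        return Optional.of(ackMessage);
    }

    List<Object> emitted() {
        return new ArrayList<>(emitted);
    }

    boolean isFailed() {
        return failed;
    }
}
```

In this model a well-behaved sender calls `send` again only after it has received the ack for its previous message; calling `send` twice without an intervening ack puts the model into the failed state.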

Reactive Streams semantics

emits when there is demand and there are messages in the buffer or a message is sent to the ActorRef

completes when the ActorRef is sent a message matched by completionMatcher (for example akka.actor.Status.Success, as in the Scala example below)

Examples

Scala

import akka.actor.Status.Success
import akka.actor.ActorRef
import akka.stream.CompletionStrategy
import akka.stream.scaladsl._
import akka.testkit.TestProbe

// Assumes an implicit ActorSystem is in scope.
val probe = TestProbe()

val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any](
  "ack",
  // complete when akka.actor.Status.Success is received
  { case _: Success => CompletionStrategy.immediately },
  // do not fail on any message
  PartialFunction.empty)
val actorRef: ActorRef = source.to(Sink.foreach(println)).run()

probe.send(actorRef, "hello")
probe.expectMsg("ack")
probe.send(actorRef, "hello")
probe.expectMsg("ack")

// The stream completes successfully with the following message
actorRef ! Success(())

Java

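import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.TestProbe;
import java.util.Optional;

// Assumes an ActorSystem named `system` is in scope.
final TestProbe probe = new TestProbe(system);

Source<Object, ActorRef> source =
    Source.actorRefWithBackpressure(
        "ack",
        // complete when the message "complete" is received
        o -> {
          // compare by value; == on strings only compares references
          if ("complete".equals(o)) return Optional.of(CompletionStrategy.draining());
          else return Optional.empty();
        },
        // do not fail on any message
        o -> Optional.empty());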

ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);
probe.send(actorRef, "hello");
probe.expectMsg("ack");
probe.send(actorRef, "hello");
probe.expectMsg("ack");

// The stream completes successfully with the following message
actorRef.tell("complete", ActorRef.noSender());