Sink.actorRefWithBackpressure
Send the elements from the stream to an ActorRef
(of the classic actors API) which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
Signature
Sink.actorRefWithBackpressure
Sink.actorRefWithBackpressure
Description
Send the elements from the stream to an ActorRef
which must then acknowledge reception after completing a message, to provide back pressure onto the sink. There is also a variant without a concrete acknowledge message accepting any message as such.
See also:
Sink.actorRef
Send elements to an actor, without considering backpressureActorSink.actorRef
The corresponding operator for the new actors APIActorSink.actorRefWithBackpressure
Send elements to an actor of the new actors API supporting backpressure
Example
Actor to be interacted with:
- Scala
-
source
object AckingReceiver { case object Ack case object StreamInitialized case object StreamCompleted final case class StreamFailure(ex: Throwable) } class AckingReceiver(probe: ActorRef) extends Actor with ActorLogging { import AckingReceiver._ def receive: Receive = { case StreamInitialized => log.info("Stream initialized!") probe ! "Stream initialized!" sender() ! Ack // ack to allow the stream to proceed sending more elements case el: String => log.info("Received element: {}", el) probe ! el sender() ! Ack // ack to allow the stream to proceed sending more elements case StreamCompleted => log.info("Stream completed!") probe ! "Stream completed!" case StreamFailure(ex) => log.error(ex, "Stream failed!") } }
- Java
-
source
enum Ack { INSTANCE; } static class StreamInitialized {} static class StreamCompleted {} static class StreamFailure { private final Throwable cause; public StreamFailure(Throwable cause) { this.cause = cause; } public Throwable getCause() { return cause; } } static class AckingReceiver extends AbstractLoggingActor { private final ActorRef probe; public AckingReceiver(ActorRef probe) { this.probe = probe; } @Override public Receive createReceive() { return receiveBuilder() .match( StreamInitialized.class, init -> { log().info("Stream initialized"); probe.tell("Stream initialized", getSelf()); sender().tell(Ack.INSTANCE, self()); }) .match( String.class, element -> { log().info("Received element: {}", element); probe.tell(element, getSelf()); sender().tell(Ack.INSTANCE, self()); }) .match( StreamCompleted.class, completed -> { log().info("Stream completed"); probe.tell("Stream completed", getSelf()); }) .match( StreamFailure.class, failed -> { log().error(failed.getCause(), "Stream failed!"); probe.tell("Stream failed!", getSelf()); }) .build(); } }
Using the actorRefWithBackpressure
operator with the above actor:
- Scala
-
source
val words: Source[String, NotUsed] = Source(List("hello", "hi")) // sent from actor to stream to "ack" processing of given element val AckMessage = AckingReceiver.Ack // sent from stream to actor to indicate start, end or failure of stream: val InitMessage = AckingReceiver.StreamInitialized val OnCompleteMessage = AckingReceiver.StreamCompleted val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex) val probe = TestProbe() val receiver = system.actorOf(Props(new AckingReceiver(probe.ref))) val sink = Sink.actorRefWithBackpressure( receiver, onInitMessage = InitMessage, ackMessage = AckMessage, onCompleteMessage = OnCompleteMessage, onFailureMessage = onErrorMessage) words.map(_.toLowerCase).runWith(sink) probe.expectMsg("Stream initialized!") probe.expectMsg("hello") probe.expectMsg("hi") probe.expectMsg("Stream completed!")
- Java
-
source
Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi")); final TestKit probe = new TestKit(system); ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef())); Sink<String, NotUsed> sink = Sink.<String>actorRefWithBackpressure( receiver, new StreamInitialized(), Ack.INSTANCE, new StreamCompleted(), ex -> new StreamFailure(ex)); words.map(el -> el.toLowerCase()).runWith(sink, system); probe.expectMsg("Stream initialized"); probe.expectMsg("hello"); probe.expectMsg("hi"); probe.expectMsg("Stream completed");
Reactive Streams semantics
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived