Sink.actorRefWithBackpressure
Send the elements from the stream to an ActorRef (of the classic actors API), which must acknowledge each message once it has processed it; these acknowledgements are what backpressure the stream.
Signature
Sink.actorRefWithBackpressure
Description
Send the elements from the stream to an ActorRef, which must acknowledge each message once it has processed it. The sink sends the next element only after the acknowledgement arrives, which is how the actor backpressures the stream. There is also a variant that takes no specific acknowledgement message and treats any message from the actor as an acknowledgement.
See also:
- Sink.actorRef: Send elements to an actor, without considering backpressure
- ActorSink.actorRef: The corresponding operator for the new actors API
- ActorSink.actorRefWithBackpressure: Send elements to an actor of the new actors API supporting backpressure
Example
Actor to be interacted with:
- Scala
  import akka.actor.{ Actor, ActorLogging, ActorRef }

  object AckingReceiver {
    case object Ack

    case object StreamInitialized
    case object StreamCompleted
    case class StreamFailure(ex: Throwable)
  }

  class AckingReceiver(probe: ActorRef) extends Actor with ActorLogging {
    import AckingReceiver._

    def receive: Receive = {
      case StreamInitialized =>
        log.info("Stream initialized!")
        probe ! "Stream initialized!"
        sender() ! Ack // ack to allow the stream to proceed sending more elements

      case el: String =>
        log.info("Received element: {}", el)
        probe ! el
        sender() ! Ack // ack to allow the stream to proceed sending more elements

      case StreamCompleted =>
        log.info("Stream completed!")
        probe ! "Stream completed!"

      case StreamFailure(ex) =>
        log.error(ex, "Stream failed!")
    }
  }
- Java
  import akka.actor.AbstractLoggingActor;
  import akka.actor.ActorRef;

  enum Ack {
    INSTANCE;
  }

  static class StreamInitialized {}

  static class StreamCompleted {}

  static class StreamFailure {
    private final Throwable cause;

    public StreamFailure(Throwable cause) {
      this.cause = cause;
    }

    public Throwable getCause() {
      return cause;
    }
  }

  static class AckingReceiver extends AbstractLoggingActor {

    private final ActorRef probe;

    public AckingReceiver(ActorRef probe) {
      this.probe = probe;
    }

    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(
              StreamInitialized.class,
              init -> {
                log().info("Stream initialized");
                probe.tell("Stream initialized", getSelf());
                // ack to allow the stream to start sending elements
                sender().tell(Ack.INSTANCE, self());
              })
          .match(
              String.class,
              element -> {
                log().info("Received element: {}", element);
                probe.tell(element, getSelf());
                // ack to allow the stream to send the next element
                sender().tell(Ack.INSTANCE, self());
              })
          .match(
              StreamCompleted.class,
              completed -> {
                log().info("Stream completed");
                probe.tell("Stream completed", getSelf());
              })
          .match(
              StreamFailure.class,
              failed -> {
                log().error(failed.getCause(), "Stream failed!");
                probe.tell("Stream failed!", getSelf());
              })
          .build();
    }
  }
Using the actorRefWithBackpressure operator with the above actor:
- Scala
  import akka.NotUsed
  import akka.actor.Props
  import akka.stream.scaladsl.{ Sink, Source }
  import akka.testkit.TestProbe

  // assumes an implicit ActorSystem named `system` is in scope
  val words: Source[String, NotUsed] =
    Source(List("hello", "hi"))

  // sent from actor to stream to "ack" processing of given element
  val AckMessage = AckingReceiver.Ack

  // sent from stream to actor to indicate start, end or failure of stream:
  val InitMessage = AckingReceiver.StreamInitialized
  val OnCompleteMessage = AckingReceiver.StreamCompleted
  val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

  val probe = TestProbe()
  val receiver = system.actorOf(Props(new AckingReceiver(probe.ref)))
  val sink = Sink.actorRefWithBackpressure(
    receiver,
    onInitMessage = InitMessage,
    ackMessage = AckMessage,
    onCompleteMessage = OnCompleteMessage,
    onFailureMessage = onErrorMessage)

  words.map(_.toLowerCase).runWith(sink)

  probe.expectMsg("Stream initialized!")
  probe.expectMsg("hello")
  probe.expectMsg("hi")
  probe.expectMsg("Stream completed!")
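- Java
  import akka.NotUsed;
  import akka.actor.ActorRef;
  import akka.actor.Props;
  import akka.stream.javadsl.Sink;
  import akka.stream.javadsl.Source;
  import akka.testkit.javadsl.TestKit;
  import java.util.Arrays;

  // assumes an ActorSystem named `system` is in scope
  Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));

  final TestKit probe = new TestKit(system);

  ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));

  Sink<String, NotUsed> sink =
      Sink.<String>actorRefWithBackpressure(
          receiver,
          new StreamInitialized(), // sent to the actor when the stream starts
          Ack.INSTANCE, // the reply the stream waits for after each message
          new StreamCompleted(), // sent to the actor when the stream completes
          ex -> new StreamFailure(ex)); // sent to the actor when the stream fails

  words.map(el -> el.toLowerCase()).runWith(sink, system);

  probe.expectMsg("Stream initialized");
  probe.expectMsg("hello");
  probe.expectMsg("hi");
  probe.expectMsg("Stream completed");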
Reactive Streams semantics
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived
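The acknowledgement rule above can be illustrated with a small toy model in plain Java. This is a sketch only and not Akka code: `AckGatedSender`, `offer`, `reply`, and the `"INIT"` marker are hypothetical names chosen for illustration, and the model ignores completion, failure, actor termination, and any internal buffering the real operator may do. It shows only that the init message and each element are held back until the previous message has been acknowledged, and that passing no specific acknowledgement message (here `null`) makes any reply count as one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of ack-gated delivery. Illustrative only; this is not Akka code. */
final class AckGatedSender {
    private final Deque<String> heldBack = new ArrayDeque<>(); // elements waiting for an ack
    private final List<String> delivered = new ArrayList<>();  // messages the receiver has seen
    private final Object expectedAck; // null models the variant where any reply is an ack
    private boolean awaitingAck = false;

    AckGatedSender(Object expectedAck) {
        this.expectedAck = expectedAck;
        deliver("INIT"); // the init message goes first and must be acknowledged as well
    }

    /** Upstream offers an element; it is delivered only if no ack is outstanding. */
    void offer(String element) {
        heldBack.addLast(element);
        drain();
    }

    /** The receiver replies; a matching reply releases the next held-back element. */
    void reply(Object message) {
        boolean isAck = expectedAck == null || expectedAck.equals(message);
        if (isAck) {
            awaitingAck = false;
            drain();
        }
        // In this sketch, non-matching replies are simply ignored.
    }

    private void drain() {
        if (!awaitingAck && !heldBack.isEmpty()) {
            deliver(heldBack.removeFirst());
        }
    }

    private void deliver(String message) {
        delivered.add(message);
        awaitingAck = true; // backpressure: nothing else is sent until an ack arrives
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    int heldBackCount() {
        return heldBack.size();
    }

    public static void main(String[] args) {
        AckGatedSender sender = new AckGatedSender("ack");
        sender.offer("hello");
        sender.offer("hi");
        System.out.println(sender.delivered()); // [INIT]
        sender.reply("ack");
        sender.reply("ack");
        System.out.println(sender.delivered()); // [INIT, hello, hi]
    }
}
```

In the model, both elements stay held back while the acknowledgement for the init message is outstanding, which corresponds to the "backpressures when the actor acknowledgement has not arrived" rule.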