Sink.actorRefWithAck
Send the elements from the stream to an ActorRef
which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
Description
Send the elements from the stream to an ActorRef,
which must acknowledge each message once it has finished processing it. The sink waits for that acknowledgement before sending the next element, which provides back pressure.
Example
Actor to be interacted with:
- Scala
-
source
/** Messages exchanged between the stream's sink and [[AckingReceiver]]. */
object AckingReceiver {

  /** Acknowledgement message; the usage example passes it as the sink's `ackMessage`. */
  case object Ack

  /** Handled by the actor as the first message, before any element. */
  case object StreamInitialized

  /** Handled by the actor when the stream has completed. */
  case object StreamCompleted

  /** Handled by the actor when the stream has failed; carries the cause. */
  final case class StreamFailure(ex: Throwable)
}

/**
 * Actor that forwards everything it receives from the stream to `probe` and
 * acknowledges the init message and every element so that the stream can continue.
 *
 * @param probe   actor that is told about each stream event as a `String`
 * @param ackWith message sent back to the sender as the acknowledgement
 */
class AckingReceiver(probe: ActorRef, ackWith: Any) extends Actor with ActorLogging {
  import AckingReceiver._

  def receive: Receive = {
    case StreamInitialized =>
      log.info("Stream initialized!")
      probe ! "Stream initialized!"
      // Reply with the configured ack (previously hard-coded to Ack, ignoring ackWith).
      sender() ! ackWith // ack to allow the stream to proceed sending more elements

    case el: String =>
      log.info("Received element: {}", el)
      probe ! el
      sender() ! ackWith // ack to allow the stream to proceed sending more elements

    case StreamCompleted =>
      // No ack is sent here.
      log.info("Stream completed!")
      probe ! "Stream completed!"

    case StreamFailure(ex) =>
      // Failure is only logged; the probe is not notified.
      log.error(ex, "Stream failed!")
  }
}
- Java
-
source
enum Ack { INSTANCE; } static class StreamInitialized {} static class StreamCompleted {} static class StreamFailure { private final Throwable cause; public StreamFailure(Throwable cause) { this.cause = cause; } public Throwable getCause() { return cause; } } static class AckingReceiver extends AbstractLoggingActor { private final ActorRef probe; public AckingReceiver(ActorRef probe) { this.probe = probe; } @Override public Receive createReceive() { return receiveBuilder() .match( StreamInitialized.class, init -> { log().info("Stream initialized"); probe.tell("Stream initialized", getSelf()); sender().tell(Ack.INSTANCE, self()); }) .match( String.class, element -> { log().info("Received element: {}", element); probe.tell(element, getSelf()); sender().tell(Ack.INSTANCE, self()); }) .match( StreamCompleted.class, completed -> { log().info("Stream completed"); probe.tell("Stream completed", getSelf()); }) .match( StreamFailure.class, failed -> { log().error(failed.getCause(), "Stream failed!"); probe.tell("Stream failed!", getSelf()); }) .build(); } }
Using the actorRefWithAck
operator with the above actor:
- Scala
-
source
// Elements pushed through the stream.
val words: Source[String, NotUsed] = Source(List("hello", "hi"))

// Reply the actor sends back to the stream once it has handled a message.
val AckMessage = AckingReceiver.Ack

// Notifications the stream sends to the actor on start, completion and failure.
val InitMessage = AckingReceiver.StreamInitialized
val OnCompleteMessage = AckingReceiver.StreamCompleted
val onErrorMessage: Throwable => AckingReceiver.StreamFailure = AckingReceiver.StreamFailure(_)

val probe = TestProbe()
val receiver = system.actorOf(Props(new AckingReceiver(probe.ref, ackWith = AckMessage)))

val sink = Sink.actorRefWithAck(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage)

val lowerCased = words.map(_.toLowerCase)
lowerCased.runWith(sink)

// The probe must observe these messages in exactly this order.
List("Stream initialized!", "hello", "hi", "Stream completed!").foreach { expected =>
  probe.expectMsg(expected)
}
- Java
-
source
// Elements pushed through the stream.
Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "hi"));

final TestKit probe = new TestKit(system);

ActorRef receiver = system.actorOf(Props.create(AckingReceiver.class, probe.getRef()));

// Argument order: target actor, init message, ack message, completion message, failure mapper.
Sink<String, NotUsed> sink =
    Sink.<String>actorRefWithAck(
        receiver,
        new StreamInitialized(),
        Ack.INSTANCE,
        new StreamCompleted(),
        StreamFailure::new);

words.map(String::toLowerCase).runWith(sink, mat);

// The probe must observe these messages in exactly this order.
for (String expected :
    Arrays.asList("Stream initialized", "hello", "hi", "Stream completed")) {
  probe.expectMsg(expected);
}
Reactive Streams semantics
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived