Sink.actorRefWithBackpressure
Send the elements from the stream to an ActorRef (of the classic actors API), which must then acknowledge reception after completing a message, to provide back pressure onto the sink.
Signature
Description
Send the elements from the stream to an ActorRef, which must then acknowledge reception after completing a message, to provide back pressure onto the sink. There is also a variant without a concrete acknowledgement message, which accepts any message as an acknowledgement.
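The variant without a concrete acknowledgement message could be used roughly as in the sketch below. It assumes Akka 2.6 package names (`akka.*`) and the four-parameter overload of `Sink.actorRefWithBackpressure`; with a different distribution or version the imports may differ. `AnyAckReceiver`, its message objects, the `"ok"` reply and the `run` helper are hypothetical names chosen for illustration.

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._

object AnyAckExample {
  case object StreamInitialized
  case object StreamCompleted
  final case class StreamFailure(ex: Throwable)

  // Hypothetical receiver: it replies with an arbitrary message ("ok").
  // With the ack-less variant, any reply counts as an acknowledgement.
  class AnyAckReceiver(done: Promise[Vector[String]]) extends Actor {
    private var seen = Vector.empty[String]

    def receive: Receive = {
      case StreamInitialized =>
        sender() ! "ok" // acknowledge the init message
      case el: String =>
        seen :+= el
        sender() ! "ok" // acknowledge the element
      case StreamCompleted =>
        done.trySuccess(seen)
      case StreamFailure(ex) =>
        done.tryFailure(ex)
    }
  }

  // Runs the stream and returns the elements the actor received.
  def run(words: List[String]): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("any-ack-example")
    try {
      val done = Promise[Vector[String]]()
      val receiver = system.actorOf(Props(new AnyAckReceiver(done)))

      // No ackMessage argument: assumed to select the "any message" variant.
      val sink = Sink.actorRefWithBackpressure[String](
        receiver,
        onInitMessage = StreamInitialized,
        onCompleteMessage = StreamCompleted,
        onFailureMessage = (ex: Throwable) => StreamFailure(ex))

      Source(words).runWith(sink)
      Await.result(done.future, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(List("hello", "hi")))
}
```

A dedicated acknowledgement message, as in the example further down, makes the protocol more explicit. The variant sketched here is mainly convenient when the receiving actor already replies to every message with something else.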
See also:
- Sink.actorRef: Send elements to an actor, without considering backpressure
- ActorSink.actorRef: The corresponding operator for the new actors API
- ActorSink.actorRefWithBackpressure: Send elements to an actor of the new actors API supporting backpressure
Example
Actor to be interacted with:
- Scala
  object AckingReceiver {
    case object Ack

    case object StreamInitialized
    case object StreamCompleted
    case class StreamFailure(ex: Throwable)
  }

  class AckingReceiver(probe: ActorRef) extends Actor with ActorLogging {
    import AckingReceiver._

    def receive: Receive = {
      case StreamInitialized =>
        log.info("Stream initialized!")
        probe ! "Stream initialized!"
        sender() ! Ack // ack to allow the stream to proceed sending more elements

      case el: String =>
        log.info("Received element: {}", el)
        probe ! el
        sender() ! Ack // ack to allow the stream to proceed sending more elements

      case StreamCompleted =>
        log.info("Stream completed!")
        probe ! "Stream completed!"

      case StreamFailure(ex) =>
        log.error(ex, "Stream failed!")
    }
  }
Using the actorRefWithBackpressure operator with the above actor:
- Scala
  val words: Source[String, NotUsed] =
    Source(List("hello", "hi"))

  // sent from actor to stream to "ack" processing of given element
  val AckMessage = AckingReceiver.Ack

  // sent from stream to actor to indicate start, end or failure of stream:
  val InitMessage = AckingReceiver.StreamInitialized
  val OnCompleteMessage = AckingReceiver.StreamCompleted
  val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)

  val probe = TestProbe()
  val receiver = system.actorOf(Props(new AckingReceiver(probe.ref)))
  val sink = Sink.actorRefWithBackpressure(
    receiver,
    onInitMessage = InitMessage,
    ackMessage = AckMessage,
    onCompleteMessage = OnCompleteMessage,
    onFailureMessage = onErrorMessage)

  words.map(_.toLowerCase).runWith(sink)

  probe.expectMsg("Stream initialized!")
  probe.expectMsg("hello")
  probe.expectMsg("hi")
  probe.expectMsg("Stream completed!")
Reactive Streams semantics
cancels when the actor terminates
backpressures when the acknowledgement from the actor has not yet arrived
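To make the backpressure rule concrete, here is a toy, single-threaded model of the acknowledgement handshake using only the Scala standard library. It is not how the operator is implemented (the real stage runs asynchronously and may buffer elements), and completion and failure signalling are left out. `ModelSink`, `Msg` and `delivered` are hypothetical names. The only point it illustrates is that the next element goes out after the previous message has been acknowledged, and not before.

```scala
import scala.collection.mutable

// Toy model of the ack handshake. This is NOT Akka code;
// all names here are hypothetical.
object AckHandshakeModel {
  sealed trait Msg
  case object Init extends Msg
  final case class Element(value: String) extends Msg

  // Stands in for the sink: it hands the next message to `deliver`
  // only after the previous one has been acknowledged.
  final class ModelSink(elements: List[String], deliver: Msg => Unit) {
    private val pending = mutable.Queue.from(elements)

    // The init message goes out first and must be acknowledged as well.
    def start(): Unit = deliver(Init)

    // Each acknowledgement releases at most one further element.
    def onAck(): Unit =
      if (pending.nonEmpty) deliver(Element(pending.dequeue()))
  }

  // Returns what the receiver has seen after the given number of acks.
  def delivered(elements: List[String], acks: Int): List[Msg] = {
    val seen = mutable.ListBuffer.empty[Msg]
    val sink = new ModelSink(elements, msg => seen += msg)
    sink.start()
    (1 to acks).foreach(_ => sink.onAck())
    seen.toList
  }

  def main(args: Array[String]): Unit = {
    println(delivered(List("hello", "hi"), acks = 0)) // List(Init)
    println(delivered(List("hello", "hi"), acks = 1)) // List(Init, Element(hello))
    println(delivered(List("hello", "hi"), acks = 2)) // List(Init, Element(hello), Element(hi))
  }
}
```

With zero acknowledgements only the init message is delivered, which mirrors why the receiving actor in the example above replies with `Ack` to `StreamInitialized` before any element can arrive.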