ActorSource.actorRefWithBackpressure
Materialize an ActorRef[T]
of the new actors API; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.
Dependency
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
This operator is included in:
- sbt
val AkkaVersion = "2.10.2" libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
- Maven
- Gradle
Signature
ActorSource.actorRefWithBackpressure
Description
Materialize an ActorRef[T]
, sending messages to it will emit them on the stream. The actor responds with the provided ack message once the element could be emitted allowing for backpressure from the source. Sending another message before the previous one has been acknowledged will fail the stream.
See also:
- ActorSource.actorRef This operator, but without backpressure control
- Source.actorRef This operator, but without backpressure control for the classic actors API
- Source.actorRefWithBackpressure This operator for the classic actors API
- Source.queue Materialize a
SourceQueue
onto which elements can be pushed for emitting from the source
Example
With actorRefWithBackpressure
two actors get into play:
- An actor that is materialized when the stream runs. It feeds the stream.
- An actor provided by the user. It gets the ack signal when an element is emitted into the stream.
For the ack signal we create an Emitted
object .
For “feeding” the stream we use the Event
trait .
In this example we create the stream in an actor which itself reacts on the demand of the stream and sends more messages.
- Scala
-
source
import akka.actor.typed.ActorRef import akka.stream.CompletionStrategy import akka.stream.scaladsl.Sink import akka.stream.typed.scaladsl.ActorSource object StreamFeeder { /** Signals that the latest element is emitted into the stream */ case object Emitted sealed trait Event case class Element(content: String) extends Event case object ReachedEnd extends Event case class FailureOccured(ex: Exception) extends Event def apply(): Behavior[Emitted.type] = Behaviors.setup { context => val streamActor = runStream(context.self)(context.system) streamActor ! Element("first") sender(streamActor, 0) } private def runStream(ackReceiver: ActorRef[Emitted.type])(implicit system: ActorSystem[_]): ActorRef[Event] = { val source = ActorSource.actorRefWithBackpressure[Event, Emitted.type]( // get demand signalled to this actor receiving Ack ackTo = ackReceiver, ackMessage = Emitted, // complete when we send ReachedEnd completionMatcher = { case ReachedEnd => CompletionStrategy.draining }, failureMatcher = { case FailureOccured(ex) => ex }) val streamActor: ActorRef[Event] = source .collect { case Element(msg) => msg } .to(Sink.foreach(println)) .run() streamActor } private def sender(streamSource: ActorRef[Event], counter: Int): Behavior[Emitted.type] = Behaviors.receiveMessage { case Emitted if counter < 5 => streamSource ! Element(counter.toString) sender(streamSource, counter + 1) case _ => streamSource ! ReachedEnd Behaviors.stopped } } ActorSystem(StreamFeeder(), "stream-feeder") // Will print: // first // 0 // 1 // 2 // 3 // 4
- Java
Reactive Streams semantics
emits when a message is sent to the materialized ActorRef[T]
it is emitted as soon as there is demand from downstream
completes when the passed completion matcher returns a CompletionStrategy