ActorSink.actorRefWithBackpressure
Sends the elements of the stream to the given ActorRef<T> (Java) or ActorRef[T] (Scala) of the new actors API with backpressure, so that the actor can signal demand when it is ready to receive more elements.
Actor interop operators
Dependency
The Akka dependencies are available from Akka’s library repository. To access them, you need to add the URL of this repository to your build configuration.
sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
This operator is included in:
sbt
val AkkaVersion = "2.8.7"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.8.7</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
    ScalaBinary: "2.13"
]
dependencies {
    implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.8.7")
    implementation "com.typesafe.akka:akka-stream-typed_${versions.ScalaBinary}"
}
Signature
ActorSink.actorRefWithBackpressure
Description
Sends the elements of the stream to the given ActorRef<T> (Java) or ActorRef[T] (Scala) with backpressure, so that the actor can signal demand when it is ready to receive more elements. There is also a variant that takes no concrete acknowledgement message and instead treats any message from the actor as an acknowledgement.
Examples
Scala
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSink

// Acknowledgement sent by the actor when it is ready for the next message
trait Ack
object Ack extends Ack

// Messages the sink sends to the actor
trait Protocol
case class Init(ackTo: ActorRef[Ack]) extends Protocol
case class Message(ackTo: ActorRef[Ack], msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Throwable) extends Protocol

// targetActor() is a placeholder for however the target actor is obtained
val actor: ActorRef[Protocol] = targetActor()

val sink: Sink[String, NotUsed] = ActorSink.actorRefWithBackpressure(
  ref = actor,
  messageAdapter = (responseActorRef: ActorRef[Ack], element) => Message(responseActorRef, element),
  onInitMessage = (responseActorRef: ActorRef[Ack]) => Init(responseActorRef),
  ackMessage = Ack,
  onCompleteMessage = Complete,
  onFailureMessage = (exception) => Fail(exception))

Source.single("msg1").runWith(sink)
Java
import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorSink;

// Acknowledgement sent by the actor when it is ready for the next message
enum Ack {
  INSTANCE;
}

// Messages the sink sends to the actor
interface Protocol {}

class Init implements Protocol {
  private final ActorRef<Ack> ack;

  public Init(ActorRef<Ack> ack) {
    this.ack = ack;
  }
}

class Message implements Protocol {
  private final ActorRef<Ack> ackTo;
  private final String msg;

  public Message(ActorRef<Ack> ackTo, String msg) {
    this.ackTo = ackTo;
    this.msg = msg;
  }
}

class Complete implements Protocol {}

class Fail implements Protocol {
  private final Throwable ex;

  public Fail(Throwable ex) {
    this.ex = ex;
  }
}

  final ActorRef<Protocol> actorRef = // spawned actor

  final Complete completeMessage = new Complete();

  final Sink<String, NotUsed> sink =
      ActorSink.actorRefWithBackpressure(
          actorRef,
          (responseActorRef, element) -> new Message(responseActorRef, element),
          (responseActorRef) -> new Init(responseActorRef),
          Ack.INSTANCE,
          completeMessage,
          (exception) -> new Fail(exception));

  // system is the ActorSystem the stream runs on, created elsewhere
  Source.single("msg1").runWith(sink, system);
Reactive Streams semantics
cancels when the actor terminates
backpressures when the actor acknowledgement has not arrived
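The acknowledgement handshake can be illustrated with a small plain-Java model. This is only a sketch and not Akka's implementation: the class `AckHandshakeModel` and its methods `start`, `offer`, `ack` and `delivered` are hypothetical names introduced here, messages are represented as strings, and completion, failure and any internal buffering are left out. It shows only the core rule that the next message is held back until the previous one has been acknowledged.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical plain-Java model of the ack handshake; not Akka's implementation. */
public class AckHandshakeModel {
    // Elements that upstream wants to emit but that are still held back
    private final Queue<String> pending = new ArrayDeque<>();
    // Messages that have been handed to the (imaginary) actor, in order
    private final List<String> delivered = new ArrayList<>();
    // True while a sent message has not been acknowledged yet
    private boolean awaitingAck = false;

    /** Stream start: the init message is sent first and must be acknowledged. */
    public void start() {
        delivered.add("Init");
        awaitingAck = true;
    }

    /** Upstream offers an element; it is held back while an ack is outstanding. */
    public void offer(String element) {
        pending.add(element);
        drain();
    }

    /** The actor acknowledges the last message, signalling demand for one more. */
    public void ack() {
        awaitingAck = false;
        drain();
    }

    /** Sends at most one pending element, and only if no ack is outstanding. */
    private void drain() {
        if (awaitingAck) {
            return; // backpressure: the previous message is not acknowledged yet
        }
        String next = pending.poll();
        if (next != null) {
            delivered.add("Message(" + next + ")");
            awaitingAck = true;
        }
    }

    public List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        AckHandshakeModel sink = new AckHandshakeModel();
        sink.start();        // Init is sent, now waiting for its ack
        sink.offer("msg1");  // held back: Init not acknowledged yet
        sink.offer("msg2");  // held back as well
        System.out.println(sink.delivered()); // [Init]
        sink.ack();          // ack for Init releases msg1
        System.out.println(sink.delivered()); // [Init, Message(msg1)]
        sink.ack();          // ack for msg1 releases msg2
        System.out.println(sink.delivered()); // [Init, Message(msg1), Message(msg2)]
    }
}
```

In the model, an actor that never calls `ack()` after `Init` receives nothing further, which corresponds to the "backpressures when the actor acknowledgement has not arrived" rule above.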