ActorFlow.ask
Use the “Ask Pattern” to send each stream element as an ask
to the target actor (of the new actors API), and expect a reply that will be emitted downstream.
Actor interop operators
Dependency
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
sbt resolvers += "Akka library repository" . at ( "https://repo.akka.io/maven" )
Maven <project>
...
<repositories>
<repository>
<id> akka-repository </id>
<name> Akka library repository </name>
<url> https://repo.akka.io/maven </url>
</repository>
</repositories>
</project>
Gradle repositories {
mavenCentral ()
maven {
url "https://repo.akka.io/maven"
}
}
This operator is included in:
sbt val AkkaVersion = "2.8.7"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
Maven <properties>
<scala.binary.version> 2.13 </scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId> com.typesafe.akka </groupId>
<artifactId> akka-bom_${scala.binary.version} </artifactId>
<version> 2.8.7 </version>
<type> pom </type>
<scope> import </scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId> com.typesafe.akka </groupId>
<artifactId> akka-stream-typed_${scala.binary.version} </artifactId>
</dependency>
</dependencies>
Gradle def versions = [
ScalaBinary : "2.13"
]
dependencies {
implementation platform ( "com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.8.7" )
implementation "com.typesafe.akka:akka-stream-typed_${versions.ScalaBinary}"
}
Signature
ActorFlow.ask
ActorFlow.ask
Description
Use the Ask pattern to send a request-reply message to the target ref
actor. If any of the asks times out it will fail the stream with an AskTimeoutException
AskTimeoutException
.
The ask
operator requires
the actor ref
,
a makeMessage
function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor’s reply message
a timeout.
See also:
Examples
The ActorFlow.ask
sends a message to the actor. The actor expects Asking
messages which contain the actor ref for replies of type Reply
. When the actor for replies receives a reply, the ActorFlow.ask
stream stage emits the reply and the map
extracts the message String
.
Scala
copy source import akka . stream . scaladsl .{ Flow , Sink , Source }
import akka . stream . typed . scaladsl . ActorFlow
import akka . actor . typed . ActorRef
import akka . actor . typed . scaladsl . Behaviors
import akka . util . Timeout
final case class Asking ( s : String , replyTo : ActorRef [ Reply ])
final case class Reply ( msg : String )
final case class AskingWithStatus ( s : String , replyTo : ActorRef [ StatusReply [ String ]])
val ref = spawn ( Behaviors . receiveMessage [ Asking ] { asking =>
asking . replyTo ! Reply ( asking . s + "!!!" )
Behaviors . same
})
implicit val timeout : Timeout = 1.second
val askFlow : Flow [ String , Reply , NotUsed ] =
ActorFlow . ask ( ref )( Asking . apply )
// explicit creation of the sent message
val askFlowExplicit : Flow [ String , Reply , NotUsed ] =
ActorFlow . ask ( ref )( makeMessage = ( el , replyTo : ActorRef [ Reply ]) => Asking ( el , replyTo ))
val in : Future [ immutable . Seq [ String ]] =
Source ( 1 to 50 ). map ( _ . toString ). via ( askFlow ). map ( _ . msg ). runWith ( Sink . seq )
Java
copy source import akka . actor . typed . ActorRef ;
import akka . actor . typed . ActorSystem ;
import akka . pattern . StatusReply ;
import akka . stream . javadsl . Flow ;
import akka . stream . javadsl . Sink ;
import akka . stream . javadsl . Source ;
import akka . stream . typed . javadsl . ActorFlow ;
class Asking {
final String payload ;
final ActorRef < Reply > replyTo ;
public Asking ( String payload , ActorRef < Reply > replyTo ) {
this . payload = payload ;
this . replyTo = replyTo ;
}
}
static class AskingWithStatus {
final String payload ;
final ActorRef < StatusReply < String >> replyTo ;
public AskingWithStatus ( String payload , ActorRef < StatusReply < String >> replyTo ) {
this . payload = payload ;
this . replyTo = replyTo ;
}
}
class Reply {
public final String msg ;
public Reply ( String msg ) {
this . msg = msg ;
}
}
final ActorRef < Asking > actorRef = // ???
final ActorRef < AskingWithStatus > actorWithStatusRef = // ???
Duration timeout = Duration . ofSeconds ( 1 );
// method reference notation
Flow < String , Reply , NotUsed > askFlow = ActorFlow . ask ( actorRef , timeout , Asking :: new );
// explicit creation of the sent message
Flow < String , Reply , NotUsed > askFlowExplicit =
ActorFlow . ask ( actorRef , timeout , ( msg , replyTo ) -> new Asking ( msg , replyTo ));
Flow < String , String , NotUsed > askFlowExplicitWithStatus =
ActorFlow . askWithStatus (
actorWithStatusRef , timeout , ( msg , replyTo ) -> new AskingWithStatus ( msg , replyTo ));
Source . repeat ( "hello" ). via ( askFlow ). map ( reply -> reply . msg ). runWith ( Sink . seq (), system );
Reactive Streams semantics
emits when the futures (in submission order) created by the ask pattern internally are completed
backpressures when the number of futures reaches the configured parallelism and the downstream backpressures
completes when upstream completes and all futures have been completed and all elements have been emitted
fails when the passed-in actor terminates, or when any of the ask
s exceed a timeout
cancels when downstream cancels
Found an error in this documentation? The source code for this page can be found
here .
Please feel free to edit and contribute a pull request.