ActorFlow.ask
Use the AskPattern
to send each element as an ask
to the target actor, and expect a reply back that will be sent further downstream.
Dependency
This operator is included in:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.32"
- Maven
<dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream-typed_2.12</artifactId> <version>2.5.32</version> </dependency> </dependencies>
- Gradle
dependencies { implementation "com.typesafe.akka:akka-stream-typed_2.12:2.5.32" }
Signature
def ask[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]
def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]
Description
Emit the contents of a file, as ByteString
s, materializes into a Future
CompletionStage
which will be completed with a IOResult
upon reaching the end of the file or if there is a failure.
Examples
- Scala
-
source
import akka.stream.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors import scala.concurrent.duration._ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import org.scalatest.WordSpecLike val ref = spawn(Behaviors.receiveMessage[Asking] { asking => asking.replyTo ! Reply(asking.s + "!!!") Behaviors.same }) val in: Future[immutable.Seq[Reply]] = Source(1 to 50) .map(_.toString) .via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))) .runWith(Sink.seq)
- Java
-
source
class AskMe { final String payload; final ActorRef<String> replyTo; AskMe(String payload, ActorRef<String> replyTo) { this.payload = payload; this.replyTo = replyTo; } } Duration timeout = Duration.of(1, ChronoUnit.SECONDS); Source.repeat("hello").via(ActorFlow.ask(ref, timeout, AskMe::new)).to(Sink.ignore()); Source.repeat("hello") .via( ActorFlow.<String, AskMe, String>ask( ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo))) .to(Sink.ignore());