ActorFlow.ask
Use the AskPattern
to send each element as an ask
to the target actor, and expect a reply back that will be sent further downstream.
Dependency
This operator is included in:
Signature
def ask[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]
def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]
Description
Emit the contents of a file, as ByteString
s, materializes into a Future
which will be completed with a IOResult
upon reaching the end of the file or if there is a failure.
Examples
- Scala
-
source
import akka.stream.scaladsl._ import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors import scala.concurrent.duration._ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import org.scalatest.WordSpecLike val ref = spawn(Behaviors.receiveMessage[Asking] { asking => asking.replyTo ! Reply(asking.s + "!!!") Behaviors.same }) val in: Future[immutable.Seq[Reply]] = Source(1 to 50) .map(_.toString) .via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))) .runWith(Sink.seq)
- Java