ActorFlow.ask

Use the AskPattern to send each element as an ask to the target actor, and emit each reply downstream.

Actor interop operators

Dependency

This operator is included in:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-typed_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-stream-typed_2.12:2.5.32"
}

Signature

def ask[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]
def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]

Description

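Uses the ask pattern to send a request-reply message to the target actor ref for each incoming element. makeMessage receives the element and a temporary ActorRef[A] to reply to, and builds the message of type Q that is sent to the actor. The actor's reply is emitted downstream. Replies are emitted in the order of the incoming elements, regardless of the order in which the actor answers.

The parallelism parameter limits how many asks can be outstanding at once; the overload without it uses a parallelism of 2. The stream fails with a TimeoutException if an ask is not answered within the timeout, and it also fails if the target actor terminates.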
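As a rough sketch of how the two overloads in the Signature section might be used, the snippet below builds a reusable flow with an explicit parallelism, plus a variant that replaces an ask timeout with a fallback element. The Asking and Reply classes, the object name, the parallelism of 8 and the "timed out" fallback are illustrative assumptions, not part of the library.

```scala
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.scaladsl.Flow
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.Timeout

import java.util.concurrent.TimeoutException

object AskFlowSketch {
  // Hypothetical request-reply protocol, mirroring the shape used in the Examples section.
  final case class Asking(s: String, replyTo: ActorRef[Reply])
  final case class Reply(s: String)

  // Allow up to 8 asks to be outstanding at once (8 is an arbitrary, illustrative value).
  def askFlow(ref: ActorRef[Asking])(implicit timeout: Timeout): Flow[String, Reply, NotUsed] =
    ActorFlow.ask[String, Asking, Reply](parallelism = 8)(ref)((el, replyTo) => Asking(el, replyTo))

  // Variant: on an ask timeout, emit one fallback Reply and complete instead of failing the stream.
  def askFlowWithFallback(ref: ActorRef[Asking])(implicit timeout: Timeout): Flow[String, Reply, NotUsed] =
    askFlow(ref).recover { case _: TimeoutException => Reply("timed out") }
}
```

With an implicit Timeout and a materializer in scope, either flow can be attached to a source of strings with via, in the same way as in the Examples section. Note that recover ends the stream after the fallback element, so later elements are not processed.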

Examples

Scala
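import akka.stream.scaladsl._
import akka.stream.typed.scaladsl.ActorFlow
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike

// Request-reply protocol, with its shape inferred from how it is used below
final case class Asking(s: String, replyTo: ActorRef[Reply])
final case class Reply(s: String)

// spawn and the implicit Timeout are assumed to come from ScalaTestWithActorTestKit;
// an implicit materializer must also be in scope to run the stream
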
val ref = spawn(Behaviors.receiveMessage[Asking] { asking =>
  asking.replyTo ! Reply(asking.s + "!!!")
  Behaviors.same
})

val in: Future[immutable.Seq[Reply]] =
  Source(1 to 50)
    .map(_.toString)
    .via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo)))
    .runWith(Sink.seq)
Java
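import akka.actor.typed.ActorRef;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.typed.javadsl.ActorFlow;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

class AskMe {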
  final String payload;
  final ActorRef<String> replyTo;

  AskMe(String payload, ActorRef<String> replyTo) {
    this.payload = payload;
    this.replyTo = replyTo;
  }
}

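// ref is assumed to be an ActorRef<AskMe> obtained elsewhere, for example by spawning the target actor
Duration timeout = Duration.of(1, ChronoUnit.SECONDS);

// Build each AskMe message with a constructor reference;
// to(...) only builds the graph, which still has to be run with a materializer
Source.repeat("hello").via(ActorFlow.ask(ref, timeout, AskMe::new)).to(Sink.ignore());

// Equivalent, with explicit type parameters and a lambda
Source.repeat("hello")
    .via(
        ActorFlow.<String, AskMe, String>ask(
            ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
    .to(Sink.ignore());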