ActorFlow.ask

Use the AskPattern to send each element as an ask to the target actor, and expect a reply back that will be sent further downstream.

Actor interop operators

Dependency

This operator is included in:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.5.19"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-typed_2.11</artifactId>
  <version>2.5.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-stream-typed_2.11', version: '2.5.19'
}

Signature

def ask[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) ⇒ Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]
def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) ⇒ Q)(implicit timeout: Timeout): Flow[I, A, NotUsed]

Description

Emit the contents of a file, as ByteStrings, materializes into a Future CompletionStage which will be completed with a IOResult upon reaching the end of the file or if there is a failure.

Examples

Scala
import akka.stream.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike

val ref = spawn(Behaviors.receiveMessage[Asking] { asking ⇒
  asking.replyTo ! Reply(asking.s + "!!!")
  Behaviors.same
})

val in: Future[immutable.Seq[Reply]] =
  Source(1 to 50).map(_.toString)
    .via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) ⇒ Asking(el, replyTo)))
    .runWith(Sink.seq)
Java
class AskMe {
  final String payload;
  final ActorRef<String> replyTo;

  AskMe(String payload, ActorRef<String> replyTo) {
    this.payload = payload;
    this.replyTo = replyTo;
  }
}

Duration timeout = Duration.of(1, ChronoUnit.SECONDS);

Source.repeat("hello")
  .via(ActorFlow.ask(ref, timeout, AskMe::new))
  .to(Sink.ignore());

Source.repeat("hello")
  .via(ActorFlow.<String, AskMe, String>ask(ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
  .to(Sink.ignore());
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.