Futures

Dependency

This section explains using plain Scala Futures but focuses on their interop with Akka Actors, so to follow those examples you will want to depend on:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}

Introduction

In the Scala Standard Library, a Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking).

To be able to use this from Java, Akka provides a Java-friendly interface in akka.dispatch.Futures.

See also Java 8 Compatibility for Java compatibility.

Execution Contexts

In order to execute callbacks and operations, Futures need something called an ExecutionContext, which is very similar to a java.util.concurrent.Executor. If you have an ActorSystem in scope, you can use its default dispatcher as the ExecutionContext. Alternatively, you can use the factory methods provided by the ExecutionContexts class to wrap Executors and ExecutorServices, or even create your own.

Scala
import scala.concurrent.{ ExecutionContext, Promise }

implicit val ec = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)

// Do stuff with your brand new shiny ExecutionContext
val f = Promise.successful("foo")

// Then shut your ExecutionContext down at some
// appropriate place in your program/application
ec.shutdown()
Java
import akka.dispatch.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

ExecutionContext ec = ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere);

// Use ec with your Futures
Future<String> f1 = Futures.successful("foo");

// Then you shut down the ExecutorService at the end of your application.
yourExecutorServiceGoesHere.shutdown();

Within Actors

Each actor is configured to be run on a MessageDispatcher, and that dispatcher doubles as an ExecutionContext. If the nature of the Future calls invoked by the actor matches or is compatible with the activities of that actor (e.g. all CPU bound and no latency requirements), then it may be easiest to reuse the dispatcher for running the Futures, by importing context.dispatcher in Scala or by passing getContext().dispatcher() in Java.

Scala
class A extends Actor {
  import context.dispatcher
  val f = Future("hello")
  def receive = {
    case _ =>
  }
}
Java
import akka.actor.AbstractActor;
import akka.dispatch.Futures;

public class ActorWithFuture extends AbstractActor {
  ActorWithFuture() {
    Futures.future(() -> "hello", getContext().dispatcher());
  }

  @Override
  public Receive createReceive() {
    return AbstractActor.emptyBehavior();
  }
}

Use with Actors

There are generally two ways of getting a reply from an AbstractActor: the first is by a sent message (actorRef.tell(msg, sender)), which only works if the original sender was an AbstractActor, and the second is through a Future.

Using the ActorRef’s ask method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is:

Scala
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]
Java
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import java.time.Duration;

Timeout timeout = Timeout.create(Duration.ofSeconds(5));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());

This will cause the current thread to block and wait for the AbstractActor to ‘complete’ the Future with its reply. Blocking is discouraged though as it will cause performance problems. The blocking operations are located in Await.result and Await.ready to make it easy to spot where blocking occurs. Alternatives to blocking are discussed further within this documentation. Also note that the Future returned by an AbstractActor is a Future<Object> since an AbstractActor is dynamic. That is why the cast to String is used in the above sample.

Warning

Await.result and Await.ready are provided for exceptional situations where you must block. A good rule of thumb is to only use them if you know why you must block. For all other cases, use asynchronous composition as described below.
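
The difference between blocking on a result and composing asynchronously can be sketched with the JDK's CompletableFuture alone. This is only an illustration under stated assumptions: askLikeReply is a hypothetical stand-in for the untyped Future returned by ask, and no Akka API is involved.

```java
import java.util.concurrent.CompletableFuture;

class BlockingVersusComposing {
    // Hypothetical stand-in for a reply that arrives asynchronously and untyped.
    static CompletableFuture<Object> askLikeReply() {
        return CompletableFuture.supplyAsync(() -> "pong");
    }

    // Blocking: the calling thread waits here until the reply is available.
    static String blockForReply() {
        return (String) askLikeReply().join();
    }

    // Non-blocking: register what should happen with the reply and return at once.
    static CompletableFuture<Integer> replyLength() {
        return askLikeReply().thenApply(reply -> ((String) reply).length());
    }

    public static void main(String[] args) {
        System.out.println(blockForReply());
        // join() is used here only so the demo can print before the JVM exits.
        System.out.println(replyLength().join());
    }
}
```

In the second method no thread is parked while the reply is outstanding; the cast still happens, but inside the composed stage.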

Use the pipe pattern

Another useful message-transfer pattern is “pipe”, which is to send the result of CompletableFuture to another actor, upon completion of the CompletableFuture. The pipe pattern can be used by importing akka.pattern.Patterns.pipe.

Scala
class ActorUsingPipeTo(target: ActorRef) extends Actor {
  // akka.pattern.pipe needs to be imported
  import akka.pattern.{ ask, pipe }
  // implicit ExecutionContext should be in scope
  implicit val ec: ExecutionContext = context.dispatcher
  implicit val timeout: Timeout = 5.seconds

  def receive = {
    case _ =>
      val future = target ? "some message"
      future.pipeTo(sender()) // use the pipe pattern
  }
}
Java
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
public class ActorUsingPipeTo extends AbstractActor {
  ActorRef target;
  Duration timeout;

  ActorUsingPipeTo(ActorRef target) {
    this.target = target;
    this.timeout = Duration.ofSeconds(5);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            msg -> {
              CompletableFuture<Object> fut =
                  ask(target, "some message", timeout).toCompletableFuture();

              // the pipe pattern
              pipe(fut, getContext().dispatcher()).to(getSender());
            })
        .build();
  }
}

To see how this works in more detail, let’s introduce a small example consisting of three different actors, UserProxyActor, UserDataActor and UserActivityActor. In this example, when you need information about a user, you send a request message to UserProxyActor, then it gets the corresponding result from the appropriate backend actor based on the request message type.

The message types you send to UserProxyActor are GetUserData and GetUserActivities:

Scala
object UserProxyActor {
  sealed trait Message
  case object GetUserData extends Message
  case object GetUserActivities extends Message
}
Java
public class GetUserData {}

public class GetUserActivities {}

and UserData and ArrayList<UserActivity> are returned to the original sender in the end.

Scala
case class UserData(data: String)
case class UserActivity(activity: String)
Java
public class UserData {
  final String data;

  UserData(String data) {
    this.data = data;
  }
}

public class UserActivity {
  final String activity;

  UserActivity(String activity) {
    this.activity = activity;
  }
}

The backend UserDataActor and UserActivityActor are defined as follows:

Scala
class UserDataActor extends Actor {
  import UserDataActor._

  //holds the user data internally
  var internalData: UserData = UserData("initial data")

  def receive = {
    case Get =>
      sender() ! internalData
  }
}

object UserDataActor {
  case object Get
}
Java
public class UserDataActor extends AbstractActor {
  UserData internalData;

  UserDataActor() {
    this.internalData = new UserData("initial data");
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(GetFromUserDataActor.class, msg -> sender().tell(internalData, self()))
        .build();
  }
}

public class GetFromUserDataActor {}

UserDataActor holds the data in memory, so that it can return the current state of the user data quickly upon a request.

On the other hand, UserActivityActor queries a repository to retrieve historical user activities and then sends the result to sender(), which is UserProxyActor in this case, using the pipe pattern.

Scala
trait UserActivityRepository {
  def queryHistoricalActivities(userId: String): Future[List[UserActivity]]
}

class UserActivityActor(val userId: String, repository: UserActivityRepository) extends Actor {
  import akka.pattern.pipe
  import UserActivityActor._
  implicit val ec: ExecutionContext = context.dispatcher

  def receive = {
    case Get =>
      // user's historical activities are retrieved
      // via the separate repository
      repository.queryHistoricalActivities(userId).pipeTo(sender())
  }
}

object UserActivityActor {
  case object Get
}
Java
import static akka.pattern.Patterns.pipe;
interface UserActivityRepository {
  CompletableFuture<ArrayList<UserActivity>> queryHistoricalActivities(String userId);
}

public class UserActivityActor extends AbstractActor {
  String userId;
  UserActivityRepository repository;

  UserActivityActor(String userId, UserActivityRepository repository) {
    this.userId = userId;
    this.repository = repository;
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            GetFromUserActivityActor.class,
            msg -> {
              CompletableFuture<ArrayList<UserActivity>> fut =
                  repository.queryHistoricalActivities(userId);

              pipe(fut, getContext().dispatcher()).to(sender());
            })
        .build();
  }
}

public class GetFromUserActivityActor {}

Since it needs to talk to the separate repository, it takes time to retrieve the list of UserActivity, hence the return type of queryHistoricalActivities is CompletableFuture. To send back the result to the sender() we used the pipe method, so that the result of the CompletableFuture is sent to sender() upon CompletableFuture’s completion.

Finally, the definition of UserProxyActor is as below.

Scala
class UserProxyActor(userData: ActorRef, userActivities: ActorRef) extends Actor {
  import UserProxyActor._
  import akka.pattern.{ ask, pipe }
  implicit val ec: ExecutionContext = context.dispatcher

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case GetUserData =>
      (userData ? UserDataActor.Get).pipeTo(sender())
    case GetUserActivities =>
      (userActivities ? UserActivityActor.Get).pipeTo(sender())
  }
}
Java
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
public class UserProxyActor extends AbstractActor {
  ActorRef userActor;
  ActorRef userActivityActor;
  Duration timeout = Duration.ofSeconds(5);

  UserProxyActor(ActorRef userActor, ActorRef userActivityActor) {
    this.userActor = userActor;
    this.userActivityActor = userActivityActor;
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            GetUserData.class,
            msg -> {
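              // UserDataActor only matches GetFromUserDataActor, so ask with that type
              CompletableFuture<Object> fut =
                  ask(userActor, new GetFromUserDataActor(), timeout).toCompletableFuture();

              // relay the reply to the original sender
              pipe(fut, getContext().dispatcher()).to(sender());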
            })
        .match(
            GetUserActivities.class,
            msg -> {
              CompletableFuture<Object> fut =
                  ask(userActivityActor, new GetFromUserActivityActor(), timeout)
                      .toCompletableFuture();

              pipe(fut, getContext().dispatcher()).to(sender());
            })
        .build();
  }
}

Note that the pipe method is used together with the ask method. Using pipe with ask is a common practice when you want to relay a message from one actor to another.

Use Directly

A common use case within Akka is to have some computation performed concurrently without needing the extra utility of an AbstractActor. If you find yourself creating a pool of AbstractActors for the sole reason of performing a calculation in parallel, there is an easier (and faster) way:

Scala
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._

val future = Future {
  "Hello" + "World"
}
future.foreach(println)
Java
import java.time.Duration;
import java.util.concurrent.*;

import scala.util.Try;

import akka.japi.Function;

import static akka.dispatch.Futures.future;
import static java.util.concurrent.TimeUnit.SECONDS;

Future<String> f =
    future(
        new Callable<String>() {
          public String call() {
            return "Hello" + "World";
          }
        },
        system.dispatcher());

f.onComplete(new PrintResult<Try<String>>(), system.dispatcher());

In the above code the block passed to Future will be executed by the default Dispatcher, with the return value of the block used to complete the Future (in this case, the result would be the string: “HelloWorld”). Unlike a Future that is returned from an AbstractActor, this Future is properly typed, and we also avoid the overhead of managing an AbstractActor.

You can also create already completed Futures using the Futures class, which can be either successes:

Scala
val future = Future.successful("Yay!")
Java
Future<String> future = Futures.successful("Yay!");

Or failures:

Scala
val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!"))
Java
Future<String> otherFuture = Futures.failed(new IllegalArgumentException("Bang!"));

It is also possible to create an empty Promise, to be filled later, and obtain the corresponding Future:

Scala
val promise = Promise[String]()
val theFuture = promise.future
promise.success("hello")
Java
Promise<String> promise = Futures.promise();
Future<String> theFuture = promise.future();
promise.success("hello");

For these examples PrintResult is defined as follows:

public static final class PrintResult<T> extends OnSuccess<T> {
  @Override
  public final void onSuccess(T t) {
    System.out.println(t);
  }
}

Functional Futures

Scala’s Future has several monadic methods that are very similar to the ones used by Scala’s collections. These allow you to create ‘pipelines’ or ‘streams’ that the result will travel through.

The first method for working with Future functionally is map. This method takes a Mapper which performs some operation on the result of the Future and returns a new result. The return value of the map method is another Future that will contain the new result:

Scala
val f1 = Future {
  "Hello" + "World"
}
val f2 = f1.map { x =>
  x.length
}
f2.foreach(println)
Java
import java.time.Duration;
import java.util.concurrent.*;

import scala.util.Try;

import akka.japi.Function;

import static akka.dispatch.Futures.future;
import static java.util.concurrent.TimeUnit.SECONDS;

final ExecutionContext ec = system.dispatcher();

Future<String> f1 =
    future(
        new Callable<String>() {
          public String call() {
            return "Hello" + "World";
          }
        },
        ec);

Future<Integer> f2 =
    f1.map(
        new Mapper<String, Integer>() {
          public Integer apply(String s) {
            return s.length();
          }
        },
        ec);

f2.onComplete(new PrintResult<Try<Integer>>(), system.dispatcher());

In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete, we apply our function that calculates the length of the string using the map method. Now we have a second Future, f2, that will eventually contain an Integer. When our original Future, f1, completes, it will also apply our function and complete the second Future with its result. When we finally get the result, it will contain the number 10. Our original Future still contains the string “HelloWorld” and is unaffected by the map.

Something to note when using these methods: passed work is always dispatched on the provided ExecutionContext, even if the Future has already been completed when one of these methods is called.

Composing Futures

It is very often desirable to be able to combine different Futures with each other. Below are some examples of how that can be done in a non-blocking fashion.

Scala
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int])

// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)

// Find the sum of the odd numbers
val oddSum = futureList.map(_.sum)
oddSum.foreach(println)
Java
import static akka.dispatch.Futures.sequence;

final ExecutionContext ec = system.dispatcher();
// Some source generating a sequence of Future<Integer>:s
Iterable<Future<Integer>> listOfFutureInts = source;

// now we have a Future[Iterable[Integer]]
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts, ec);

// Find the sum of the odd numbers
Future<Long> futureSum =
    futureListOfInts.map(
        new Mapper<Iterable<Integer>, Long>() {
          public Long apply(Iterable<Integer> ints) {
            long sum = 0;
            for (Integer i : ints) sum += i;
            return sum;
          }
        },
        ec);

futureSum.onComplete(new PrintResult<Try<Long>>(), system.dispatcher());

To better explain what happened in the example, Future.sequence is taking the Iterable<Future<Integer>> and turning it into a Future<Iterable<Integer>>. We can then use map to work with the Iterable<Integer> directly, and we aggregate the sum of the Iterable.
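
For readers working with CompletionStage rather than Scala Futures, the same "list of futures to future of list" idea can be sketched with the JDK alone. The sequence helper below is a hypothetical name written for this illustration; it is not an Akka or JDK method, and it assumes CompletableFuture.allOf is an acceptable way to wait for all inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class SequenceSketch {
    // Turns a list of futures into a future of a list, preserving input order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // allOf completes only after every input has completed,
            // so the join() calls below return immediately.
            .thenApply(done -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    // Sums the first `count` odd numbers, each produced by its own future.
    static long sumOfFirstOdds(int count) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int odd = 2 * i + 1;
            futures.add(CompletableFuture.supplyAsync(() -> odd));
        }
        return sequence(futures)
            .thenApply(ints -> ints.stream().mapToLong(Integer::longValue).sum())
            .join(); // blocking only so the demo can return a plain value
    }

    public static void main(String[] args) {
        System.out.println(sumOfFirstOdds(100)); // prints 10000
    }
}
```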

The traverse method is similar to sequence, but it takes a sequence of A and applies a function from A to Future<B> and returns a Future<Iterable<B>>, enabling parallel map over the sequence, if you use Futures.future to create the Future.

Scala
val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
val oddSum = futureList.map(_.sum)
oddSum.foreach(println)
Java
import static akka.dispatch.Futures.traverse;

final ExecutionContext ec = system.dispatcher();
// Just a sequence of Strings
Iterable<String> listStrings = Arrays.asList("a", "b", "c");

Future<Iterable<String>> futureResult =
    traverse(
        listStrings,
        new Function<String, Future<String>>() {
          public Future<String> apply(final String r) {
            return future(
                new Callable<String>() {
                  public String call() {
                    return r.toUpperCase();
                  }
                },
                ec);
          }
        },
        ec);

// Returns the sequence of strings as upper case
futureResult.onComplete(new PrintResult<Try<Iterable<String>>>(), system.dispatcher());

Then there’s a method called fold. It takes a start value, a sequence of Futures, and a function that combines the accumulated value (of the start value’s type) with the result of each Future and returns a new value of the same type as the start value. The function is applied to all elements in the sequence of Futures without blocking, and the final result becomes available once the last of the Futures has completed.

Scala
// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.fold(futures)(0)(_ + _)
futureSum.foreach(println)
Java
import akka.japi.Function2;
import static akka.dispatch.Futures.fold;


final ExecutionContext ec = system.dispatcher();

// A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;

// Start value is the empty string
Future<String> resultFuture =
    fold(
        "",
        futures,
        new Function2<String, String, String>() {
          public String apply(String r, String t) {
            return r + t; // Just concatenate
          }
        },
        ec);

resultFuture.onComplete(new PrintResult<Try<String>>(), system.dispatcher());

That’s all it takes!

If the sequence passed to fold is empty, it will return the start value; in the Scala example above that is 0, and in the Java example it is the empty string. If you don’t have a start value and can use the value of the first completing Future in the sequence instead, you can use reduce. It works like this:

Scala
// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.reduce(futures)(_ + _)
futureSum.foreach(println)
Java
import static akka.dispatch.Futures.reduce;


final ExecutionContext ec = system.dispatcher();
// A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;

Future<Object> resultFuture =
    reduce(
        futures,
        new Function2<Object, String, Object>() {
          public Object apply(Object r, String t) {
            return r + t; // Just concatenate
          }
        },
        ec);

resultFuture.onComplete(new PrintResult<Try<Object>>(), system.dispatcher());

As with fold, the execution is done asynchronously once the last of the Futures has completed. You can also parallelize it by chunking your Futures into sub-sequences, reducing each of them, and then reducing the reduced results again.
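
The chunking idea can be sketched as follows using only the JDK's CompletableFuture. Both reduce and chunkedReduce are hypothetical helpers written for this illustration, not Akka methods, and the sketch assumes the combining operator is associative, since chunking changes how the operations are grouped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;

class ChunkedReduceSketch {
    // Combines all results with `op`; fails for an empty list, as there is no start value.
    static <T> CompletableFuture<T> reduce(List<CompletableFuture<T>> futures, BinaryOperator<T> op) {
        return futures.stream()
            .reduce((a, b) -> a.thenCombine(b, op))
            .orElseThrow(() -> new NoSuchElementException("reduce of empty sequence"));
    }

    // Reduces each chunk on its own, then reduces the per-chunk results.
    static <T> CompletableFuture<T> chunkedReduce(
            List<CompletableFuture<T>> futures, int chunkSize, BinaryOperator<T> op) {
        List<CompletableFuture<T>> partials = new ArrayList<>();
        for (int start = 0; start < futures.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, futures.size());
            partials.add(reduce(futures.subList(start, end), op));
        }
        return reduce(partials, op);
    }

    static int demo() {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            final int doubled = i * 2;
            futures.add(CompletableFuture.supplyAsync(() -> doubled));
        }
        return chunkedReduce(futures, 100, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1001000
    }
}
```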

Callbacks

Sometimes you just want to listen to a Future being completed, and react to that not by creating a new Future, but by side-effecting. For this, Future supports onComplete:

Scala
future.onComplete {
  case Success(result)  => doSomethingOnSuccess(result)
  case Failure(failure) => doSomethingOnFailure(failure)
}
Java
final ExecutionContext ec = system.dispatcher();

future.onComplete(
    new OnComplete<String>() {
      public void onComplete(Throwable failure, String result) {
        if (failure != null) {
          // We got a failure, handle it here
        } else {
          // We got a result, do something with it
        }
      }
    },
    ec);

Define Ordering

Since callbacks are executed in any order and potentially in parallel, it can be tricky when you need sequential ordering of operations. The solution is andThen. It creates a new Future with the specified callback, a Future that will have the same result as the Future it is called on, which allows for ordering as in the following sample:

Scala
val result = Future { loadPage(url) }
  .andThen {
    case Failure(exception) => log(exception)
  }
  .andThen {
    case _ => watchSomeTV()
  }
result.foreach(println)
Java
final ExecutionContext ec = system.dispatcher();
Future<String> future1 =
    Futures.successful("value")
        .andThen(
            new OnComplete<String>() {
              public void onComplete(Throwable failure, String result) {
                if (failure != null) sendToIssueTracker(failure);
              }
            },
            ec)
        .andThen(
            new OnComplete<String>() {
              public void onComplete(Throwable failure, String result) {
                if (result != null) sendToTheInternetz(result);
              }
            },
            ec);
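
A comparable ordering guarantee can be sketched with the JDK's CompletableFuture.whenComplete, which also returns a new stage carrying the same result. This is an analogy rather than the andThen implementation: one known difference is that an exception thrown by a whenComplete action can fail the returned stage, whereas andThen keeps the original result. The log list exists only to make the order observable.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AndThenSketch {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        String result = CompletableFuture.supplyAsync(() -> "page")
            // runs first; the stage it returns completes only after this action
            .whenComplete((value, failure) -> log.add("first:" + value))
            // depends on the stage above, so it always runs second
            .whenComplete((value, failure) -> log.add("second:" + value))
            .join();
        log.add("result:" + result);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first:page, second:page, result:page]
    }
}
```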

Auxiliary Methods

fallbackTo combines two Futures into a new Future that will hold the successful value of the second Future if the first Future fails.

Scala
val future4 = future1.fallbackTo(future2).fallbackTo(future3)
future4.foreach(println)
Java
Future<String> future1 = Futures.failed(new IllegalStateException("OHNOES1"));
Future<String> future2 = Futures.failed(new IllegalStateException("OHNOES2"));
Future<String> future3 = Futures.successful("bar");
// Will have "bar" in this case
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3);
future4.onComplete(new PrintResult<Try<String>>(), system.dispatcher());
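
To make the fallback behavior concrete, here is a sketch of an equivalent helper for JDK CompletableFutures. The fallbackTo and failed methods below are hypothetical helpers written for this illustration. The sketch assumes that, when both inputs fail, the result should carry the failure of the first one, which is how Scala's fallbackTo is documented to behave.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FallbackSketch {
    // Result of `first` if it succeeds, else result of `second`;
    // if both fail, the failure of `first` is kept.
    static <T> CompletableFuture<T> fallbackTo(CompletableFuture<T> first, CompletableFuture<T> second) {
        CompletableFuture<T> result = new CompletableFuture<>();
        first.whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else {
                second.whenComplete((secondValue, secondFailure) -> {
                    if (secondFailure == null) result.complete(secondValue);
                    else result.completeExceptionally(failure);
                });
            }
        });
        return result;
    }

    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    static String demo() {
        CompletableFuture<String> f1 = failed(new IllegalStateException("OHNOES1"));
        CompletableFuture<String> f2 = failed(new IllegalStateException("OHNOES2"));
        CompletableFuture<String> f3 = CompletableFuture.completedFuture("bar");
        return fallbackTo(fallbackTo(f1, f2), f3).join();
    }

    static String bothFailedMessage() {
        CompletableFuture<String> f1 = failed(new IllegalStateException("OHNOES1"));
        CompletableFuture<String> f2 = failed(new IllegalStateException("OHNOES2"));
        try {
            return fallbackTo(f1, f2).join();
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());              // prints bar
        System.out.println(bothFailedMessage()); // prints OHNOES1
    }
}
```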

You can also combine two Futures into a new Future that will hold a tuple of the two Futures’ successful results, using the zip operation.

Scala
val future3 = future1.zip(future2).map { case (a, b) => a + " " + b }
future3.foreach(println)
Java
final ExecutionContext ec = system.dispatcher();
Future<String> future1 = Futures.successful("foo");
Future<String> future2 = Futures.successful("bar");
Future<String> future3 =
    future1
        .zip(future2)
        .map(
            new Mapper<scala.Tuple2<String, String>, String>() {
              public String apply(scala.Tuple2<String, String> zipped) {
                return zipped._1() + " " + zipped._2();
              }
            },
            ec);

future3.onComplete(new PrintResult<Try<String>>(), system.dispatcher());
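
When working with CompletionStage instead of Scala Futures, the closest JDK counterpart to zip followed by map is thenCombine, which hands both results to one function without an intermediate tuple. A minimal sketch, using only java.util.concurrent:

```java
import java.util.concurrent.CompletableFuture;

class ZipSketch {
    static String demo() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "foo");
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "bar");
        // The function runs once both inputs have completed successfully.
        return first.thenCombine(second, (a, b) -> a + " " + b).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints foo bar
    }
}
```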

Exceptions

Since the result of a Future is created concurrently to the rest of the program, exceptions must be handled differently. Regardless of whether an AbstractActor or the dispatcher is completing the Future, if an Exception is caught the Future will contain it instead of a valid result. If a Future does contain an Exception, calling Await.result will cause it to be thrown again so it can be handled properly.

It is also possible to handle an Exception by returning a different result. This is done with the recover method. For example:

Scala
val future = akka.pattern.ask(actor, msg1).recover {
  case e: ArithmeticException => 0
}
future.foreach(println)
Java
final ExecutionContext ec = system.dispatcher();

Future<Integer> future =
    future(
            new Callable<Integer>() {
              public Integer call() {
                return 1 / 0;
              }
            },
            ec)
        .recover(
            new Recover<Integer>() {
              public Integer recover(Throwable problem) throws Throwable {
                if (problem instanceof ArithmeticException) return 0;
                else throw problem;
              }
            },
            ec);

future.onComplete(new PrintResult<Try<Integer>>(), system.dispatcher());

In this example, if the actor replied with an akka.actor.Status.Failure containing the ArithmeticException, our Future would have a result of 0. The recover method works very similarly to the standard try/catch blocks, so multiple Exceptions can be handled in this manner, and if an Exception is not handled this way it will behave as if we hadn’t used the recover method.
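
The same try/catch-like behavior can be sketched for JDK CompletableFutures with exceptionally. This is an analogy under stated assumptions, not Akka's Recover class: the divide method is hypothetical, and the unwrapping step is needed because failures from an earlier stage can arrive wrapped in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RecoverSketch {
    static CompletableFuture<Integer> divide(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
            .exceptionally(problem -> {
                // Unwrap the CompletionException added by the failed stage, if present.
                Throwable cause =
                    problem instanceof CompletionException ? problem.getCause() : problem;
                if (cause instanceof ArithmeticException) return 0; // handled: substitute a value
                throw new CompletionException(cause);                 // not handled: keep failing
            });
    }

    public static void main(String[] args) {
        System.out.println(divide(1, 0).join()); // prints 0
        System.out.println(divide(6, 3).join()); // prints 2
    }
}
```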

You can also use the recoverWith method, which has the same relationship to recover as flatMap has to map, and is used like this:

Scala
val future = akka.pattern.ask(actor, msg1).recoverWith {
  case e: ArithmeticException => Future.successful(0)
  case foo: IllegalArgumentException =>
    Future.failed[Int](new IllegalStateException("All br0ken!"))
}
future.foreach(println)
Java
final ExecutionContext ec = system.dispatcher();

Future<Integer> future =
    future(
            new Callable<Integer>() {
              public Integer call() {
                return 1 / 0;
              }
            },
            ec)
        .recoverWith(
            new Recover<Future<Integer>>() {
              public Future<Integer> recover(Throwable problem) throws Throwable {
                if (problem instanceof ArithmeticException) {
                  return future(
                      new Callable<Integer>() {
                        public Integer call() {
                          return 0;
                        }
                      },
                      ec);
                } else throw problem;
              }
            },
            ec);

future.onComplete(new PrintResult<Try<Integer>>(), system.dispatcher());

After

akka.pattern.after makes it easy to complete a Future with a value or exception after a timeout.

Scala
// import akka.pattern.after

val delayed =
  akka.pattern.after(200 millis, using = system.scheduler)(Future.failed(new IllegalStateException("OHNOES")))
val future = Future { Thread.sleep(1000); "foo" }
val result = Future.firstCompletedOf(Seq(future, delayed))
Java
import static akka.pattern.Patterns.after;

import java.util.Arrays;
final ExecutionContext ec = system.dispatcher();
Future<String> failExc = Futures.failed(new IllegalStateException("OHNOES1"));
Timeout delay = Timeout.create(Duration.ofMillis(200));
Future<String> delayed = Patterns.after(delay.duration(), system.scheduler(), ec, failExc);
Future<String> future =
    future(
        new Callable<String>() {
          public String call() throws InterruptedException {
            Thread.sleep(1000);
            return "foo";
          }
        },
        ec);
Future<String> result =
    Futures.firstCompletedOf(Arrays.<Future<String>>asList(future, delayed), ec);
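
The mechanics behind this timeout pattern can be sketched without Akka, assuming a ScheduledExecutorService plays the role of the scheduler. The helpers failAfter, firstCompletedOf and settle are hypothetical names written for this illustration; they are not the Akka implementations.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class AfterSketch {
    // A future that fails with `failure` once `delayMillis` have elapsed.
    static <T> CompletableFuture<T> failAfter(
            long delayMillis, Throwable failure, ScheduledExecutorService scheduler) {
        CompletableFuture<T> delayed = new CompletableFuture<>();
        scheduler.schedule(
            () -> { delayed.completeExceptionally(failure); }, delayMillis, TimeUnit.MILLISECONDS);
        return delayed;
    }

    // Completes with whichever input finishes first, successfully or not.
    static <T> CompletableFuture<T> firstCompletedOf(CompletableFuture<T> a, CompletableFuture<T> b) {
        CompletableFuture<T> result = new CompletableFuture<>();
        a.whenComplete((value, failure) -> settle(result, value, failure));
        b.whenComplete((value, failure) -> settle(result, value, failure));
        return result;
    }

    private static <T> void settle(CompletableFuture<T> result, T value, Throwable failure) {
        if (failure == null) result.complete(value); // later calls are ignored once settled
        else result.completeExceptionally(failure);
    }

    static String demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Slow computation: would produce "foo" after one second.
            CompletableFuture<String> slow = new CompletableFuture<>();
            scheduler.schedule(() -> { slow.complete("foo"); }, 1000, TimeUnit.MILLISECONDS);
            CompletableFuture<String> delayed =
                failAfter(200, new IllegalStateException("OHNOES"), scheduler);
            return firstCompletedOf(slow, delayed)
                .handle((value, failure) -> failure == null ? value : "failed: " + failure.getMessage())
                .join();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints failed: OHNOES
    }
}
```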

Retry

akka.pattern.Patterns.retry retries an operation that returns a CompletionStage some number of times, with a delay between each attempt.

Scala
implicit val scheduler = system.scheduler
//Given some future that will succeed eventually
@volatile var failCount = 0
def attempt() = {
  if (failCount < 5) {
    failCount += 1
    Future.failed(new IllegalStateException(failCount.toString))
  } else Future.successful(5)
}
//Return a new future that will retry up to 10 times
val retried = akka.pattern.retry(() => attempt(), 10, 100 milliseconds)
Java
import static akka.pattern.Patterns.retry;

final ExecutionContext ec = system.dispatcher();
Callable<CompletionStage<String>> attempt = () -> CompletableFuture.completedFuture("test");
CompletionStage<String> retriedFuture =
    retry(attempt, 3, java.time.Duration.ofMillis(200), system.scheduler(), ec);
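
To show what a retry loop of this kind does, here is a JDK-only sketch. It is not the Akka implementation: retry and tryOnce are hypothetical helpers, a ScheduledExecutorService stands in for the scheduler, and maxAttempts is assumed to count the total number of tries, which may differ from how Akka counts its attempts parameter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {
    // Runs `attempt` up to maxAttempts times in total, waiting delayMillis between tries.
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> attempt, int maxAttempts, long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryOnce(attempt, maxAttempts, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void tryOnce(
            Supplier<CompletableFuture<T>> attempt, int remaining, long delayMillis,
            ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        attempt.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (remaining <= 1) {
                result.completeExceptionally(failure); // out of attempts: report the last failure
            } else {
                scheduler.schedule(
                    () -> tryOnce(attempt, remaining - 1, delayMillis, scheduler, result),
                    delayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    static String demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first two calls, succeeds on the third.
        Supplier<CompletableFuture<String>> attempt = () -> {
            int n = calls.incrementAndGet();
            CompletableFuture<String> f = new CompletableFuture<>();
            if (n < 3) f.completeExceptionally(new IllegalStateException("attempt " + n));
            else f.complete("ok after " + n + " attempts");
            return f;
        };
        try {
            return retry(attempt, 5, 10, scheduler).join();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints ok after 3 attempts
    }
}
```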

Java 8, CompletionStage and CompletableFuture

Starting with Akka 2.4.2 we have begun to introduce Java 8 java.util.concurrent.CompletionStage in Java APIs. It is the Java counterpart of scala.concurrent.Future; conversion from scala.concurrent.Future is done using the scala-java8-compat library.

Unlike scala.concurrent.Future which has async methods only, CompletionStage has async and non-async methods.

The scala-java8-compat library returns its own implementation of CompletionStage, which delegates all non-async methods to their async counterparts. The implementation extends the standard Java CompletableFuture. Java 8 CompletableFuture creates a new instance of CompletableFuture for any new stage, which means the scala-java8-compat implementation is not used after the first mapping method.

Note

After adding any additional computation stage to a CompletionStage returned by scala-java8-compat (e.g. CompletionStage instances returned by Akka), it falls back to the standard behavior of Java CompletableFuture.

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

All async methods without an explicit Executor are performed using the ForkJoinPool.commonPool() executor.
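
The two standard behaviors can be observed with a small JDK-only sketch. It covers only the cases that are easy to check reliably: a non-async stage added to an already completed future, and an async stage given an explicit Executor. The thread name "my-executor" is an arbitrary choice for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVersusNonAsync {
    // Non-async stage on an already completed future: runs on the calling thread.
    static boolean nonAsyncRunsOnCaller() {
        String caller = Thread.currentThread().getName();
        String ranOn = CompletableFuture.completedFuture("hello")
            .thenApply(s -> Thread.currentThread().getName())
            .join();
        return caller.equals(ranOn);
    }

    // Async stage with an explicit Executor: runs on that executor's thread.
    static String asyncRunsOnExecutor() {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "my-executor"));
        try {
            return CompletableFuture.completedFuture("hello")
                .thenApplyAsync(s -> Thread.currentThread().getName(), executor)
                .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nonAsyncRunsOnCaller()); // prints true
        System.out.println(asyncRunsOnExecutor());  // prints my-executor
    }
}
```

Passing an Executor explicitly, as in the second method, is one way to keep stages off ForkJoinPool.commonPool().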

Non-async methods

When non-async methods are applied to a not yet completed CompletionStage, they are completed by the thread which completes the initial CompletionStage:

final ExecutionContext ec = system.dispatcher();
final CountDownLatch countDownLatch = new CountDownLatch(1);

Future<String> scalaFuture =
    Futures.future(
        () -> {
          assertThat(
              Thread.currentThread().getName(),
              containsString("akka.actor.default-dispatcher"));
          countDownLatch.await(); // do not complete yet
          return "hello";
        },
        ec);

CompletionStage<String> fromScalaFuture =
    FutureConverters.toJava(scalaFuture)
        .thenApply(
            s -> { // 1
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            })
        .thenApply(
            s -> { // 2
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            })
        .thenApply(
            s -> { // 3
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            });

countDownLatch.countDown(); // complete scalaFuture

In this example, a Scala Future is converted to a CompletionStage just like Akka does. The completion is delayed: we call thenApply multiple times on a not-yet-completed CompletionStage, then complete the Future.

The first thenApply is actually performed on the scala-java8-compat instance, and execution of the computational stage (the lambda) is delegated to the default Java thenApplyAsync, which is executed on ForkJoinPool.commonPool().

The second and third thenApply calls are made on a Java 8 CompletableFuture instance, which executes computational stages on the thread that completed the first stage. They are never executed on a thread of the Scala Future, because the first thenApply (which is really thenApplyAsync) breaks the chain and executes on ForkJoinPool.commonPool().

In the next example, the thenApply methods are executed on an already completed Future/CompletionStage:

Future<String> scalaFuture =
    Futures.future(
        () -> {
          assertThat(
              Thread.currentThread().getName(),
              containsString("akka.actor.default-dispatcher"));
          return "hello";
        },
        ec);

CompletionStage<String> completedStage =
    FutureConverters.toJava(scalaFuture)
        .thenApply(
            s -> { // 1
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            });

completedStage.toCompletableFuture().get(2, SECONDS); // complete current CompletionStage
final String currentThread = Thread.currentThread().getName();

CompletionStage<String> stage2 =
    completedStage
        .thenApply(
            s -> { // 2
              assertThat(Thread.currentThread().getName(), is(currentThread));
              return s;
            })
        .thenApply(
            s -> { // 3
              assertThat(Thread.currentThread().getName(), is(currentThread));
              return s;
            });

The first thenApply is still executed on ForkJoinPool.commonPool() (because it is actually thenApplyAsync, which is always executed on the global Java pool).

Then we wait for the stages to complete, so the second and third thenApply are applied to an already completed CompletionStage, and their stages are executed on the current thread: the thread which called the second and third thenApply.
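The two non-async cases can also be reproduced with a plain JDK CompletableFuture, without Akka or scala-java8-compat. The following is a minimal sketch; the class NonAsyncThreadDemo, its method names, and the thread name completer-thread are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class NonAsyncThreadDemo {

  // thenApply is registered while the source is still pending, so the
  // lambda runs on whichever thread later completes the source.
  static String threadForPendingStage() {
    CompletableFuture<String> source = new CompletableFuture<>();
    CompletableFuture<String> observed =
        source.thenApply(s -> Thread.currentThread().getName());
    Thread completer = new Thread(() -> source.complete("hello"), "completer-thread");
    completer.start();
    return observed.join();
  }

  // thenApply is registered on an already completed source, so the
  // lambda runs immediately on the thread that calls thenApply.
  static String threadForCompletedStage() {
    CompletableFuture<String> source = CompletableFuture.completedFuture("hello");
    return source.thenApply(s -> Thread.currentThread().getName()).join();
  }

  public static void main(String[] args) {
    System.out.println("pending stage ran on:   " + threadForPendingStage());
    System.out.println("completed stage ran on: " + threadForCompletedStage());
  }
}
```

The same call, thenApply, therefore runs on different threads depending only on whether the stage was already complete when the call was made, which is why relying on non-async methods makes the executing thread hard to predict.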

Async methods

As mentioned above, default async methods are always executed on ForkJoinPool.commonPool():

CompletionStage<String> fromScalaFuture =
    FutureConverters.toJava(scalaFuture)
        .thenApplyAsync(
            s -> { // 1
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            })
        .thenApplyAsync(
            s -> { // 2
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            })
        .thenApplyAsync(
            s -> { // 3
              assertThat(
                  Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
              return s;
            });

CompletionStage also has async methods which take an Executor as a second parameter, just like Future:

final Executor ex = system.dispatcher();

CompletionStage<String> fromScalaFuture =
    FutureConverters.toJava(scalaFuture)
        .thenApplyAsync(
            s -> {
              assertThat(
                  Thread.currentThread().getName(),
                  containsString("akka.actor.default-dispatcher"));
              return s;
            },
            ex)
        .thenApplyAsync(
            s -> {
              assertThat(
                  Thread.currentThread().getName(),
                  containsString("akka.actor.default-dispatcher"));
              return s;
            },
            ex)
        .thenApplyAsync(
            s -> {
              assertThat(
                  Thread.currentThread().getName(),
                  containsString("akka.actor.default-dispatcher"));
              return s;
            },
            ex);

This example behaves like Future: every stage is executed on an explicitly specified Executor.

Note

When in doubt, use async methods with an explicit executor. Always use async methods with a dedicated executor/dispatcher for long-running or blocking computations, such as IO operations.
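As a rough illustration of this advice outside of Akka, the sketch below runs a stage on a dedicated JDK executor. The class DedicatedExecutorDemo, the method name, and the thread name blocking-io-1 are assumptions made for this example; in an Akka application a dedicated dispatcher would typically play the role of the executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DedicatedExecutorDemo {

  // Runs a (pretend) blocking stage on a dedicated single-thread executor
  // and returns the name of the thread that executed it.
  static String runOnDedicatedExecutor() {
    // Hypothetical dedicated pool; the thread name is chosen for illustration.
    ExecutorService blockingIo =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "blocking-io-1"));
    try {
      return CompletableFuture.completedFuture("hello")
          // The explicit executor decides where the stage runs, regardless
          // of which thread completed the previous stage.
          .thenApplyAsync(s -> Thread.currentThread().getName(), blockingIo)
          .join();
    } finally {
      blockingIo.shutdown(); // release the dedicated thread
    }
  }

  public static void main(String[] args) {
    System.out.println(runOnDedicatedExecutor());
  }
}
```

Passing the executor explicitly keeps blocking work off both the thread that completes the stage and the shared common pool.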
