Futures
Introduction
In the Scala Standard Library, a Future is a data structure
used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking)
or asynchronously (non-blocking). To be able to use this from Java, Akka provides a java friendly interface
in akka.dispatch.Futures
.
Execution Contexts
In order to execute callbacks and operations, Futures need something called an ExecutionContext
,
which is very similar to a java.util.concurrent.Executor
. if you have an ActorSystem
in scope,
it will use its default dispatcher as the ExecutionContext
, or you can use the factory methods provided
by the ExecutionContexts
class to wrap Executors
and ExecutorServices
, or even create your own.
import akka.dispatch.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import akka.util.Timeout;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
ExecutionContext ec =
ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere);
//Use ec with your Futures
Future<String> f1 = Futures.successful("foo");
// Then you shut down the ExecutorService at the end of your application.
yourExecutorServiceGoesHere.shutdown();
Use with Actors
There are generally two ways of getting a reply from an UntypedActor
: the first is by a sent message (actorRef.tell(msg, sender)
),
which only works if the original sender was an UntypedActor
) and the second is through a Future
.
Using the ActorRef
's ask
method to send a message will return a Future
.
To wait for and retrieve the actual result the simplest method is:
import akka.dispatch.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import akka.util.Timeout;
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
This will cause the current thread to block and wait for the UntypedActor
to 'complete' the Future
with it's reply.
Blocking is discouraged though as it can cause performance problem.
The blocking operations are located in Await.result
and Await.ready
to make it easy to spot where blocking occurs.
Alternatives to blocking are discussed further within this documentation.
Also note that the Future
returned by an UntypedActor
is a Future<Object>
since an UntypedActor
is dynamic.
That is why the cast to String
is used in the above sample.
Warning
Await.result
and Await.ready
are provided for exceptional situations where you must block,
a good rule of thumb is to only use them if you know why you must block. For all other cases, use
asynchronous composition as described below.
To send the result of a Future
to an Actor
, you can use the pipe
construct:
akka.pattern.Patterns.pipe(future, system.dispatcher()).to(actor);
Use Directly
A common use case within Akka is to have some computation performed concurrently without needing
the extra utility of an UntypedActor
. If you find yourself creating a pool of UntypedActor
s for the sole reason
of performing a calculation in parallel, there is an easier (and faster) way:
import scala.concurrent.duration.Duration;
import akka.japi.Function;
import java.util.concurrent.Callable;
import static akka.dispatch.Futures.future;
import static java.util.concurrent.TimeUnit.SECONDS;
Future<String> f = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, system.dispatcher());
f.onSuccess(new PrintResult<String>(), system.dispatcher());
In the above code the block passed to future
will be executed by the default Dispatcher
,
with the return value of the block used to complete the Future
(in this case, the result would be the string: "HelloWorld").
Unlike a Future
that is returned from an UntypedActor
, this Future
is properly typed,
and we also avoid the overhead of managing an UntypedActor
.
You can also create already completed Futures using the Futures
class, which can be either successes:
Future<String> future = Futures.successful("Yay!");
Or failures:
Future<String> otherFuture = Futures.failed(
new IllegalArgumentException("Bang!"));
It is also possible to create an empty Promise
, to be filled later, and obtain the corresponding Future
:
Promise<String> promise = Futures.promise();
Future<String> theFuture = promise.future();
promise.success("hello");
For these examples PrintResult
is defined as follows:
public final static class PrintResult<T> extends OnSuccess<T> {
@Override public final void onSuccess(T t) {
System.out.println(t);
}
}
Functional Futures
Scala's Future
has several monadic methods that are very similar to the ones used by Scala
's collections.
These allow you to create 'pipelines' or 'streams' that the result will travel through.
Future is a Monad
The first method for working with Future
functionally is map
. This method takes a Mapper
which performs
some operation on the result of the Future
, and returning a new result.
The return value of the map
method is another Future
that will contain the new result:
import scala.concurrent.duration.Duration;
import akka.japi.Function;
import java.util.concurrent.Callable;
import static akka.dispatch.Futures.future;
import static java.util.concurrent.TimeUnit.SECONDS;
final ExecutionContext ec = system.dispatcher();
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, ec);
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
In this example we are joining two strings together within a Future
. Instead of waiting for f1 to complete,
we apply our function that calculates the length of the string using the map
method.
Now we have a second Future
, f2, that will eventually contain an Integer
.
When our original Future
, f1, completes, it will also apply our function and complete the second Future
with its result. When we finally get
the result, it will contain the number 10.
Our original Future
still contains the string "HelloWorld" and is unaffected by the map
.
Something to note when using these methods: if the Future
is still being processed when one of these methods are called,
it will be the completing thread that actually does the work.
If the Future
is already complete though, it will be run in our current thread. For example:
final ExecutionContext ec = system.dispatcher();
Future<String> f1 = future(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(100);
return "Hello" + "World";
}
}, ec);
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
The original Future
will take at least 0.1 second to execute now, which means it is still being processed at
the time we call map
. The function we provide gets stored within the Future
and later executed automatically
by the dispatcher when the result is ready.
If we do the opposite:
final ExecutionContext ec = system.dispatcher();
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, ec);
// Thread.sleep is only here to prove a point
Thread.sleep(100); // Do not use this in your code
Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length();
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
Our little string has been processed long before our 0.1 second sleep has finished. Because of this,
the dispatcher has moved onto other messages that need processing and can no longer calculate
the length of the string for us, instead it gets calculated in the current thread just as if we weren't using a Future
.
Normally this works quite well as it means there is very little overhead to running a quick function.
If there is a possibility of the function taking a non-trivial amount of time to process it might be better
to have this done concurrently, and for that we use flatMap
:
final ExecutionContext ec = system.dispatcher();
Future<String> f1 = future(new Callable<String>() {
public String call() {
return "Hello" + "World";
}
}, ec);
Future<Integer> f2 = f1.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(final String s) {
return future(new Callable<Integer>() {
public Integer call() {
return s.length();
}
}, ec);
}
}, ec);
f2.onSuccess(new PrintResult<Integer>(), system.dispatcher());
Now our second Future
is executed concurrently as well. This technique can also be used to combine the results
of several Futures into a single calculation, which will be better explained in the following sections.
If you need to do conditional propagation, you can use filter
:
final ExecutionContext ec = system.dispatcher();
Future<Integer> future1 = Futures.successful(4);
Future<Integer> successfulFilter = future1.filter(Filter.filterOf(
new Function<Integer, Boolean>() {
public Boolean apply(Integer i) {
return i % 2 == 0;
}
}), ec);
Future<Integer> failedFilter = future1.filter(Filter.filterOf(
new Function<Integer, Boolean>() {
public Boolean apply(Integer i) {
return i % 2 != 0;
}
}), ec);
//When filter fails, the returned Future will be failed with a scala.MatchError
Composing Futures
It is very often desirable to be able to combine different Futures with each other, below are some examples on how that can be done in a non-blocking fashion.
import static akka.dispatch.Futures.sequence;
final ExecutionContext ec = system.dispatcher();
//Some source generating a sequence of Future<Integer>:s
Iterable<Future<Integer>> listOfFutureInts = source;
// now we have a Future[Iterable[Integer]]
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts, ec);
// Find the sum of the odd numbers
Future<Long> futureSum = futureListOfInts.map(
new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
sum += i;
return sum;
}
}, ec);
futureSum.onSuccess(new PrintResult<Long>(), system.dispatcher());
To better explain what happened in the example, Future.sequence
is taking the Iterable<Future<Integer>>
and turning it into a Future<Iterable<Integer>>
. We can then use map
to work with the Iterable<Integer>
directly,
and we aggregate the sum of the Iterable
.
The traverse
method is similar to sequence
, but it takes a sequence of A``s and applies a function from ``A
to Future<B>
and returns a Future<Iterable<B>>
, enabling parallel map
over the sequence, if you use Futures.future
to create the Future
.
import static akka.dispatch.Futures.traverse;
final ExecutionContext ec = system.dispatcher();
//Just a sequence of Strings
Iterable<String> listStrings = Arrays.asList("a", "b", "c");
Future<Iterable<String>> futureResult = traverse(listStrings,
new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
}, ec);
}
}, ec);
//Returns the sequence of strings as upper case
futureResult.onSuccess(new PrintResult<Iterable<String>>(), system.dispatcher());
It's as simple as that!
Then there's a method that's called fold
that takes a start-value,
a sequence of Future
:s and a function from the type of the start-value, a timeout,
and the type of the futures and returns something with the same type as the start-value,
and then applies the function to all elements in the sequence of futures, non-blockingly,
the execution will be started when the last of the Futures is completed.
import akka.japi.Function2;
import static akka.dispatch.Futures.fold;
final ExecutionContext ec = system.dispatcher();
//A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;
//Start value is the empty string
Future<String> resultFuture = fold("", futures,
new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, ec);
resultFuture.onSuccess(new PrintResult<String>(), system.dispatcher());
That's all it takes!
If the sequence passed to fold
is empty, it will return the start-value, in the case above, that will be empty String.
In some cases you don't have a start-value and you're able to use the value of the first completing Future
in the sequence as the start-value, you can use reduce
, it works like this:
import static akka.dispatch.Futures.reduce;
final ExecutionContext ec = system.dispatcher();
//A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;
Future<Object> resultFuture = reduce(futures,
new Function2<Object, String, Object>() {
public Object apply(Object r, String t) {
return r + t; //Just concatenate
}
}, ec);
resultFuture.onSuccess(new PrintResult<Object>(), system.dispatcher());
Same as with fold
, the execution will be started when the last of the Futures is completed, you can also parallelize
it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
This is just a sample of what can be done.
Callbacks
Sometimes you just want to listen to a Future
being completed, and react to that not by creating a new Future, but by side-effecting.
For this Scala supports onComplete
, onSuccess
and onFailure
, of which the latter two are specializations of the first.
final ExecutionContext ec = system.dispatcher();
future.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if ("bar" == result) {
//Do something if it resulted in "bar"
} else {
//Do something if it was some other String
}
}
}, ec);
final ExecutionContext ec = system.dispatcher();
future.onFailure(new OnFailure() {
public void onFailure(Throwable failure) {
if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure
} else {
//Do something if it was some other failure
}
}
}, ec);
final ExecutionContext ec = system.dispatcher();
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
}
}
}, ec);
Ordering
Since callbacks are executed in any order and potentially in parallel,
it can be tricky at the times when you need sequential ordering of operations.
But there's a solution! And it's name is andThen
, and it creates a new Future
with
the specified callback, a Future
that will have the same result as the Future
it's called on,
which allows for ordering like in the following sample:
final ExecutionContext ec = system.dispatcher();
Future<String> future1 = Futures.successful("value").andThen(
new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null)
sendToIssueTracker(failure);
}
}, ec).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (result != null)
sendToTheInternetz(result);
}
}, ec);
Auxiliary methods
Future
fallbackTo
combines 2 Futures into a new Future
, and will hold the successful value of the second Future
if the first Future
fails.
Future<String> future1 = Futures.failed(new IllegalStateException("OHNOES1"));
Future<String> future2 = Futures.failed(new IllegalStateException("OHNOES2"));
Future<String> future3 = Futures.successful("bar");
// Will have "bar" in this case
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3);
future4.onSuccess(new PrintResult<String>(), system.dispatcher());
You can also combine two Futures into a new Future
that will hold a tuple of the two Futures successful results,
using the zip
operation.
final ExecutionContext ec = system.dispatcher();
Future<String> future1 = Futures.successful("foo");
Future<String> future2 = Futures.successful("bar");
Future<String> future3 = future1.zip(future2).map(
new Mapper<scala.Tuple2<String, String>, String>() {
public String apply(scala.Tuple2<String, String> zipped) {
return zipped._1() + " " + zipped._2();
}
}, ec);
future3.onSuccess(new PrintResult<String>(), system.dispatcher());
Exceptions
Since the result of a Future
is created concurrently to the rest of the program, exceptions must be handled differently.
It doesn't matter if an UntypedActor
or the dispatcher is completing the Future
, if an Exception
is caught
the Future
will contain it instead of a valid result. If a Future
does contain an Exception
,
calling Await.result
will cause it to be thrown again so it can be handled properly.
It is also possible to handle an Exception
by returning a different result.
This is done with the recover
method. For example:
final ExecutionContext ec = system.dispatcher();
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, ec).recover(new Recover<Integer>() {
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException)
return 0;
else
throw problem;
}
}, ec);
future.onSuccess(new PrintResult<Integer>(), system.dispatcher());
In this example, if the actor replied with a akka.actor.Status.Failure
containing the ArithmeticException
,
our Future
would have a result of 0. The recover
method works very similarly to the standard try/catch blocks,
so multiple Exception
s can be handled in this manner, and if an Exception
is not handled this way
it will behave as if we hadn't used the recover
method.
You can also use the recoverWith
method, which has the same relationship to recover
as flatMap
has to map
,
and is use like this:
final ExecutionContext ec = system.dispatcher();
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, ec).recoverWith(new Recover<Future<Integer>>() {
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
}, ec);
} else
throw problem;
}
}, ec);
future.onSuccess(new PrintResult<Integer>(), system.dispatcher());
After
akka.pattern.Patterns.after
makes it easy to complete a Future
with a value or exception after a timeout.
import static akka.pattern.Patterns.after;
import java.util.Arrays;
final ExecutionContext ec = system.dispatcher();
Future<String> failExc = Futures.failed(new IllegalStateException("OHNOES1"));
Future<String> delayed = Patterns.after(Duration.create(200, "millis"),
system.scheduler(), ec, failExc);
Future<String> future = future(new Callable<String>() {
public String call() throws InterruptedException {
Thread.sleep(1000);
return "foo";
}
}, ec);
Future<String> result = Futures.firstCompletedOf(
Arrays.<Future<String>>asList(future, delayed), ec);
Contents