
Promises and CompletableFuture

During my talk at the Warsaw Java Users Group about functional reactive programming in Java, a few interesting questions came up regarding CompletableFuture capabilities. One person asked whether it's possible to wait for the first completed future that passes a given predicate, rather than just the first one to complete (as CompletableFuture.anyOf() does). This is similar to Future.find() in Scala. It's not built into CompletableFuture, but it is quite easy to implement using the concept of promises.

Our custom implementation takes two parameters: a predicate and a list of homogeneous futures. The first future to complete with a result matching the predicate wins. If no future matches, the resulting future never completes (this behaviour is rather easy to change). Note that thenAccept() callbacks run only on normal completion, so futures that fail are simply ignored. We use a thread-safe and lightweight AtomicBoolean completed flag because the callbacks may be invoked from multiple threads.

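import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Completes with the first result that satisfies the predicate; never completes if none does.
@SafeVarargs
public static <T> CompletableFuture<T> firstMatching(Predicate<T> predicate, CompletableFuture<T>... futures) {
    final AtomicBoolean completed = new AtomicBoolean();
    final CompletableFuture<T> promise = new CompletableFuture<>();
    for (CompletableFuture<T> future : futures) {
        // thenAccept() fires only on normal completion; failed futures are skipped
        future.thenAccept(result -> {
            // compareAndSet() lets exactly one matching callback complete the promise
            if (predicate.test(result) && completed.compareAndSet(false, true))
                promise.complete(result);
        });
    }
    return promise;
}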
As you can see, a promise is like a Future detached from a thread pool. Rather than waiting for an asynchronous computation to complete, we simply assign a value to it at an arbitrary point in time. See also: Implementing custom Future.
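The "never completes" behaviour mentioned earlier can be changed by counting how many futures are still pending. The following is only a sketch under a few assumptions: the class name FirstMatchingOrFail and the method firstMatchingOrFail are hypothetical (not part of the JDK or of the original talk), and NoSuchElementException is an arbitrary choice of failure type.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class FirstMatchingOrFail {

    // Hypothetical variant of firstMatching(): fails once every future has
    // completed (normally or exceptionally) without a matching result.
    @SafeVarargs
    static <T> CompletableFuture<T> firstMatchingOrFail(
            Predicate<T> predicate, CompletableFuture<T>... futures) {
        final CompletableFuture<T> promise = new CompletableFuture<>();
        final AtomicInteger remaining = new AtomicInteger(futures.length);
        if (futures.length == 0) {
            promise.completeExceptionally(new NoSuchElementException("No futures given"));
        }
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((result, ex) -> {
                try {
                    // complete() has no effect on an already completed promise,
                    // so only the first matching result wins.
                    if (ex == null && predicate.test(result)) {
                        promise.complete(result);
                    }
                } finally {
                    // The last callback to finish reports the failure; this is
                    // a no-op if some future has already matched.
                    if (remaining.decrementAndGet() == 0) {
                        promise.completeExceptionally(
                                new NoSuchElementException("No future matched"));
                    }
                }
            });
        }
        return promise;
    }
}
```

Because complete() and completeExceptionally() only take effect on the first call, this variant does not need a separate AtomicBoolean flag; the counter is there solely to detect that nothing is left to wait for.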


The second question was about CompletableFuture.anyOf(): does it automatically cancel all tasks except the first one to complete? As you may remember, anyOf() completes as soon as the first of the underlying futures completes, ignoring the results of the remaining ones. It turns out that CompletableFuture simply forgets about them without any special treatment. We might expect it to immediately call cancel() on all the slower tasks, but this doesn't happen (!), and we will soon see why.
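The behaviour described above is easy to observe with two manually completed futures. This is a minimal sketch; the class AnyOfDoesNotCancel and the helper state() are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfDoesNotCancel {

    /** Describes a future's state as "cancelled", "done" or "pending". */
    static String state(CompletableFuture<?> future) {
        if (future.isCancelled()) return "cancelled";
        return future.isDone() ? "done" : "pending";
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(fast, slow);

        fast.complete("fast");

        System.out.println(any.join());   // fast
        System.out.println(state(slow));  // pending: anyOf() left it untouched
    }
}
```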

Luckily, we can easily build our own instances of CompletableFuture and resolve them at any time, so it's relatively easy to build more abstract transformations on top of futures. Our implementation registers a callback on each underlying future and, once the first one completes, attempts to cancel all the remaining ones, since they are no longer needed:

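import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Completes with the outcome of the first future to finish and cancels the rest.
@SafeVarargs
public static <T> CompletableFuture<T> cancellingAnyOf(CompletableFuture<T>... futures) {
    final AtomicBoolean completed = new AtomicBoolean();
    final CompletableFuture<T> promise = new CompletableFuture<>();
    for (CompletableFuture<T> future : futures) {
        future.whenComplete((result, ex) -> {
            // Only the first callback gets past this point; callbacks triggered
            // by the cancel() calls below find the flag already set
            if (completed.compareAndSet(false, true)) {
                Arrays.stream(futures)
                        .filter(f -> f != future)
                        .forEach(f -> f.cancel(true));
                if (ex != null)
                    promise.completeExceptionally(ex);
                else
                    promise.complete(result);
            }
        });
    }
    return promise;
}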
The implementation is slightly complex because whenComplete() callbacks may be executed from multiple threads, so we must synchronize properly. That's the rationale behind the lightweight AtomicBoolean completed flag. When the very first whenComplete() callback runs, it attempts to cancel all the remaining tasks and passes its value (or exception) to our custom CompletableFuture (called promise). OK, so the implementation looks fine, but it somehow fails to interrupt running tasks, e.g. ones blocked on Thread.sleep(). In principle, all blocking methods that declare InterruptedException should be interrupted, but they aren't. Why? Well, I failed to read the documentation of CompletableFuture:

Since [...] this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()).
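The equivalence stated in this quote can be checked directly: a cancelled CompletableFuture reports exceptional completion, and its callbacks receive a CancellationException. A small sketch follows; the class and method names are invented for this example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

final class CancelIsExceptionalCompletion {

    /** Returns the exception that a whenComplete() callback observes after cancel(). */
    static Throwable observedAfterCancel() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<Throwable> seen = new CompletableFuture<>();
        // The callback runs in the cancelling thread, so seen is completed
        // by the time cancel() returns.
        future.whenComplete((value, ex) -> seen.complete(ex));
        future.cancel(true);
        return seen.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.cancel(true);
        System.out.println(future.isCancelled());               // true
        System.out.println(future.isCompletedExceptionally());  // true
        System.out.println(observedAfterCancel() instanceof CancellationException); // true
    }
}
```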

And in CompletableFuture.cancel(mayInterruptIfRunning):

mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

This means that CompletableFuture.cancel() does not interrupt the underlying thread. When you call cancel(true) on an ordinary Future, such as one returned by submit() on a standard thread-pool ExecutorService, it calls Thread.interrupt() on the thread running the task, eagerly stopping it (provided the task responds to interruption). This is virtually impossible with CompletableFuture. All it does is complete the future with a CancellationException; it does not care about the computation that is still running. Very disappointing, but worth knowing.
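The difference between the two kinds of cancellation can be reproduced with a task that sleeps and records whether it was interrupted. This is a sketch under stated assumptions: the class CancelDoesNotInterrupt, its method and the 500 ms sleep are arbitrary choices for the demonstration, and a single-threaded executor stands in for whatever pool runs your tasks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDoesNotInterrupt {

    /** Returns true if the running task was interrupted by cancel(true). */
    static boolean interruptedAfterCancel(boolean useCompletableFuture)
            throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        Runnable task = () -> {
            started.countDown();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
        };
        try {
            Future<?> future = useCompletableFuture
                    ? CompletableFuture.runAsync(task, pool)
                    : pool.submit(task);
            started.await();      // make sure the task is already running
            future.cancel(true);
            finished.await();     // wait until the task body has ended
            return interrupted.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptedAfterCancel(true));   // false
        System.out.println(interruptedAfterCancel(false));  // true
    }
}
```

With CompletableFuture the task sleeps for the full 500 ms even though the future itself is already marked as cancelled; with the Future returned by submit() the sleep is cut short by the interrupt.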

I hope that by now you are more familiar with the concept of promises and how they can be implemented using CompletableFuture. Other scenarios are relatively easy to glue together.
