Asynchronous programming in Java with CompletableFuture

Introduction

The CompletableFuture API is a high-level API for asynchronous programming in Java. This API supports pipelining (also known as chaining or combining) of multiple asynchronous computations into a single result without the mess of nested callbacks (“callback hell”). This API is also an implementation of the future/promise concurrency constructs in Java.

Since Java 5, there has been a much simpler API for asynchronous programming: the Future interface and its base implementation, the FutureTask class. The Future interface represents the result of an asynchronous computation and has only a few methods:

  • to check if a task is completed or canceled
  • to cancel a task
  • to wait for a task to complete (if necessary) and then get its result

However, the Future interface has significant limitations in building non-trivial asynchronous computations:

  • it is impossible to register a callback for a future completion
  • it is impossible to pipeline futures in a non-blocking manner
  • it is impossible to manually complete a future

To overcome these limitations, Java 8 added (and Java 9 and Java 12 updated) the CompletionStage interface and its base implementation, the CompletableFuture class. These classes allow building efficient and fluent multi-stage asynchronous computations.

However, the CompletableFuture API is not simple. The CompletionStage interface has 43 public methods. The CompletableFuture class implements 5 methods from the Future interface and 43 methods from the CompletionStage interface, and it has 30 public methods of its own. This article describes only the most useful methods of the CompletableFuture API.

Futures and promises

Futures and promises are high-level concurrency constructs that decouple a value (a future) from the way it is computed (a promise). They allow writing more fluent concurrent programs that transfer objects between threads without using any explicit synchronization mechanisms. Futures and promises are often used when multiple threads work on different tasks, and the results need to be combined by the main thread.

Implementations of future/promise exist in many programming languages:

  • JavaScript: Promise
  • Java: java.util.concurrent.Future, java.util.concurrent.CompletableFuture
  • Scala: scala.concurrent.Future
  • C#: Task, TaskCompletionSource

The concepts of futures and promises are sometimes used interchangeably. In reality, they are separate objects that encapsulate two different sets of functionality.

A future is a read-only object to encapsulate a value that may not be available yet but will be provided at some point. The future is used by a consumer to retrieve the result which was computed. A promise is a writable, single-assignment object to guarantee that some task will compute some result and make it available in the future. The promise is used by a producer to store the success value or exception in the corresponding future.

The following workflow example can help you to understand the idea of future/promise. A consumer sends a task to a producer to execute it asynchronously. The producer creates a promise that starts the given task. From the promise, the producer extracts a future and sends it to the consumer. The consumer receives the future that is not completed and waits for its completion.

The consumer can call a blocking getter of the future to wait for the value to be available. If the future has already been completed, the call to the getter will return the result immediately. Otherwise, the call to the getter will wait until the future is finished. (Also, the consumer can use a non-blocking checking method to identify whether the future has already been completed or not).

Once the task has finished, the producer sets the value of the promise, and the future becomes available. But when the task fails, the future will contain an exception instead of a success value. In this case, when the consumer calls the getter method, the exception in the future will be thrown.

[Figure: future and promise workflow]

One of the important features of future/promise implementations is the ability to chain tasks together. The idea is that when one future/promise is finished, another future/promise is created that takes the result of the previous one. This means that the consumer does not have to block by calling the getter on a future. Once the future is completed, the result of the previous task is automatically passed to the next task in the chain. Compared to callbacks, this allows writing more fluent asynchronous code that supports the composition of nested success and failure handlers without “callback hell”.

In Java, the Future interface represents a future: it has the isDone method to check if the task is completed and the get method to wait for the task to complete and get its result. The CompletableFuture class represents a promise: it has the complete and completeExceptionally methods to set the result of the task with a successful value or with an exception. However, the CompletableFuture class also implements the Future interface allowing it to be used as a future.
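The following sketch illustrates this split of roles. The class PromiseDemo and its methods produce and consume are hypothetical names for illustration: the producer keeps the CompletableFuture and writes the outcome with complete or completeExceptionally, while the consumer receives the same object typed as a plain Future and only reads it. Note that the upcast restricts the API only at compile time; a consumer could still cast the object back to CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical demo class, not part of the CompletableFuture API.
class PromiseDemo {

    // Producer side: runs the task in a worker thread and writes its outcome
    // into the promise (the CompletableFuture).
    static <T> Future<T> produce(Supplier<T> task, ExecutorService executor) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                promise.complete(task.get());      // store the success value
            } catch (Throwable t) {
                promise.completeExceptionally(t);  // store the exception instead
            }
        });
        return promise; // the consumer sees only the Future methods
    }

    // Consumer side: waits for the value with the blocking getter.
    static <T> T consume(Supplier<T> task) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = produce(task, executor);
            return future.get(); // blocks until the producer completes the promise
        } finally {
            executor.shutdown();
        }
    }
}
```

For example, PromiseDemo.consume(() -> "value") returns "value", while a Supplier that throws makes get throw an ExecutionException whose cause is the original exception.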

[Figure: Java futures class diagram]

CompletableFuture in practice

The following code example can help you to understand the use of the CompletableFuture class as a future/promise implementation in Java.

Let’s implement the following simplified multi-stage workflow. First, we need to call two long-running methods that return a product price in EUR and the EUR/USD exchange rate. Then, we need to calculate the net product price from the results of these methods. Then, we need to call a third long-running method that takes the net product price and returns the tax amount. Finally, we need to calculate the gross product price from the net product price and the tax amount.

Implementation of this workflow is divided into the following tasks:

  1. to get the product price in EUR (a slow task)
  2. to get the EUR/USD exchange rate (a slow task)
  3. to calculate the net product price (a fast task, depends on tasks 1, 2)
  4. to get the tax amount (a slow task, depends on task 3)
  5. to calculate the gross product price (a fast task, depends on tasks 3, 4)

Note that not all tasks are similar. Some of them are fast (they should be executed synchronously), and some of them are slow (they should be executed asynchronously). Some of them are independent (they can be executed in parallel), and some of them depend on the results of previous tasks (they have to be executed sequentially).

The mentioned workflow is implemented below in three programming styles: synchronous, asynchronous based on the Future interface, and asynchronous based on the CompletableFuture class.

In synchronous programming, the main thread starts an auxiliary task and blocks until this task is finished. When the auxiliary task is completed, the main thread continues the main task.

In asynchronous programming, the main thread starts an auxiliary task in a worker thread and continues its own task. When the worker thread completes the auxiliary task, it notifies the main thread (for example, with a callback call).

The advantage of the synchronous implementation is that its code is the simplest and most reliable. The disadvantage of this implementation is the longest execution time (because all tasks run sequentially).

logger.info("this task started");

int netAmountInUsd = getPriceInEur() * getExchangeRateEurToUsd(); // blocking
float tax = getTax(netAmountInUsd); // blocking
float grossAmountInUsd = netAmountInUsd * (1 + tax);

logger.info("this task finished: {}", grossAmountInUsd);
logger.info("another task started");

The advantage of the asynchronous implementation based on the Future interface is a shorter execution time (because some tasks run in parallel). The disadvantage of this implementation is the most complicated code (because the Future interface lacks methods for task pipelining).

logger.info("this task started");

Future<Integer> priceInEur = executorService.submit(this::getPriceInEur);
Future<Integer> exchangeRateEurToUsd = executorService.submit(this::getExchangeRateEurToUsd);

while (!priceInEur.isDone() || !exchangeRateEurToUsd.isDone()) { // non-blocking
   Thread.sleep(100);
   logger.info("another task is running");
}

int netAmountInUsd = priceInEur.get() * exchangeRateEurToUsd.get(); // actually non-blocking
Future<Float> tax = executorService.submit(() -> getTax(netAmountInUsd));

while (!tax.isDone()) { // non-blocking
   Thread.sleep(100);
   logger.info("another task is running");
}

float grossAmountInUsd = netAmountInUsd * (1 + tax.get()); // actually non-blocking

logger.info("this task finished: {}", grossAmountInUsd);
logger.info("another task is running");

The advantage of the asynchronous implementation based on the CompletableFuture class is a shorter execution time (because some tasks run in parallel) and more fluent code. The disadvantage of this implementation is that the more advanced CompletableFuture API is harder to learn.

CompletableFuture<Integer> priceInEur = CompletableFuture.supplyAsync(this::getPriceInEur);
CompletableFuture<Integer> exchangeRateEurToUsd = CompletableFuture.supplyAsync(this::getExchangeRateEurToUsd);

CompletableFuture<Integer> netAmountInUsd = priceInEur
       .thenCombine(exchangeRateEurToUsd, (price, exchangeRate) -> price * exchangeRate);

logger.info("this task started");

netAmountInUsd
       .thenCompose(amount -> CompletableFuture.supplyAsync(() -> amount * (1 + getTax(amount))))
       .whenComplete((grossAmountInUsd, throwable) -> {
           if (throwable == null) {
               logger.info("this task finished: {}", grossAmountInUsd);
           } else {
               logger.warn("this task failed: {}", throwable.getMessage());
           }
       }); // non-blocking

logger.info("another task started");
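The three snippets above call getPriceInEur, getExchangeRateEurToUsd, and getTax without showing them. A self-contained version of the CompletableFuture variant might look like the following sketch. The class PriceWorkflow, the stub methods, their 200 ms delays, and their return values (a price of 100 EUR, an exchange rate of 2, a tax rate of 25%) are hypothetical placeholders; the result is read with join at the end only so that the method can return it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stubs and values; only the shape of the pipeline follows the text.
class PriceWorkflow {

    static int getPriceInEur() { sleep(200); return 100; }          // slow task 1

    static int getExchangeRateEurToUsd() { sleep(200); return 2; }  // slow task 2

    static float getTax(int amount) { sleep(200); return 0.25f; }   // slow task 4

    // Simulates a long-running call.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static float grossAmountInUsd() {
        // tasks 1 and 2 are independent, so they run in parallel
        // in the default asynchronous facility
        CompletableFuture<Integer> priceInEur =
                CompletableFuture.supplyAsync(PriceWorkflow::getPriceInEur);
        CompletableFuture<Integer> exchangeRate =
                CompletableFuture.supplyAsync(PriceWorkflow::getExchangeRateEurToUsd);

        return priceInEur
                .thenCombine(exchangeRate, (price, rate) -> price * rate)            // task 3 (fast)
                .thenCompose(net -> CompletableFuture.supplyAsync(() -> getTax(net))   // task 4 (slow)
                        .thenApply(tax -> net * (1 + tax)))                            // task 5 (fast)
                .join(); // blocking read, used here only to return the result
    }
}
```

With these placeholder values, PriceWorkflow.grossAmountInUsd() returns 250.0 (100 * 2 * 1.25). Task 5 is placed inside the function passed to thenCompose so that it can see both the net amount and the tax amount, matching its dependency on tasks 3 and 4.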

The CompletionStage interface

The CompletionStage interface represents a stage in a multi-stage (possibly asynchronous) computation where stages can be forked, chained, and joined.

This interface specifies pipelining of the future/promise implementation in the CompletableFuture API:

  1. Each stage performs a computation. A stage may or may not take arguments. A stage either computes a value (returns a result) or performs an action (returns no result).
  2. Stages can be chained in a pipeline. A stage starts when a single previous stage (or two previous stages) in the pipeline finishes. A stage finishes when its computation is completed. Finishing a stage can start the next stage (or several dependent stages) in the pipeline.
  3. A stage can be executed synchronously or asynchronously. The appropriate execution type should be selected depending on the parameters of the computation.

The methods of the CompletionStage interface can be divided into two groups according to their purpose:

  • methods to pipeline computations
  • methods to handle exceptions

methods of the CompletionStage interface

The CompletionStage interface contains only methods for stage pipelining. This interface does not contain methods for the other parts of the stage workflow: creating, checking, completing, and reading. This functionality is delegated to the CompletableFuture class, the main implementation of the CompletionStage interface.

Methods to pipeline computations

The CompletionStage interface has 43 public methods, most of which follow three clear naming patterns.

The first naming pattern explains how the new stage starts:

  • if a method name has the fragment “then”, the new stage starts after completion of a single previous stage
  • if a method name has the fragment “either”, the new stage starts after completion of the first of two previous stages
  • if a method name has the fragment “both”, the new stage starts after completion of both of two previous stages

The second naming pattern explains what computation the new stage performs:

  • if a method name has the fragment “apply”, the new stage transforms an argument by the given Function
  • if a method name has the fragment “accept”, the new stage consumes an argument by the given Consumer
  • if a method name has the fragment “run”, the new stage runs an action by the given Runnable

If the new stage depends on both of the two previous stages, it uses BiFunction instead of Function and BiConsumer instead of Consumer.

Summary of methods to pipeline computations:

| Fragment | Function (takes one argument and returns a result) | Consumer (takes one argument and returns no result) | Runnable (takes no argument and returns no result) |
|---|---|---|---|
| then | thenApply, thenCompose | thenAccept | thenRun |
| either | applyToEither | acceptEither | runAfterEither |
| both | thenCombine (BiFunction: takes two arguments and returns a result) | thenAcceptBoth (BiConsumer: takes two arguments and returns no result) | runAfterBoth |

If a method accepts a functional interface that does not return a result (Consumer, BiConsumer, Runnable), it is used to perform a computation with side effects. Such a method returns a CompletionStage<Void>, which only signals whether the computation has completed normally or with an exception.
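The either and both variants can be sketched as follows (the class name EitherBothDemo and the values are arbitrary). To keep the result deterministic, the either example pairs an already completed future with one that never completes, so applyToEither can only use the completed one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class EitherBothDemo {

    static String demo() {
        CompletableFuture<Integer> fast = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> never = new CompletableFuture<>(); // never completed
        CompletableFuture<Integer> other = CompletableFuture.completedFuture(3);

        // either + apply: a Function of the first available result
        int doubled = fast.applyToEither(never, x -> x * 2).join();       // 4

        // both + apply: a BiFunction of both results
        int sum = fast.thenCombine(other, Integer::sum).join();            // 5

        // both + accept: a BiConsumer of both results that returns no result
        AtomicInteger product = new AtomicInteger();
        fast.thenAcceptBoth(other, (a, b) -> product.set(a * b)).join();   // side effect: 6

        // both + run: a Runnable that neither sees the results nor returns one
        CompletableFuture<Void> done = fast.runAfterBoth(other, () -> { });
        done.join();

        return doubled + " " + sum + " " + product.get() + " " + done.isDone();
    }
}
```

EitherBothDemo.demo() returns "4 5 6 true".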

The third naming pattern explains what thread executes the new stage:

  • if a method has the fragment “something(...)”, then the new stage is executed by the default facility (that can be synchronous or asynchronous)
  • if a method has the fragment “somethingAsync(...)”, then the new stage is executed by the default asynchronous facility
  • if a method has the fragment “somethingAsync(..., Executor)”, then the new stage is executed by the given Executor

Note that the default facility and the default asynchronous facility are specified by the CompletionStage implementations, not by this interface. Looking ahead, the CompletableFuture class uses as the default facility the thread that completes the future (or any other thread that simultaneously tries to do the same; if the future is already completed, the thread that adds the dependent stage). As the default asynchronous facility, it uses the thread pool returned by the ForkJoinPool.commonPool() method (or, if the parallelism level of that pool is less than two, a new thread for each task).

Note that the thread pool returned by the ForkJoinPool.commonPool() method is shared across the JVM by all CompletableFuture asynchronous stages that run without an explicit Executor and, by default, by all Parallel Streams.
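Which thread runs a stage can be observed by recording Thread.currentThread().getName() inside each stage. In this sketch, the class ThreadsDemo and the thread name custom-worker are arbitrary choices. The name of the common pool worker is not fixed, so only the thread of the given Executor is predictable; in the OpenJDK implementation, a non-async stage added to an already completed future runs in the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadsDemo {

    static String[] threadNames() {
        // Hypothetical executor whose single thread is named "custom-worker"
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-worker"));
        try {
            CompletableFuture<String> completed = CompletableFuture.completedFuture("x");

            // default facility: the previous stage is already completed,
            // so the stage runs in the thread that adds it
            String sync = completed
                    .thenApply(s -> Thread.currentThread().getName()).join();

            // default asynchronous facility: usually a ForkJoinPool.commonPool() worker
            String async = completed
                    .thenApplyAsync(s -> Thread.currentThread().getName()).join();

            // the given Executor
            String custom = completed
                    .thenApplyAsync(s -> Thread.currentThread().getName(), executor).join();

            return new String[] {sync, async, custom};
        } finally {
            executor.shutdown();
        }
    }
}
```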

The following code example demonstrates the use of the methods to pipeline computations to calculate the area of a circle. First, the pipeline takes the radius and squares it by the thenApply method. Then the pipeline takes the squared radius and the constant π and multiplies them in the thenCombine method. Then the area is consumed by the thenAccept method that logs it and returns no result. Finally, the thenRun method (which takes no argument and returns no result) logs a message that the pipeline has ended.

CompletableFuture<Double> pi = CompletableFuture.supplyAsync(() -> Math.PI);
CompletableFuture<Integer> radius = CompletableFuture.supplyAsync(() -> 1);

// area of a circle = π * r^2
CompletableFuture<Void> area = radius
        .thenApply(r -> r * r)
        .thenCombine(pi, (multiplier1, multiplier2) -> multiplier1 * multiplier2)
        .thenAccept(a -> logger.info("area: {}", a))
        .thenRun(() -> logger.info("operation completed"));

area.join();

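You should use the thenApply method if you want to transform the result of a CompletionStage with a fast function that returns a plain value. You should use the thenCompose method if the transforming function itself returns a CompletionStage (for example, because it starts a slow asynchronous computation): thenCompose flattens the result, while thenApply would return a nested CompletionStage<CompletionStage<U>>.

You should use the thenCompose method if you want to run two computations sequentially (the second one needs the result of the first one). You should use the thenCombine method if you want to run two independent computations in parallel and combine their results.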
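The following sketch contrasts the three methods. The class ComposeDemo and its method fetchTaxAsync (a stand-in for a slow lookup that already returns a CompletableFuture) are hypothetical. Passing such a function to thenApply produces a nested future, while thenCompose flattens it; thenCombine joins two independent futures.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {

    // Hypothetical slow lookup that already returns a future
    static CompletableFuture<Integer> fetchTaxAsync(int amount) {
        return CompletableFuture.supplyAsync(() -> amount / 10);
    }

    static int[] results() {
        CompletableFuture<Integer> amount = CompletableFuture.supplyAsync(() -> 200);

        // thenApply with a fast function: a plain value in, a plain value out
        CompletableFuture<Integer> doubled = amount.thenApply(a -> a * 2);

        // thenApply with a function that returns a future gives a nested future
        CompletableFuture<CompletableFuture<Integer>> nested =
                amount.thenApply(ComposeDemo::fetchTaxAsync);

        // thenCompose flattens it: the second computation starts after the first one
        CompletableFuture<Integer> tax = amount.thenCompose(ComposeDemo::fetchTaxAsync);

        // thenCombine: two independent computations, combined when both complete
        CompletableFuture<Integer> rate = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> combined = amount.thenCombine(rate, (a, r) -> a * r);

        return new int[] {doubled.join(), nested.join().join(), tax.join(), combined.join()};
    }
}
```

ComposeDemo.results() returns [400, 20, 20, 600].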


Methods to handle exceptions

Each computation may complete normally or exceptionally. In asynchronous computations, the source of the exception and the recovery method can be in different threads. Therefore in this case it is not possible to use the try-catch-finally statements to recover from exceptions. So the CompletionStage interface has special methods to handle exceptions.

Each stage has two types of completion of equal importance: normal completion and exceptional completion. If a stage completes normally, the dependent stages start to execute normally. If a stage completes exceptionally, the dependent stages complete exceptionally, unless there is an exception recovery stage in the computation pipeline.

Summary of methods to handle exceptions:

| When the method is called | What the method returns | Method | Description |
|---|---|---|---|
| called on success or exception | the same result or exception | whenComplete(biConsumer) | returns a new CompletionStage that upon normal or exceptional completion consumes the result or exception of this stage and returns the same result or exception without modifying them |
| called on success or exception | a new result | handle(biFunction) | returns a new CompletionStage that upon normal or exceptional completion transforms the result or exception of this stage and returns the new result |
| called on exception | a new result | exceptionally(function) | returns a new CompletionStage that upon exceptional completion transforms the exception of this stage and returns the new result |

If you need to perform some action when the previous stage completes normally or exceptionally, you should use the whenComplete method. A BiConsumer argument of the whenComplete method is called when the previous stage completes normally or exceptionally. This method allows reading both the result (or null if none) and the exception (or null if none) but does not allow changing the result.

If you need to recover from an exception (to replace the exception with some default value), you should use the handle or exceptionally method. A BiFunction argument of the handle method is called when the previous stage completes normally or exceptionally. A Function argument of the exceptionally method is called when the previous stage completes exceptionally. In both cases, the exception is not propagated to the next stage (unless the given function itself throws an exception).

The following code example demonstrates the use of the methods to handle exceptions. During the execution of stage 1, an exception occurs (division by zero). Execution of stage 2 is skipped because its previous stage completed exceptionally. The whenComplete method identifies that the previous stage completed exceptionally but does not recover from the exception. Execution of stage 3 is also skipped because its previous stage is still completed exceptionally. The handle method identifies that the previous stage completed exceptionally and replaces the exception with a default value. Finally, stage 4 executes normally.

CompletableFuture.supplyAsync(() -> 0)
       .thenApply(i -> { logger.info("stage 1: {}", i); return 1 / i; }) // executed and failed
       .thenApply(i -> { logger.info("stage 2: {}", i); return 1 / i; }) // skipped
       .whenComplete((value, t) -> {
           if (t == null) {
               logger.info("success: {}", value);
           } else {
               logger.warn("failure: {}", t.getMessage()); // executed
           }
       })
       .thenApply(i -> { logger.info("stage 3: {}", i); return 1 / i; }) // skipped
       .handle((value, t) -> {
           if (t == null) {
               return value + 1;
           } else {
               return -1; // executed and recovered
           }
       })
       .thenApply(i -> { logger.info("stage 4: {}", i); return 1 / i; }) // executed
       .join();
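The example above does not use the exceptionally method. A minimal sketch of it follows (the class ExceptionallyDemo and the fallback value -1 are arbitrary). It also shows that when a stage function throws, dependent stages usually receive the original exception wrapped in a CompletionException, so the sketch unwraps it before deciding whether to recover.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyDemo {

    static int divide(int dividend, int divisor) {
        return CompletableFuture.supplyAsync(() -> divisor)
                .thenApply(d -> dividend / d) // throws ArithmeticException if d == 0
                .exceptionally(t -> {
                    // t is typically a CompletionException wrapping the original exception
                    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                            ? t.getCause() : t;
                    if (cause instanceof ArithmeticException) {
                        return -1; // recover with a default value
                    }
                    throw new CompletionException(cause); // do not recover from other failures
                })
                .join();
    }
}
```

ExceptionallyDemo.divide(10, 2) returns 5, and ExceptionallyDemo.divide(1, 0) returns -1.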


The CompletableFuture class

The CompletableFuture class represents a stage in a multi-stage (possibly asynchronous) computation where stages can be created, checked, completed, and read. The CompletableFuture class is the main implementation of the CompletionStage interface, and it also implements the Future interface. That means the CompletableFuture class can simultaneously represent a stage in a multi-stage computation and the result of such a computation.

This class specifies the general lifecycle of the future/promise implementation in the CompletableFuture API:

  1. A creating thread creates an incomplete future and adds computation handlers to it.
  2. A reading thread waits (in a blocking or non-blocking manner) until the future is completed normally or exceptionally.
  3. A completing thread completes the future and unblocks the reading thread.

The methods of the CompletableFuture class can be divided into five groups according to their purpose:

  • methods to create futures
  • methods to check futures
  • methods to complete futures
  • methods to read futures
  • methods for bulk futures operations

methods of the CompletableFuture class

The following code example demonstrates the use of the methods to handle the lifecycle of the CompletableFuture class. The future is created incomplete in the first thread. Then the same thread starts checking the future for completion. After a delay that simulates a long operation, the future is completed in the second thread. Finally, the first thread finishes checking and reads the value of the future, which has already been completed.

ExecutorService executorService = Executors.newSingleThreadExecutor();

CompletableFuture<String> future = new CompletableFuture<>(); // creating an incomplete future

executorService.submit(() -> {
   Thread.sleep(500);
   future.complete("value"); // completing the incomplete future
   return null;
});

while (!future.isDone()) { // checking the future for completion
   Thread.sleep(1000);
}

String result = future.get(); // reading value of the completed future
logger.info("result: {}", result);

executorService.shutdown();

Methods to create futures

In the most general case, a future is created incomplete in one thread and is completed in another thread. However, in some cases (for example, in tests), it may be necessary to create an already completed future.

Summary of methods to create futures:

| Future status | Method | Description |
|---|---|---|
| incomplete | newIncompleteFuture() | returns a new incomplete CompletableFuture |
| asynchronously completing | supplyAsync(supplier) | returns a new CompletableFuture that is asynchronously completed after it obtains a value from the given Supplier |
| asynchronously completing | runAsync(runnable) | returns a new CompletableFuture that is asynchronously completed after it runs an action from the given Runnable |
| completed | completedFuture(value) | returns a new CompletableFuture that is already completed with the given value |
| completed | failedFuture(throwable) | returns a new CompletableFuture that is already completed exceptionally with the given exception |

The no-arg CompletableFuture constructor also creates an incomplete future.
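A sketch of each creation method follows (the class CreateDemo and the values are arbitrary). Note that newIncompleteFuture is an instance method rather than a static factory, and that newIncompleteFuture and failedFuture were added in Java 9.

```java
import java.util.concurrent.CompletableFuture;

class CreateDemo {

    static CompletableFuture<?>[] create() {
        // incomplete futures
        CompletableFuture<String> incomplete1 = new CompletableFuture<>();        // no-arg constructor
        CompletableFuture<String> incomplete2 = incomplete1.newIncompleteFuture(); // instance method

        // asynchronously completing futures
        CompletableFuture<String> supplied = CompletableFuture.supplyAsync(() -> "value");
        CompletableFuture<Void> ran = CompletableFuture.runAsync(() -> {
            // an action that returns no result
        });

        // already completed futures
        CompletableFuture<String> completed = CompletableFuture.completedFuture("value");
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("failure"));

        return new CompletableFuture<?>[] {incomplete1, incomplete2, supplied, ran, completed, failed};
    }
}
```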


Methods to check futures

The CompletableFuture class has non-blocking methods for checking whether a future is incomplete, completed normally, completed exceptionally, or canceled.

Summary of the methods to check futures:

| Behavior | Method | Description |
|---|---|---|
| non-blocking | isDone() | returns true if this CompletableFuture is completed in any manner: normally, exceptionally, or by cancellation |
| non-blocking | isCompletedExceptionally() | returns true if this CompletableFuture is completed exceptionally, including cancellation |
| non-blocking | isCancelled() | returns true if this CompletableFuture was canceled before it completed normally |

It is impossible to cancel an already completed future.
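The following sketch reports the three checks for a future in each state (the class CheckDemo and its state helper are hypothetical). It also shows that calling cancel on an already completed future returns false and leaves the future unchanged.

```java
import java.util.concurrent.CompletableFuture;

class CheckDemo {

    // Formats the three checks as "isDone/isCompletedExceptionally/isCancelled"
    static String state(CompletableFuture<?> future) {
        return future.isDone() + "/" + future.isCompletedExceptionally() + "/" + future.isCancelled();
    }

    static String[] states() {
        CompletableFuture<String> incomplete = new CompletableFuture<>();

        CompletableFuture<String> normal = CompletableFuture.completedFuture("value");

        CompletableFuture<String> exceptional = new CompletableFuture<>();
        exceptional.completeExceptionally(new IllegalStateException("failure"));

        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(false);

        // cancel has no effect on an already completed future and returns false
        boolean cancelledAfterCompletion = normal.cancel(false);

        return new String[] {
            state(incomplete), state(normal), state(exceptional), state(cancelled),
            String.valueOf(cancelledAfterCompletion), state(normal)
        };
    }
}
```

CheckDemo.states() returns ["false/false/false", "true/false/false", "true/true/false", "true/true/true", "false", "true/false/false"].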


Methods to complete futures

The CompletableFuture class has methods for completing futures, which means transferring incomplete futures to one of the completed states: normal completion, exceptional completion, and cancellation.

Summary of methods to complete futures:

| Future action | Execution | Method | Description |
|---|---|---|---|
| complete normally | synchronous | complete(value) | completes this CompletableFuture with the given value if not already completed |
| complete normally | asynchronous | completeAsync(supplier) | completes this CompletableFuture with the result of the given Supplier |
| complete normally | asynchronous | completeOnTimeout(value, timeout, timeUnit) | completes this CompletableFuture with the given value if not already completed before the given timeout |
| complete normally or exceptionally | depends on timeout | orTimeout(timeout, timeUnit) | exceptionally completes this CompletableFuture with a TimeoutException if not already completed before the given timeout |
| complete exceptionally | synchronous | completeExceptionally(throwable) | completes this CompletableFuture with the given exception if not already completed |
| complete exceptionally | synchronous | cancel(mayInterruptIfRunning) | completes this CompletableFuture with a CancellationException, if not already completed |

The cancel(boolean mayInterruptIfRunning) method has implementation specifics in the CompletableFuture class. The mayInterruptIfRunning parameter has no effect because interrupts are not used to control processing. When the cancel method is called, the future is completed with a CancellationException, but the Thread.interrupt() method is not called to interrupt the underlying thread.
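A sketch of the completion methods follows (the class CompleteDemo and the 50 ms timeouts are arbitrary; completeAsync, completeOnTimeout, and orTimeout require Java 9 or later). Each future is created incomplete and then completed in a different way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

class CompleteDemo {

    static String[] outcomes() {
        // complete: only the first completion takes effect; later calls return false
        CompletableFuture<String> f1 = new CompletableFuture<>();
        boolean first = f1.complete("first");
        boolean second = f1.complete("second");

        // completeAsync: the Supplier runs in the default asynchronous facility
        CompletableFuture<String> f2 = new CompletableFuture<String>().completeAsync(() -> "async");

        // completeOnTimeout: a fallback value if nothing completes the future in time
        CompletableFuture<String> f3 = new CompletableFuture<String>()
                .completeOnTimeout("fallback", 50, TimeUnit.MILLISECONDS);

        // orTimeout: a TimeoutException if nothing completes the future in time
        CompletableFuture<String> f4 = new CompletableFuture<String>().orTimeout(50, TimeUnit.MILLISECONDS);
        String timeout;
        try {
            timeout = f4.join();
        } catch (CompletionException e) {
            timeout = e.getCause().getClass().getSimpleName();
        }

        // completeExceptionally: completes the future with the given exception
        CompletableFuture<String> f5 = new CompletableFuture<>();
        f5.completeExceptionally(new IllegalStateException("failure"));

        // cancel: completes the future with a CancellationException;
        // mayInterruptIfRunning has no effect in CompletableFuture
        CompletableFuture<String> f6 = new CompletableFuture<>();
        boolean cancelled = f6.cancel(true);

        return new String[] {
            first + "/" + second + "/" + f1.join(),
            f2.join(),
            f3.join(),
            timeout,
            String.valueOf(f5.isCompletedExceptionally()),
            cancelled + "/" + f6.isCancelled()
        };
    }
}
```

CompleteDemo.outcomes() returns ["true/false/first", "async", "fallback", "TimeoutException", "true", "true/true"].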


Methods to read futures

The CompletableFuture class has methods for reading futures, waiting if necessary. Note that in most cases, these methods should be used as the final step in a computation pipeline.

Summary of methods to read futures:

| Behavior | Thrown exceptions | Method | Description |
|---|---|---|---|
| blocking | checked and unchecked exceptions | get() | returns the result value when complete (waits if necessary) or throws an exception if completed exceptionally |
| blocking | only unchecked exceptions | join() | returns the result value when complete (waits if necessary) or throws an unchecked exception if completed exceptionally |
| time-blocking | checked and unchecked exceptions | get(timeout, timeUnit) | returns the result value when complete (waits for at most the given time) or throws an exception if completed exceptionally |
| non-blocking | only unchecked exceptions | getNow(valueIfAbsent) | returns the result value (or throws an unchecked exception if completed exceptionally) if completed, else returns the given valueIfAbsent |

The get() and get(timeout, timeUnit) methods can throw checked exceptions: ExecutionException (if the future is completed exceptionally) and InterruptedException (if the current thread is interrupted). Also, the time-bounded get(timeout, timeUnit) method can throw checked TimeoutException (if the timeout occurs).

The join and getNow methods can only throw unchecked CompletionException (if the future is completed exceptionally).

All of these methods can also throw unchecked CancellationException (if the computation is canceled).
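The following sketch shows how each read method reports the outcome (the class ReadDemo and the 50 ms timeout are arbitrary). Note that get wraps the original exception in an ExecutionException, join wraps it in a CompletionException, and join throws a cancellation as an unwrapped CancellationException.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReadDemo {

    static String[] reads() throws InterruptedException {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("failure"));

        String viaGet;
        try {
            viaGet = failed.get(); // checked ExecutionException
        } catch (ExecutionException e) {
            viaGet = "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }

        String viaJoin;
        try {
            viaJoin = failed.join(); // unchecked CompletionException
        } catch (CompletionException e) {
            viaJoin = "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }

        CompletableFuture<String> incomplete = new CompletableFuture<>();
        String viaGetNow = incomplete.getNow("absent"); // non-blocking: returns valueIfAbsent

        String viaTimedGet;
        try {
            viaTimedGet = incomplete.get(50, TimeUnit.MILLISECONDS); // checked TimeoutException
        } catch (TimeoutException | ExecutionException e) {
            viaTimedGet = e.getClass().getSimpleName();
        }

        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(false);
        String viaCancelled;
        try {
            viaCancelled = cancelled.join(); // unchecked CancellationException
        } catch (CancellationException e) {
            viaCancelled = "CancellationException";
        }

        return new String[] {viaGet, viaJoin, viaGetNow, viaTimedGet, viaCancelled};
    }
}
```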


Methods for bulk future operations

The CompletionStage interface has methods to wait for all (thenCombine, thenAcceptBoth, runAfterBoth) and any (applyToEither, acceptEither, runAfterEither) of two computations to complete. The CompletableFuture class extends this functionality and has two static methods to wait for all or any of many futures to complete.

Summary of methods for bulk futures operations:

| Similarity | Method | Description |
|---|---|---|
| similar to runAfterBoth | allOf(completableFutures...) | returns a new CompletableFuture<Void> that is completed when all of the given CompletableFutures complete |
| similar to applyToEither | anyOf(completableFutures...) | returns a new CompletableFuture<Object> that is completed when any of the given CompletableFutures completes, with the same result |

Note that all input futures can be of different generic types - the methods have variable arguments of type CompletableFuture<?>.
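A sketch of both methods follows (the class BulkDemo, its methods allResults and firstResult, and the values are arbitrary). Since allOf returns CompletableFuture<Void>, the individual results are read from the input futures after it completes, at which point join does not block. To keep anyOf deterministic, one of its inputs never completes.

```java
import java.util.concurrent.CompletableFuture;

class BulkDemo {

    static String allResults() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "item");
        CompletableFuture<Integer> count = CompletableFuture.supplyAsync(() -> 3);

        // allOf returns CompletableFuture<Void>: the results are read from the input futures
        return CompletableFuture.allOf(name, count)
                .thenApply(ignored -> name.join() + " x" + count.join()) // both are already complete
                .join();
    }

    static Object firstResult() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);

        // anyOf returns CompletableFuture<Object> with the result of the first completed future
        return CompletableFuture.anyOf(never, done).join();
    }
}
```

BulkDemo.allResults() returns "item x3", and BulkDemo.firstResult() returns 42.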


Conclusion

The CompletableFuture API is a high-level API that allows you to develop fluent asynchronous code. This API is not simple, but it is worth learning if you want to write efficient asynchronous code.

There are the following rules of thumb for using CompletableFuture API:

  • Know which threads execute which stages; do not allow high-priority threads to execute long-running low-priority tasks
  • Avoid blocking methods inside a computation pipeline
  • Avoid short (hundreds of milliseconds) asynchronous computations because frequent context switching can introduce significant overhead
  • Be aware of the new exception handling mechanism that works differently than the try-catch-finally statements
  • Use timeouts so as not to wait too long (perhaps indefinitely) for a stuck computation

The CompletableFuture API is quite complex, and its use is justified when a single result depends on many stages that form a rather complicated directed acyclic graph. It is wise to try simpler asynchronous APIs first, for example, Parallel Streams or ExecutorServices. Be aware of the disadvantages of asynchronous programming: asynchronous code is often much more difficult to implement, understand, and debug. Make sure that the CompletableFuture API is the right tool for your job.

Complete code examples are available in the GitHub repository.