Things have changed considerably in the last few years in terms of how we write code in concurrent models.
In the past we always had a tendency to share state and use complex concurrency mechanisms to synchronise the threads to allow them a fair use of the resources in our application, those were the times when reading “Java Concurrency in practice” was almost a must for many Java developers in our industry that had to deal with concurrent tasks.
Java Concurrency Model
Now the paradigm has shifted completely to an approach that makes a lot more of sense. This paradigm is mainly composed by three principles: non-shared state, immutability and non-blocking operations.
Non-shared state + immutability
Although I still see some developers (wrongly) using this approach, what we used to do in the old times was to have components with shared state and we implemented mechanisms to synchronise the access to this state. For instance the use of synchronized or atomic variables in Java, which caused any other thread who wanted to access that resource to remain blocked until the resource was released. That’s quite bad, isn’t it?
For example, in the silly example shown below if multiple threads were trying to call the method pleaseBlockMe they’d remain blocked waiting to gain the lock to be able to execute the method, leading to very bad performance and very possibly a thread starvation in our thread pool.
This approach was quite complex because by sharing state the developer had to think hard and be always alert on how a shared resource was being accessed to avoid race conditions, deadlocks and any other possible issues.
So that’s why the community came up with a completely new and much simpler approach that consists precisely in avoiding that complexity. The best way to solve concurrency synchronisation complexity is by completely avoiding it, so the idea is that every thread creates a new “one-time” immutable object and the application will use other simpler means to update the application state. Makes sense, right?
If you need to have a better understanding about concurrency and multi-threading and why we do things in the way we do them nowadays, I’d recommend that you read the book “Java Concurrency in Practice” by Brian Goetz.
Another concept that has been widely adopted in the recent years has been the use of non-blocking operations.
This basically consists in avoid blocking a thread while it’s waiting for the outcome of a second operation. This could be an IO operation, another thread performing a different task, etc.
What we do in async programming while our application is waiting for an IO operation is to release that thread so it can be reused by a different thread in our application while we wait for that operation to be completed. This is very important for the resiliency of our application for several reasons, one of them being the elimination of the risk of exhausting our thread pool. This would result in our application not being able to serve any request to our application.
CompletionStage in Java
Let’s focus now in the main topic of this post, how Java gives now support to write asynchronous tasks.
Since JDK 1.8 developers have available a new interface that makes writing asynchronous code in Java quite simple and flexible. The main interface part of this API is CompletionStage, although the most popular class in this new paradigm is CompletableFuture, this is actually an implementation of CompletionStage interface.
In previous versions of Java we had the Future interface available, but this interface was very limited, as we could only get the result or cancel the execution of the future.
CompletionStage provides a wider variety of methods that allow us to have full of control of our asynchronous operations, being able to create chain of tasks and synchronise them in an asynchronous way.
So let’s get started with some code, let’s create our first completable:
If we run this example, we won’t be able to see the “EXECUTED!” string printed in our console. Why is that?
That’s because when we use runAsync method, the Runnable will be executed in a different thread part of the common ForkJoinPool in our JVM, while our main method will be executed in the main thread, so the execution of our method finishes before our async task has completed.
To prove that we can do the following:
We’re using the join blocking method to wait for the completion of our task and we print the thread in which each process is executed.
The result is:
Task :MyExample.main() Main thread : main Async task thread : ForkJoinPool.commonPool-worker-3 EXECUTED!
As expected, our async task is executed by a thread from the common ForkJoinPool.
CompletionStage interface also provides a supplyAsync method which accepts a Supplier of a value, we’ll be using this when we need a result back. If we don’t need any result back then we’d have to use the runAsync method provided, which accepts a Runnable.
One more thing to keep in mind is that although by default any async operation in the CompletionStage interface will be executed in the common ForkJoinPool, we can override that default behaviour by providing an ExecutorService as an additional parameter. In this case that CompletableFuture will use the thread pool defined in that ExecutorService.
So we could do this for instance:
If we run our main method again, we can see that the thread no longer belongs to the common ForkJoinPool.
Task :MyExample.main() Main thread : main Async task thread : pool-1-thread-1 EXECUTED!
All of this is something that is good to know, but let’s get to the point. Two of the main benefits in using CompletionStage interface are:
- Defining callbacks for our async tasks
- Ability to chain multiple tasks
Let’s talk about them!
A callback is simply a task that we attach to the completion of our CompletionStage task.
There are three different ways of providing a callback, and each of them is provided in a synchronous and asynchronous version. The latter will be executed in a different thread from the common ForkJoinPool, while the former will be executed in the running thread.
These are the available callback methods in CompletionStage:
- thenRun and thenRunAsync
These methods accept a Runnable as a callback, so it doesn’t either consume a result nor produce any element.
- thenAccept and thenAcceptAsync
These methods accept a Consumer as a callback, so it accepts a result from the CompletableFuture but it doesn’t produce any result.
- thenApply and thenApplyAsync
These methods accept a Function as a callback, which accept a result from the CompletableFuture and transforms the result using this function.
What method to use will depend on your circumstances; what kind of data your tasks will produce or consume and how will that data be processed.
A very simple example could be a CompletableFuture that produces a text and a consumer which has to print that text into our console. So we’d have:
Notice that in this case we don’t need a Thread.sleep() to wait for the result. We have defined a blocking consumer by using thenAccept but this consumer won’t start until our CompletableFuture emits a result on completion. The use of thenAccept means that the consumer will be executed in the same running thread and therefore our execution won’t be finished until the consumer is executed. If we had used thenAcceptAsync, we wouldn’t be able to see the text printed in our console, as it runs in a different thread!
I hope this example has made this difference clear to you.
Let’s take a look now at how can we chain multiple CompletionStages to define performant pipelines of tasks using CompletionStage API.
There are different ways of chaining different CompletionStages, which one we use will depend on how the tasks depend on each other and how we process their outcome.
These are the available methods:
- thenCompose and thenComposeAsync
To be used when a CompletableFuture depends on the completion of a previous CompletableFuture to start.
- thenCombine and thenCombineAsync
To be used when we need to run two CompletableFutures at the same time and we produce a result when both CompletableFutures complete.
- thenAcceptBoth and thenAcceptBothAsync
To be used when we run two CompletableFutures at the same time but in this case we just consume the outcome of these two CompletableFutures, not producing any result.
- acceptEither and acceptEitherAsync
In this case we run two CompletableFutures at the same time but we just care about the first one to complete.
Now that we know what the API provides to us to handle our tasks, let’s imagine that we have the following steps in the process of registering a user in our system.
We could implement that using a chain of CompletionStage objects, the result we could have would be something like:
As you can see, we create a chain CompletableFuture to hold all our CompletableFutures. That means that our tasks will start only when we complete this chain CompletableFuture.
The createUser task starts right after the chain task is completed and once it completes, we have defined a callback to log the new generated user id by using thenAcceptAsync.
Right after that, by using thenAcceptBothAsync we combine two CompletableFutures that can be executed at the same time: registerAddress and registerPaymentDetails.
This combination depends at the same time on the completion of the createUser CompletableFuture, something that has been done using thenComposeAsync method.
At last we have the sendEmail CompletableFuture, which depends on the completion on both registerAddress and registerPaymentDetails and finally we just define its callback by using thenAcceptAsync, which prints the result of the operation.
If we execute this method, we can see that the tasks synchronise themselves as it’s been configured in the chain. There are some logs that have been hidden in the code shown above just for simplicity, but they just log the start time in milliseconds and what thread is running for each task. There are also some fake delays to prove that the tasks have to wait for their dependencies before starting.
Task :MyExample.main() Create user start : 1743488939 createUser - ForkJoinPool.commonPool-worker-5 User created with id : b22ca6a9-8a83-4ce1-838e-1abc2e1df0ca Register address start : 1743489495 Register payment details start : 1743489496 paymentDetails - ForkJoinPool.commonPool-worker-3 registerAddress - ForkJoinPool.commonPool-worker-7 Registered address was : Flat 0, 100 Some Street. W1 XYZ Registered payment details success : true Send email start : 1743489998 sendEmail - ForkJoinPool.commonPool-worker-7 Email sent : true
That’s it! This is how we can have complete control on how our chain of tasks can be executed.
It’s worth mentioning that although we used async versions of the methods in our example, in many cases we won’t always want that. The reason is that in some cases moving data from one thread to the other can be expensive, so in those cases where there’s no need to run the task in a different thread we can stick to the running thread by using non-async versions of the method. For instance, give preference to the use of thenCompose instead of thenComposeAsync.
In some cases we won’t want our application to raise an exception and finish execution unexpectedly. In those cases we can use a feature of CompletionStage that allows us to define an error handler for our tasks.
In order to do that we have to use the method exceptionally.
We can see the way to use it by looking at a silly example:
In this code we will return Result.COMPLETED when the subtask completes with some text, but if that text is empty or not present we’ll raise an exception.
If that exception is raised, instead of bubbling it up and let our app finish unexpectedly, we have defined a handler for it which returns a Result.FAILED type. That’s it! Simple, right?
There are other ways to handle exceptions like the use of whenComplete or handle, but they imply the use of BiFunction or BiConsumer and checking the input to be able to handle them properly. It could be a matter of opinion but I prefer the first approach for being cleaner.
Another interesting feature of CompletableFuture that is available since JDK 9 is the definition of a timeout for our tasks, to avoid waiting for ever for the completion of a task.
In our previous example, if the subtask never completes we’d be waiting forever for its completion; therefore, we’ve modified it slightly to define a timeout and return a Result.TIMEOUT when a timeout is raised.
Java have made huge steps in the last releases to become a language more adapted to our times, specially since the introductions of Lambdas, which gave a new wide variety of possibilities to use Functional Programming to solve some of the intrinsic issues of the language.
This new model gives us a lot of flexibility to write concise, expressive and flexible code to synchronise asynchronous tasks.
In the latest release of Java 19 some new proposals have been introduced to introduce structured concurrency and virtual threads into Java. You can understand more about structured concurrency in our article “Understanding Structured Concurrency“.
I’ve tried to show how this new model works and some of the things you’ll have to keep in mind when you write concurrent tasks in Java. I hope you have enjoyed reading this article and feel free to add your comments below. Thank you very much for reading this article!
Great article! Very well explained. I like how you categorise the different options of the API with a brief explanation. Thanks!
You must log in to post a comment.