Combining multiple API calls with CompletableFuture

Following the introduction to Java’s newer concurrency model in my previous article, “A new concurrency model in Java”, I thought it would be useful to show a more realistic and practical example with CompletableFuture.

So what we’re going to do is implement a component that fetches posts from different social media platforms, and we’re going to do this asynchronously.
For the sake of brevity and simplicity, we’re going to hide the details of how we call these external APIs. We’ll just assume that we’ve been given some API clients that return a CompletableFuture of a collection of posts.

So what would this look like? Let’s look at the following diagram:

[Image: Parallel API calls]

As you can see in the diagram, our social media service will call these three APIs to fetch the latest posts from those three platforms for our user.
This could be done sequentially, calling each API in a given order. For example, let’s say that we have a PostFetcher interface, with one implementation for each social media platform that we want to fetch posts from.

The interface would look like this:

[Image: CompletableFuture call]
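The listing itself is not reproduced here, so the following is only a minimal sketch of what the interface might look like. The method name and return type match the fetcher implementation shown later in this article; the Post record and its field names are assumptions (the article only shows a four-argument constructor), and records need Java 16 or later.

```java
import java.time.LocalTime;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical Post type: the field names are assumptions, only the
// four-argument constructor shape appears in the article.
record Post(String username, String imageUrl, String text, LocalTime postedAt) {}

// One implementation per social media platform.
interface PostFetcher {
    CompletableFuture<Collection<Post>> fetchLatestPostsFor(String username);
}

class PostFetcherSketch {

    // A trivial in-memory fetcher, just to show the contract in use.
    static PostFetcher fixedFetcher(String platform) {
        return username -> CompletableFuture.completedFuture(List.of(
                new Post(username, "http://url/image.png",
                        "This is " + platform + " post 1", LocalTime.of(4, 12, 18))));
    }

    public static void main(String[] args) {
        Collection<Post> posts =
                fixedFetcher("Twitter").fetchLatestPostsFor("myTwitterUser").join();
        System.out.println(posts);
    }
}
```

Returning a CompletableFuture from the interface is what lets the caller decide how to wait for, or combine, the results.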

And we’d be calling these APIs sequentially in this way:

[Image: Sequential and blocking use of CompletableFuture]
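Since the listing is not reproduced here, this is a rough sketch of what sequential, blocking use could look like. To keep it self-contained, posts are plain strings, fakeCall is a hypothetical stand-in for a real API client, and the delays are scaled down to milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class SequentialBlockingSketch {

    // Hypothetical stand-in for an API client: completes after delayMillis.
    static CompletableFuture<List<String>> fakeCall(String platform, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> List.of(platform + " post 1", platform + " post 2"),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Anti-pattern: each join() blocks the caller before the next call is even started.
    static List<String> fetchSequentially() {
        List<String> posts = new ArrayList<>();
        posts.addAll(fakeCall("Twitter", 300).join());
        posts.addAll(fakeCall("Facebook", 100).join());
        posts.addAll(fakeCall("Instagram", 200).join());
        return posts;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> posts = fetchSequentially();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // The elapsed time is at least the sum of the three delays (600 ms here).
        System.out.println(posts.size() + " posts in " + elapsedMillis + " ms");
    }
}
```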

Although this would work (the users would get their posts if everything goes well), it’s very inefficient and potentially problematic for our service. It’s a good example of a bad use of CompletableFuture, and of how we can lose all the benefits that CompletableFuture gives us. Let’s see why!

First of all, we’re calling join after fetching posts from each platform. We have to remember that join blocks the calling thread until the CompletableFuture completes.
Also, we’re not combining the CompletableFutures to run them in parallel and do something on completion; we’re calling them individually, one after another.
So basically we’re waiting for each API call to complete, remaining idle until we receive the response. We’d be doing something like what’s shown in the next diagram:

[Image: Sequential API calls]


Let’s imagine now that the first API call, which in our example is the Twitter API, is the slowest of the three to respond, taking for example 12 seconds. During those 12 seconds, instead of calling the other two APIs so that their work is done by the time Twitter responds, we’re still waiting to start the second API call. This is a waste of time and we should avoid it, as the response time of our social media service will always be the sum of the response times of the three API calls!
For example:

Sequential calls

Twitter call    -> 6.1 seconds
Facebook call   -> 2.5 seconds
Instagram call  -> 3.7 seconds 

Total response time -> 12.3 seconds

In the example shown above our total response time would be 12.3 seconds.

So what do we really want? What we want is to combine the three API calls using CompletableFuture and run them in parallel; when all three have completed, we’ll combine their results into one single collection and return that to the user. In this case our response time would be that of the slowest of the three calls; how long would that be with the same response times as in the previous example?

Parallel calls

Twitter call    -> 6.1 seconds
Facebook call   -> 2.5 seconds
Instagram call  -> 3.7 seconds 

Total response time -> 6.1 seconds

It’d take 6.1 seconds, as that’s the slowest of the three calls: roughly a 50% improvement over the sequential calls.
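The arithmetic above, and the idea of starting every call before waiting for any of them, can be sketched as follows. This is an illustration only: fakeCall is a hypothetical stand-in for an API client, and its delays are the example response times scaled down to milliseconds.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

class ParallelLatencySketch {

    // Sequential calls: total latency is the sum of the individual latencies.
    static long sequentialMillis(long... latencies) {
        return LongStream.of(latencies).sum();
    }

    // Parallel calls: total latency is that of the slowest call.
    static long parallelMillis(long... latencies) {
        return LongStream.of(latencies).max().orElse(0);
    }

    // Hypothetical stand-in for an API client that answers after delayMillis.
    static CompletableFuture<String> fakeCall(String platform, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> platform + " posts",
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Start all three calls first, and only then wait for the results.
    static List<String> fetchInParallel() {
        CompletableFuture<String> twitter = fakeCall("Twitter", 61);
        CompletableFuture<String> facebook = fakeCall("Facebook", 25);
        CompletableFuture<String> instagram = fakeCall("Instagram", 37);
        return List.of(twitter.join(), facebook.join(), instagram.join());
    }

    public static void main(String[] args) {
        long sequential = sequentialMillis(6100, 2500, 3700); // 12300
        long parallel = parallelMillis(6100, 2500, 3700);     // 6100
        double savedPercent = 100.0 * (sequential - parallel) / sequential;
        System.out.println("sequential=" + sequential + " ms, parallel=" + parallel
                + " ms, saved=" + savedPercent + "%");
        System.out.println(fetchInParallel());
    }
}
```

Note that fetchInParallel still blocks on join at the end; the point is only that all three calls are already running by then.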

So now that we understand why we’re going to implement it that way, let’s see how we can reflect that in Java code using CompletableFuture!

Implementation using CompletableFuture

What we’re going to do first is let the client configure which social media platforms to call, by setting a username for each platform in a configuration object.
Based on that, we’ll only call the APIs for which a username has been configured; then we’ll get the right implementation of the PostFetcher interface to fetch the posts for that user and platform.
An interface, SocialMediaService, has been defined to represent the entry point to our service:

[Image: CompletableFuture]
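The interface listing is not reproduced here. A minimal sketch, based on how SocialMediaService, SocialMediaConfig and its Builder are used in the code and tests later in this article, might look like the following. The enum constant names, the one-field Post placeholder and the EnumMap-based storage are assumptions for illustration.

```java
import java.util.Collection;
import java.util.EnumMap;
import java.util.concurrent.CompletableFuture;

// Constant names are assumptions; the article only shows SocialMediaPlatform.values().
enum SocialMediaPlatform { TWITTER, FACEBOOK, INSTAGRAM }

// Placeholder for the article's Post type; a single field is enough here.
record Post(String text) {}

// Entry point of the service, matching the signature used later in the article.
interface SocialMediaService {
    CompletableFuture<Collection<Post>> fetchPosts(SocialMediaConfig config);
}

// Holds the username configured for each platform; null means "not configured".
final class SocialMediaConfig {
    private final EnumMap<SocialMediaPlatform, String> users;

    private SocialMediaConfig(EnumMap<SocialMediaPlatform, String> users) {
        this.users = users;
    }

    String getUserFor(SocialMediaPlatform platform) {
        return users.get(platform);
    }

    static final class Builder {
        private final EnumMap<SocialMediaPlatform, String> users =
                new EnumMap<>(SocialMediaPlatform.class);

        Builder withTwitterUser(String user) { users.put(SocialMediaPlatform.TWITTER, user); return this; }
        Builder withFacebookUser(String user) { users.put(SocialMediaPlatform.FACEBOOK, user); return this; }
        Builder withInstagramUser(String user) { users.put(SocialMediaPlatform.INSTAGRAM, user); return this; }

        SocialMediaConfig build() {
            // Defensive copy so later changes to the builder don't leak into the config.
            return new SocialMediaConfig(new EnumMap<>(users));
        }
    }
}

class ConfigDemo {
    public static void main(String[] args) {
        SocialMediaConfig config = new SocialMediaConfig.Builder()
                .withTwitterUser("myTwitterUser")
                .build();
        System.out.println(config.getUserFor(SocialMediaPlatform.TWITTER));
    }
}
```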

So this first part will look like this:

[Image: CompletableFuture Stream]
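This step corresponds to the first half of fetchPosts in the full class further down. PostFetcherFactory is not shown in this article, so here is a hypothetical sketch of how the platform-to-fetcher lookup and the filtering could work. Posts are plain strings, the configuration is a simple Map, and the Platform enum and FETCHERS map are assumptions for illustration.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FuturesListSketch {

    // Constant names are assumptions for illustration.
    enum Platform { TWITTER, FACEBOOK, INSTAGRAM }

    // Hypothetical factory: maps each platform to a fetcher (username -> future of posts).
    // Supporting a new platform means adding an enum constant and an entry here.
    static final Map<Platform, Function<String, CompletableFuture<List<String>>>> FETCHERS =
            new EnumMap<>(Platform.class);

    static {
        for (Platform platform : Platform.values()) {
            FETCHERS.put(platform, username -> CompletableFuture.supplyAsync(
                    () -> List.of(platform + " post for " + username)));
        }
    }

    // One future per platform that has a configured username; the rest are skipped.
    static List<CompletableFuture<List<String>>> startCalls(Map<Platform, String> usersByPlatform) {
        return Stream.of(Platform.values())
                .filter(usersByPlatform::containsKey)
                .map(platform -> FETCHERS.get(platform).apply(usersByPlatform.get(platform)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CompletableFuture<List<String>>> futures =
                startCalls(Map.of(Platform.TWITTER, "alice", Platform.INSTAGRAM, "alice_ig"));
        futures.forEach(future -> System.out.println(future.join()));
    }
}
```

Because the stream iterates over the enum values, the resulting list follows the enum’s declaration order rather than the order of the configuration entries.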

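At this point we have a List of CompletableFutures, one per configured platform. There’s a very important detail to remember here: a CompletableFuture is eager, not lazy. A future created with supplyAsync, as in our fake fetchers, is scheduled as soon as it’s created, so by the time we hold this list the API calls are already in flight; calling get or join later only waits for the result!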
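It’s worth checking when the work behind a CompletableFuture actually begins. In the small sketch below (the class and method names are hypothetical), a task submitted with supplyAsync signals a latch as soon as its body runs, and nobody ever calls get or join on it. On a standard JDK this suggests that the task starts on its own, without waiting for a consumer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class EagerStartSketch {

    // Returns true if the task body ran even though nobody asked for the result.
    static boolean startsWithoutJoin() {
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture.supplyAsync(() -> {
            started.countDown(); // the task body is executing
            return "posts";
        });
        try {
            // We wait on the latch only; join() and get() are never called.
            return started.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("started without join: " + startsWithoutJoin());
    }
}
```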

So now that we have this list, we want to combine the three CompletableFutures so that they run in parallel and, once they’ve all completed, we return one single collection of posts.

In the previous post we learned how to combine two CompletableFutures by using thenCombine and thenCombineAsync. Let’s remember what we said about them:

thenCombine and thenCombineAsync: to be used when we need to run two CompletableFutures at the same time and produce a result when both of them complete.
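As a quick reminder, here is a minimal sketch of thenCombine with two independent futures. The post counts are made-up values used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {

    static CompletableFuture<Integer> totalPosts() {
        // Both futures are submitted independently, so they can run at the same time.
        CompletableFuture<Integer> twitterCount = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture<Integer> facebookCount = CompletableFuture.supplyAsync(() -> 4);
        // The combining function runs once both futures have completed.
        return twitterCount.thenCombine(facebookCount, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(totalPosts().join()); // prints 7
    }
}
```

thenCombineAsync takes the same arguments; the difference is that the combining function is submitted to an executor instead of possibly running on the thread that completes the second future.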

Ok, so combining two CompletableFutures seems easy; we already saw that in the last article, but what about three or more?

This is where it gets a bit tricky, especially if we want to do it dynamically. Our example has been designed so that if we decide to support an additional social media platform, we don’t have to modify the implementation of our social media service; this respects the Open-Closed Principle.
Our implementation will then look like this:

[Image: Parallel API calls with CompletableFuture]

We have used the reduce method from Java’s Stream API to create a new CompletableFuture from each pair of consecutive CompletableFutures, concatenating their results. The combineApiCalls method does the following:

[Image: CompletableFuture reduce]

Basically, we take each pair of CompletableFutures (c1, c2), get their posts (posts1, posts2) and concatenate them using their streams.
To make it clearer, let’s say that each of the three API calls returns 3 posts. The reduce will first concatenate 3 + 3 posts; then the result (6 posts) will be concatenated with the result of the remaining API call (3 posts), finally giving a collection of 6 + 3 = 9 elements. I hope that makes sense!
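The 3 + 3 + 3 walk-through can be reproduced with a small, self-contained sketch. It uses plain strings instead of Post objects, and threePosts is a hypothetical stand-in for an API call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReduceFuturesSketch {

    // Combines two futures into one whose result is both lists concatenated, in order.
    static CompletableFuture<List<String>> concat(CompletableFuture<List<String>> c1,
                                                  CompletableFuture<List<String>> c2) {
        return c1.thenCombine(c2, (posts1, posts2) ->
                Stream.concat(posts1.stream(), posts2.stream()).collect(Collectors.toList()));
    }

    // Hypothetical stand-in for an API call that returns three posts.
    static CompletableFuture<List<String>> threePosts(String platform) {
        return CompletableFuture.supplyAsync(
                () -> List.of(platform + " 1", platform + " 2", platform + " 3"));
    }

    static List<String> fetchAll() {
        return Stream.of(threePosts("Twitter"), threePosts("Facebook"), threePosts("Instagram"))
                // Step 1: Twitter (3) + Facebook (3) = 6; step 2: 6 + Instagram (3) = 9.
                .reduce(ReduceFuturesSketch::concat)
                .orElse(CompletableFuture.completedFuture(List.of()))
                .join();
    }

    public static void main(String[] args) {
        List<String> posts = fetchAll();
        System.out.println(posts.size() + " posts: " + posts);
    }
}
```

The combined list keeps the order of the stream (Twitter, Facebook, Instagram), regardless of which call finishes first.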

What we’re doing here is wrapping everything in one single CompletableFuture that the client can wait on, for example by using get or join, or chain further work onto. The underlying calls are already running by then; get and join only block until the combined result is ready.
We could have decided to organise our CompletableFutures in a different way, for example by using thenCompose; however, we have to remember from my last post that with thenCompose the next CompletableFuture is only created once the previous one has completed. That would mean that we’d be executing our tasks sequentially again, and this would be inefficient.
This is why it’s so important to understand how each method works.
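To illustrate why a thenCompose chain ends up sequential, here is a small sketch that records the order of events. The names and the event log are hypothetical; the point is that the second call is only created inside the function passed to thenCompose, that is, after the first result is available.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenComposeSketch {

    // Runs two chained calls and returns the order in which things happened.
    static List<String> runChained() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<String> chained = CompletableFuture
                .supplyAsync(() -> {
                    events.add("twitter done");
                    return "twitter posts";
                })
                // The second call is only created here, once the first result is available.
                .thenCompose(twitterPosts -> CompletableFuture.supplyAsync(() -> {
                    events.add("facebook started");
                    return twitterPosts + " + facebook posts";
                }));
        chained.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runChained());
    }
}
```

thenCompose is the right tool when the second call needs the result of the first one; for independent calls like ours, thenCombine lets them overlap.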


One more thing worth mentioning about our implementation is that the version of the reduce method that we use returns an Optional, so we use orElse to return an empty collection, in an elegant way, when there are no CompletableFutures to combine.
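The edge cases of that reduce call can be checked in isolation. In this sketch (plain strings instead of Post objects, and a hypothetical combine helper), an empty list falls back to an already completed future with no posts, while a single future is returned as it is, without any combining.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EmptyReduceSketch {

    static CompletableFuture<Collection<String>> combine(
            List<CompletableFuture<Collection<String>>> futures) {
        return futures.stream()
                // No identity value is given, so reduce returns an Optional:
                // empty for no futures, the single future itself for one element.
                .reduce((c1, c2) -> c1.thenCombine(c2, (posts1, posts2) ->
                        Stream.concat(posts1.stream(), posts2.stream())
                                .collect(Collectors.toList())))
                // Fallback for the empty case: a completed future holding no posts.
                .orElse(CompletableFuture.completedFuture(List.of()));
    }

    public static void main(String[] args) {
        System.out.println(combine(List.of()).join()); // prints []
    }
}
```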


I know it can look a bit difficult to understand at the beginning, as we’re using two fairly new concepts in Java. The shift to functional programming and asynchronous calls is a big jump from an imperative and sequential way of writing code, but you’ll get used to it even if it feels difficult right now.

So this would be the final result of our SocialMediaService implementation:

package com.theboredev;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

public class FakeSocialMediaService implements SocialMediaService {

    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSocialMediaService.class);

    @Override
    public CompletableFuture<Collection<Post>> fetchPosts(SocialMediaConfig config) {
        LOGGER.info("Fetching posts for {} on thread {}", config, Thread.currentThread().getName());

        final List<CompletableFuture<Collection<Post>>> futures = Stream.of(SocialMediaPlatform.values())
                .filter(hasUserForThatPlatform(config))
                .map(platform -> {
                    final PostFetcher fetcher = PostFetcherFactory.postFetcherFrom(platform);
                    return fetcher.fetchLatestPostsFor(config.getUserFor(platform));
                })
                .collect(toList());

        return futures.stream()
                .reduce(combineApiCalls())
                .orElse(CompletableFuture.completedFuture(emptyList()));
    }

    private Predicate<SocialMediaPlatform> hasUserForThatPlatform(SocialMediaConfig config) {
        return platform -> config.getUserFor(platform) != null;
    }

    private BinaryOperator<CompletableFuture<Collection<Post>>> combineApiCalls() {
        return (c1, c2) -> c1
                .thenCombine(c2, (posts1, posts2) -> {
                    return Stream.concat(posts1.stream(), posts2.stream()).collect(toList());
                });
    }
}

For the implementation of every API call I’ve used some fake implementations that return a CompletableFuture with a fixed number of posts after a short delay. Here is one of the fake implementations:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalTime;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class FacebookPostFetcher implements PostFetcher {

    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPostFetcher.class);

    @Override
    public CompletableFuture<Collection<Post>> fetchLatestPostsFor(String username) {
        final Executor executor = CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS);

        return CompletableFuture.supplyAsync(() -> {
            LOGGER.info("Fetching posts from Facebook for user {} on thread {}", username, Thread.currentThread().getName());
            return List.of(
                    new Post(username, "http://url/image.png", "This is Facebook post 1", LocalTime.of(4, 12, 18)),
                    new Post(username, "http://url/image.png", "This is Facebook post 2", LocalTime.of(4, 12, 18)),
                    new Post(username, "http://url/image.png", "This is Facebook post 3", LocalTime.of(4, 12, 18))
            );
        }, executor);
    }
}

And finally, to make sure that everything works as expected we have a few unit tests:

import org.junit.Test;

import java.time.LocalTime;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.util.Lists.emptyList;

public class FakeSocialMediaServiceTest {

    private final SocialMediaService socialMediaService = new FakeSocialMediaService();

    @Test
    public void shouldReturnNoPostsIfNoSocialMediaUserHasBeenConfigured() {
        final SocialMediaConfig config = new SocialMediaConfig.Builder()
                .build();

        final CompletableFuture<Collection<Post>> future = socialMediaService.fetchPosts(config);

        assertThat(future.join()).isEqualTo(emptyList());
    }

    @Test
    public void shouldReturnPostsOnlyFromOnePlatformIfOnePlatformHasBeenConfigured() {
        final SocialMediaConfig config = new SocialMediaConfig.Builder()
                .withTwitterUser("myTwitterUser")
                .build();

        final CompletableFuture<Collection<Post>> future = socialMediaService.fetchPosts(config);

        assertThat(future.join()).isEqualTo(List.of(
                new Post("myTwitterUser", "http://url/image.png", "This is Twitter post 1", LocalTime.of(4, 12, 18)),
                new Post("myTwitterUser", "http://url/image.png", "This is Twitter post 2", LocalTime.of(4, 12, 18)),
                new Post("myTwitterUser", "http://url/image.png", "This is Twitter post 3", LocalTime.of(4, 12, 18))
        ));
    }

    @Test
    public void shouldReturnPostsFromEveryConfiguredPlatform() {
        final SocialMediaConfig config = new SocialMediaConfig.Builder()
                .withTwitterUser("myTwitterUser")
                .withFacebookUser("myFacebookUser")
                .withInstagramUser("myInstagramUser")
                .build();

        final CompletableFuture<Collection<Post>> future = socialMediaService.fetchPosts(config);

        assertThat(future.join()).isEqualTo(List.of(
                new Post("myTwitterUser", "http://url/image.png", "This is Twitter post 1", LocalTime.of(4, 12, 18)),
                new Post("myTwitterUser", "http://url/image.png", "This is Twitter post 2", LocalTime.of(4, 12, 18)),
                new Post("myTwitterUser", "http://url/image.png", "This is Twitter post 3", LocalTime.of(4, 12, 18)),
                new Post("myFacebookUser", "http://url/image.png", "This is Facebook post 1", LocalTime.of(4, 12, 18)),
                new Post("myFacebookUser", "http://url/image.png", "This is Facebook post 2", LocalTime.of(4, 12, 18)),
                new Post("myFacebookUser", "http://url/image.png", "This is Facebook post 3", LocalTime.of(4, 12, 18)),
                new Post("myInstagramUser", "http://url/image.png", "This is Instagram post 1", LocalTime.of(4, 12, 18)),
                new Post("myInstagramUser", "http://url/image.png", "This is Instagram post 2", LocalTime.of(4, 12, 18)),
                new Post("myInstagramUser", "http://url/image.png", "This is Instagram post 3", LocalTime.of(4, 12, 18))
        ));
    }
}

That’s it! If you want to take a look at the whole example, you can find it on GitHub here.

Conclusion

As we’ve seen in this more practical example, the use of CompletableFuture gives us a more flexible approach when calling external APIs, having the ability to call them and combine them in any way that we see fit.

It takes a bit of time to get used to this new paradigm, but once we have full control over the API, we can write performant code much more easily and quickly than we ever dreamed of.

I really hope that you’ve enjoyed reading this article; I’ve done my best to help improve your understanding of asynchronous programming in Java. Please feel free to add any comments below, and please subscribe to be notified of my new articles if you’ve enjoyed this one.

Thank you very much for reading!!!
