Virtual Threads

Discover Structured Concurrency

You may have heard about structured concurrency recently and wondered what exactly it is about. Structured concurrency is a paradigm that aims to make concurrent programs easier to read and understand, quicker to write and, above all, safer.

Before we get into details, why is this needed? Let’s go through the reasons behind it.

Background

To understand the need for structured concurrency, we have to look back in time and see how concurrent programs were written a few years ago.

In an unstructured concurrency paradigm, we start threads anywhere in our code and it’s unclear where each thread starts or ends.

Let’s look at the Java example below. Despite being easier to understand than older examples in Java, it still has a major problem: it’s not safe, because the failure of one of the tasks does not cancel any of the others and the results are unpredictable.

public static void main(String[] args) throws InterruptedException {
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("I'm task 1 running on " + Thread.currentThread().getName());
    });
    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("I'm task 2 running on " + Thread.currentThread().getName());
    });
    CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
        System.out.println("I'm task 3 running on " + Thread.currentThread().getName());
        throw new RuntimeException("Something went wrong!");
    });
    CompletableFuture.allOf(future1, future2, future3)
            .join();
    System.out.println("All tasks started!");
    Thread.sleep(3000);
}

If we run the code above, we’ll see that task number 3 throws an exception immediately and, despite that, none of the other tasks stop. The application continues and the other tasks complete successfully after a couple of seconds. This is not what we’d expect; ideally we’d want the other tasks to be cancelled immediately. Imagine that one of the tasks runs forever: we could be leaking a thread every time something fails.

> Task :UnstructuredConcurrency.main() FAILED
I'm task 3 running on ForkJoinPool.commonPool-worker-3
I'm task 2 running on ForkJoinPool.commonPool-worker-2
I'm task 1 running on ForkJoinPool.commonPool-worker-1

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: Something went wrong!
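
To get a feel for how much work the missing behaviour takes by hand, here is a minimal sketch of fail-fast cancellation built on a plain ExecutorService. The class ManualFailFast and its methods are hypothetical names made up for this illustration; the scenario mirrors the one above, with two slow tasks and one that fails immediately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the JDK.
class ManualFailFast {

    // Runs all tasks; on the first failure, interrupts the ones still running.
    static <T> List<T> runAll(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService<T> completion = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completion.submit(task));
            }
            // Consume futures in completion order so a failure surfaces immediately.
            for (int i = 0; i < futures.size(); i++) {
                completion.take().get(); // throws ExecutionException if that task failed
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // already done, returns at once
            }
            return results;
        } finally {
            executor.shutdownNow(); // interrupts any task that is still running
            executor.awaitTermination(5, TimeUnit.SECONDS); // wait so no thread is left behind
        }
    }

    // Mirrors the scenario above: two slow tasks and one that fails at once.
    static int slowTasksCompleted() throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        Callable<String> slow = () -> {
            Thread.sleep(2000);
            completed.incrementAndGet();
            return "done";
        };
        Callable<String> failing = () -> {
            throw new RuntimeException("Something went wrong!");
        };
        try {
            runAll(Arrays.asList(slow, slow, failing));
        } catch (ExecutionException e) {
            System.out.println("Failed fast: " + e.getCause().getMessage());
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Slow tasks that ran to completion: " + slowTasksCompleted());
    }
}
```

With this sketch the slow tasks are interrupted as soon as the failure is observed, so the counter should stay at 0. Notice how much bookkeeping is involved, and that it only works because the tasks respond to interruption; this is the kind of code structured concurrency aims to make unnecessary.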

What is the solution then? This is where structured concurrency comes into play. Let’s see how.

Structured concurrency

Structured concurrency has two main goals: writing more understandable concurrent code and avoiding thread leaks. The idea is to guarantee that whenever the control flow of our code splits into multiple concurrent threads, those threads are merged back together at the end of that flow, and that we always keep control of how they get executed. Basically, with structured concurrency we make sure that no orphaned threads are ever left running out of control in our application.

Discover Structured Concurrency » The Bored Dev
Photo by Alain Pham on Unsplash

This means that the start of our concurrent code and its end are always explicit. That way any developer can easily understand concurrent code, regardless of their experience or expertise in concurrent programming; simplicity always wins.
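
As a rough illustration of an explicit start and end, here is a minimal sketch in plain Java. SimpleScope is a hypothetical class written only for this example (it is not a JDK or library API): tasks can only be started through fork, and leaving the try block does not return until every forked task has finished.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not a JDK or library API.
class SimpleScope implements AutoCloseable {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Explicit start: every concurrent task is forked through the scope.
    <T> Future<T> fork(Callable<T> task) {
        return executor.submit(task);
    }

    // Explicit end: leaving the try block waits for every forked task.
    @Override
    public void close() {
        executor.shutdown(); // no new forks accepted
        try {
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                // keep waiting; the scope must not end before its tasks do
            }
        } catch (InterruptedException e) {
            executor.shutdownNow(); // stop waiting and interrupt the children
            Thread.currentThread().interrupt();
        }
    }

    // Forks three tasks and returns how many had finished when the scope closed.
    static int demo() {
        AtomicInteger finished = new AtomicInteger();
        try (SimpleScope scope = new SimpleScope()) {
            for (int i = 1; i <= 3; i++) {
                long delayMillis = 100L * i;
                scope.fork(() -> {
                    Thread.sleep(delayMillis);
                    return finished.incrementAndGet();
                });
            }
        } // close() runs here and blocks until all three tasks are done
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks finished when the scope closed: " + demo());
    }
}
```

The sketch only covers the "merge at the end" half of the idea; it does not cancel sibling tasks when one of them fails, which a real structured concurrency implementation would also handle.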

In recent years several programming languages have introduced structured concurrency; Golang, Kotlin and Python are among them. We’re going to take a look at how Kotlin in particular achieves structured concurrency.

Structured Concurrency in Kotlin

Kotlin enforces structured concurrency by using coroutine scopes. Coroutines are the name that Kotlin gives to its “virtual threads”: units of execution that require an OS thread to do work but are not tied to a given OS thread. They can wait for IO without blocking an OS thread, and another coroutine can use that same OS thread in the meantime. You can read more about Kotlin coroutines here.

The idea is that a coroutine scope always belongs to an entity with a finite lifetime, for example a heavy scheduled batch job using thousands of threads to achieve higher throughput.
When that job finishes, its scope dies with it, and none of the coroutines or virtual threads belonging to that scope should survive either.

There is a GlobalScope available in Kotlin, although its use is not recommended. Why? Because the global scope lasts for the whole lifetime of an application; in most cases that’s not what we want, as it can lead to thread leaks.

The only valid use case for the global scope is when a given entity’s lifetime is the same as that of the application or service itself, for example a background job that keeps repeating its execution forever. Let’s imagine that we are implementing a distributed system that requires running a background job in each node to send a periodic heartbeat to the rest of the nodes in the system for as long as the application is running. If we wanted to do so in Kotlin, we’d launch it with GlobalScope.launch; by doing that we create a (virtual) thread that will only stop when the application is told to shut down.
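
For comparison, the same application-lifetime idea can be sketched in plain Java, the language of our first example. This is only an illustration under our own assumptions: the Heartbeat class, the 100ms period and the printed message are made up, and a real node would send a network message instead of printing.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; names and periods are assumptions.
class Heartbeat {

    // Starts a job meant to live as long as the application does.
    static ScheduledExecutorService start(Runnable sendHeartbeat, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "heartbeat");
            thread.setDaemon(true); // does not keep the JVM alive on its own
            return thread;
        });
        scheduler.scheduleAtFixedRate(sendHeartbeat, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        ScheduledExecutorService heartbeat =
                start(() -> System.out.println("heartbeat #" + sent.incrementAndGet()), 100);
        Thread.sleep(350);       // stand-in for the application's real work
        heartbeat.shutdownNow(); // the only stop point: application shutdown
    }
}
```

Because the job is tied to the application rather than to any request or batch, nothing narrower than the application itself is responsible for stopping it. Note that scheduleAtFixedRate stops rescheduling the task if one run throws, so a real heartbeat would need to catch its own errors.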

As a rule of thumb, always define a scope for your threads; it doesn’t really matter whether you’re running one or many. By doing so, you guarantee that if something goes wrong within that scope, all of its child threads get cancelled gracefully and you’ll have no thread leaks, no matter what!

Let’s take a look at an example in Kotlin to understand the behaviour we’ve just described.

import kotlinx.coroutines.*

suspend fun main(): Unit {
    GlobalScope.async {
        val result = sumNumbersToRandomNumbers(1, 10)
        println("Task completed with result $result!")
    }.await()
}

suspend fun sumNumbersToRandomNumbers(n1: Int, n2: Int): Int =
    // Both async jobs are children of this scope
    coroutineScope {
        val job1 = async {
            delay(1000)
            println("Adding random number to $n1")
            n1 + nextRandom()
        }
        val job2 = async {
            delay(500)
            println("Adding random number to $n2")
            n2 + nextRandom()
        }
        job1.await() + job2.await()
    }

private fun nextRandom(): Int = (0..9999).random()

In the example above we run two coroutines within a given scope. The result of running this code will be something similar to this:

Adding random number to 10
Adding random number to 1
Task completed with result 13232!

Process finished with exit code 0

Job two prints its value first because job one has the longer delay, which we’ve specified to simulate an IO delay.

To prove that this code is safe when something goes wrong, we’re going to throw an exception in the second coroutine while the first one is still waiting out its one-second delay. In the earlier example using CompletableFuture, when an exception was raised the other threads carried on as if nothing had happened. In this latter example that won’t happen: any thread still running within that scope will be cancelled. The code used to simulate what we’ve just described is the following:

import kotlinx.coroutines.*

suspend fun main(): Unit {
    GlobalScope.async {
        val result = sumNumbersToRandomNumbers(1, 10)
        println("Task completed with result $result!")
    }.await()
}

suspend fun sumNumbersToRandomNumbers(n1: Int, n2: Int): Int =
    coroutineScope {
        val job1 = async {
            delay(1000)
            println("Adding random number to $n1")
            n1 + nextRandom()
        }
        val job2 = async {
            delay(500)
            // Fails while job1 is still waiting
            throw RuntimeException("Something went wrong!")
            println("Adding random number to $n2")
            n2 + nextRandom()
        }
        job1.await() + job2.await()
    }

private fun nextRandom(): Int = (0..9999).random()

If you look at the second job, it raises an exception after 500ms. At that point job 1 should still be waiting for an additional 500ms before printing anything. However, if things are implemented correctly, we should see an exception being thrown and the runtime should cancel all the running threads immediately. That’s exactly what happens when we run this code:

Exception in thread "main" java.lang.RuntimeException: Something went wrong!
	at org.theboreddev.CoroutinesKt$sumNumbersToRandomNumbers$2$job2$1.invokeSuspend(Coroutines.kt:27)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

Feel free to run it yourself and play around with it to see that it works as expected.

We’ve seen how Kotlin allows us to group coroutines within the same scope and how it automatically cancels every thread within that scope when one of them fails. But what about Java? Can we do something similar in Java?

Structured Concurrency in Java

The short answer is NO, or at least NOT YET. The JDK team has been working on changes to the Java concurrency model for a long time, under the name Project Loom. The changes implemented as part of this work will introduce virtual threads and structured concurrency in Java. Structured concurrency has been included in Java 19 as part of JEP 428, but it’s still in the “incubator” phase. Virtual threads have also been introduced in Java 19, as a preview feature under JEP 425; you’ll have to enable preview features in order to test them.

When these changes become stable, the Java concurrency model and the way we currently understand multithreading in Java will completely change for the better.

The existing limitation that ties a Java thread to an OS thread will no longer exist, allowing us to build more robust, reliable and performant systems. This is a huge step forward in the Java ecosystem!

The code below is an example of the new proposal for structured concurrency in Java. As you’ll see, it’s a completely new paradigm in Java.

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String>  user  = scope.fork(() -> findUser());
        Future<Integer> order = scope.fork(() -> fetchOrder());

        scope.join();           // Join both forks
        scope.throwIfFailed();  // ... and propagate errors

        // Here, both forks have succeeded, so compose their results
        return new Response(user.resultNow(), order.resultNow());
    }
}

We’re not going to look into much detail around Java virtual threads in this article, but I promise we’ll talk more about them very soon. From what we’ve seen so far it looks very promising, but we are going to leave it here as a little teaser to get you interested in the future concurrency model in Java!

Conclusion

We’ve seen how some languages implement virtual threads to build more resilient systems, where an OS thread can be shared among hundreds or thousands of virtual threads because virtual threads are very lightweight compared to existing Java threads. Even Java is now moving to this new paradigm, which makes concurrent code easier to understand and to write. Very soon there will be no need to write error-prone concurrent code where we have to consider a ton of situations in which our code could go wrong!

If you’re interested in how Golang implements concurrency, you can start by looking at “goroutines” here.

That’s all from us today! We really hope you’ve enjoyed this article and got a better understanding of what structured concurrency is and why we need it. As always, if you like our articles please follow us to be notified when a new article gets published.

Thanks for reading us!