GPars meets Virtual Threads
Author: Paul King
Published: 2022-06-15 11:28AM
An exciting preview feature coming in JDK19 is Virtual Threads
(JEP 425). In my experiments
so far, virtual threads work well with my favourite Groovy parallel
and concurrency library GPars. GPars has been
around a while (since Java 5 and Groovy 1.8 days) but still has
many useful features. Let’s have a look at a few examples.
If you want to try these out, make sure you have a recent JDK19 (currently EA) and enable preview features with your Groovy tooling.
Parallel Collections
First a refresher, to use the GPars parallel collections feature with normal threads, use the GParsPool.withPool method as follows:
withPool {
assert [1, 2, 3].collectParallel{ it ** 2 } == [1, 4, 9]
}
For any Java readers, don’t get confused with the collectParallel
method name. Groovy’s collect
method (naming inspired by
Smalltalk) is the equivalent of Java’s map
method. So, the
equivalent Groovy code using the Java streams API would be
something like:
assert [1, 2, 3].parallelStream().map(n -> n ** 2).collect(Collectors.toList()) == [1, 4, 9]
Now, let’s bring virtual threads into the picture. Luckily, GPars parallel collection facilities provide a hook for using an existing custom executor service. This makes using virtual threads for such code easy:
withExistingPool(Executors.newVirtualThreadPerTaskExecutor()) {
assert [1, 2, 3].collectParallel{ it ** 2 } == [1, 4, 9]
}
Nice! But let’s move onto some areas examples which might be less familiar to Java developers.
GPars has additional features for providing custom thread pools
and the remaining examples rely on those features. The current
version of GPars doesn’t have a DefaultPool
constructor that
takes a vanilla executor service, so, we’ll write our own class:
@AutoImplement
class VirtualPool implements Pool {
private final ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()
int getPoolSize() { pool.poolSize }
void execute(Runnable task) { pool.execute(task) }
ExecutorService getExecutorService() { pool }
}
It is essentially a delegate from the GPars Pool
interface
to the virtual threads executor service.
We’ll use this in the remaining examples.
Agents
Agents provide a thread-safe non-blocking wrapper around an otherwise potentially mutable shared state object. They are inspired by agents in Clojure.
In our case we’ll use an agent to "protect" a plain ArrayList
.
For this simple case, we could have used some synchronized list,
but in general, agents eliminate the need to find thread-safe
implementation classes or indeed care at all about the thread
safety of the underlying wrapped object.
def mutableState = [] // a non-synchronized mutable list
def agent = new Agent(mutableState)
agent.attachToThreadPool(new VirtualPool()) // omit line for normal threads
agent { it << 'Dave' } // one thread updates list
agent { it << 'Joe' } // another thread also updating
assert agent.val.size() == 2
Actors
Actors allow for a message passing-based concurrency model. The actor model ensures that at most one thread processes the actor’s body at any time. The GPars API and DSLs for actors are quite rich supporting many features. We’ll look at a simple example here.
GPars manages actor thread pools in groups. Let’s create one backed by virtual threads:
def vgroup = new DefaultPGroup(new VirtualPool())
Now we can write an encrypting and decrypting actor pair as follows:
def decryptor = vgroup.actor {
loop {
react { String message ->
reply message.reverse()
}
}
}
def console = vgroup.actor {
decryptor << 'lellarap si yvoorG'
react {
println 'Decrypted message: ' + it
}
}
console.join() // output: Decrypted message: Groovy is parallel
Dataflow
Dataflow offers an inherently safe and robust declarative
concurrency model. Dataflows are also managed via thread
groups, so we’ll use vgroup
which we created earlier.
We have three logical tasks which can run in parallel and perform their work. The tasks need to exchange data and they do so using dataflow variables. Think of dataflow variables as one-shot channels safely and reliably transferring data from producers to their consumers.
def df = new Dataflows()
vgroup.task {
df.z = df.x + df.y
}
vgroup.task {
df.x = 10
}
vgroup.task {
df.y = 5
}
assert df.z == 15
The dataflow framework works out how to schedule the individual tasks and ensures that a task’s input variables are ready when needed.
Conclusion
We have had a quick glimpse at using virtual threads with Groovy and GPars. It is very early days, so expect much more to emerge in this space once virtual threads are released in preview in production versions of JDK19 and eventually beyond a preview feature.