GPars meets Virtual Threads

Author: Paul King
Published: 2022-06-15 11:28AM (Last updated: 2023-04-14 06:23PM)


An exciting feature coming in JDK21 is Virtual Threads (JEP 444). It was previewed in JDK19 (JEP 425) and JDK20 (JEP 436), but JDK21 will be the first version in which the feature is available without using Java’s preview switch. The examples in this blog were tested with Groovy 4.0.11 using JDK version 21-ea+18-1480 (and using JDK19 with preview features enabled).

Virtual threads work well with my favourite Groovy parallel and concurrency library GPars. GPars has been around a while (since Java 5 and Groovy 1.8 days) but still has many useful features. Let’s have a look at a few examples.

If you want to try these out, use the latest JDK21 early-access (ea) build, which has virtual thread support as standard. Alternatively, use a recent JDK19 or JDK20 version and enable preview features with your Groovy tooling.

Parallel Collections

First, a refresher: we’ll look at using the GPars parallel collections feature with normal threads. Let’s start with a list of numbers:

var nums = [1, 2, 3]

To calculate the list of squares of our original numbers in parallel with normal threads, we use the GParsPool.withPool method as follows:

import static groovyx.gpars.GParsPool.withPool

withPool {
    assert nums.collectParallel{ it ** 2 } == [1, 4, 9]
}

For any Java readers, don’t be confused by the collectParallel method name. Groovy’s collect method (naming inspired by Smalltalk) is the equivalent of Java’s map method. So, the equivalent Groovy code using the Java streams API would be something like:

assert nums.parallelStream().map(n -> n ** 2).toList() == [1, 4, 9]

Now, let’s bring virtual threads into the picture. Luckily, GPars parallel collection facilities provide a hook for using an existing custom executor service (GParsExecutorsPool.withExistingPool). This makes using virtual threads for such code easy. First we create our pool (backed by virtual threads):

import java.util.concurrent.Executors
var vtPool = Executors.newVirtualThreadPerTaskExecutor()

Now, we can use it as follows:

import static groovyx.gpars.GParsExecutorsPool.withExistingPool

withExistingPool(vtPool) {
    assert nums.collectParallel{ it ** 2 } == [1, 4, 9]
}

Or we can use one of the many other '*Parallel' methods, in this case findAllParallel:

var isEven = n -> n % 2 == 0
withExistingPool(vtPool) {
    assert (1..9).findAllParallel(isEven) == (2..8).step(2)
}

Nice! Using virtual threads is very simple!
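
To check that work handed to such an executor really does run on a virtual thread, a small sketch like the following can help. It uses only JDK API, so it does not show anything specific to GPars; the variable names executor, future and ranOnVirtual are just illustrative.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Requires JDK 21+ (or JDK 19/20 with preview features enabled)
var executor = Executors.newVirtualThreadPerTaskExecutor()

// The cast selects submit(Callable), so the future carries the closure's result
var future = executor.submit({ Thread.currentThread().isVirtual() } as Callable<Boolean>)
var ranOnVirtual = future.get()   // blocks until the task has finished
executor.shutdown()

assert ranOnVirtual
```

Each submitted task gets a fresh virtual thread, so there is no pool size to tune here.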

Let’s look at one more example, FizzBuzz:

var result = withExistingPool(vtPool) {
    (1..15).collectParallel {
        switch(it) {
            case { it % 15 == 0 } -> 'FizzBuzz'
            case { it % 5 == 0 } -> 'Buzz'
            case { it % 3 == 0 } -> 'Fizz'
            default -> it
        }
    }.join(',')
}
assert result == '1,2,Fizz,4,Buzz,Fizz,7,8,Fizz,Buzz,11,Fizz,13,14,FizzBuzz'

Now, let’s move on to some examples which might be less familiar to Java developers.

GPars has additional features for providing custom thread pools and the remaining examples rely on those features. The current version of GPars doesn’t have a DefaultPool constructor that takes a vanilla executor service; so, we’ll write our own class:

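import groovy.transform.AutoImplement
import groovyx.gpars.scheduler.Pool
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// @AutoImplement supplies default bodies for the Pool methods not defined here
@AutoImplement
class VirtualPool implements Pool {
    private final ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()
    int getPoolSize() { pool.poolSize }
    void execute(Runnable task) { pool.execute(task) }
    ExecutorService getExecutorService() { pool }
}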

It is essentially a delegate from the GPars Pool interface to the virtual threads executor service.

We’ll use this in the remaining examples.

Agents

Agents provide a thread-safe non-blocking wrapper around an otherwise potentially mutable shared state object. They are inspired by agents in Clojure.

In our case we’ll use an agent to "protect" a plain ArrayList. For this simple case, we could have used some synchronized list, but in general, agents eliminate the need to find thread-safe implementation classes or indeed care at all about the thread safety of the underlying wrapped object.

import groovyx.gpars.agent.Agent

var mutableState = []     // a non-synchronized mutable list
var agent = new Agent(mutableState)

agent.attachToThreadPool(new VirtualPool()) // omit line for normal threads

agent { it << 'Dave' }    // one thread updates list
agent { it << 'Joe' }     // another thread also updating
assert agent.val.size() == 2
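
As a further illustration of the point about thread safety, here is a minimal sketch in which many virtual threads send updates to a single agent. It is an assumption-laden variation rather than part of the example above: the agent wraps an immutable Integer instead of a list, it stays on the default GPars thread pool (only the senders are virtual threads), and the names counter and threads are made up for this sketch. The @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.agent.Agent

// Requires JDK 21+ (or JDK 19/20 with preview features enabled)
var counter = new Agent<Integer>(0)

// 100 virtual threads each send one update message to the agent
var threads = (1..100).collect {
    Thread.startVirtualThread {
        // updateValue is resolved against the agent, which replaces its state
        counter << { Integer current -> updateValue(current + 1) }
    }
}
threads*.join()   // every message has been sent once the senders finish

// val is answered only after the previously queued messages are processed
assert counter.val == 100
```

No locks or atomic classes are needed in the calling code, since the agent applies the queued updates one at a time.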

Actors

Actors allow for a message passing-based concurrency model. The actor model ensures that at most one thread processes the actor’s body at any time. The GPars API and DSLs for actors are quite rich, supporting many features. We’ll look at a simple example here.

GPars manages actor thread pools in groups. Let’s create one backed by virtual threads:

import groovyx.gpars.group.DefaultPGroup
var vgroup = new DefaultPGroup(new VirtualPool())

Now we can write a decrypting actor, and a console actor which sends it an encrypted message and prints the reply, as follows:

var decryptor = vgroup.actor {
    loop {
        react { String message ->
            reply message.reverse()
        }
    }
}

var console = vgroup.actor {
    decryptor << 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}

console.join() // output: Decrypted message: Groovy is parallel

Dataflow

Dataflow offers an inherently safe and robust declarative concurrency model. Dataflows are also managed via thread groups, so we’ll use vgroup which we created earlier.

For the sake of an example, we’ll create a scenario where two tasks are producing some results and a third task is adding the results of the other tasks.

[Figure: GPars dataflow]

We have three logical tasks which can run in parallel and perform their work. The tasks need to exchange data and they do so using dataflow variables. Think of dataflow variables as one-shot channels safely and reliably transferring data from producers to their consumers.

import groovyx.gpars.dataflow.Dataflows
var df = new Dataflows()

vgroup.with {
    task {
        df.z = df.x + df.y
    }

    task {
        df.x = 10
    }

    task {
        df.y = 5
    }

    assert df.z == 15
}

This code is declarative in style. We can specify the three tasks in any order. We aren’t giving any indication of which tasks should occur first. The dataflow framework works out how to schedule the individual tasks and ensures that a task’s input variables are ready when needed.
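
To make the ordering point concrete, the same calculation can be sketched with explicit dataflow variables and the two producer tasks declared in the opposite order. This sketch makes some assumptions of its own: it uses the static Dataflow.task method, which runs on the default GPars thread pool rather than the virtual-thread group from earlier, and the @Grab coordinates assume GPars 1.2.1.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

var x = new DataflowVariable<Integer>()
var y = new DataflowVariable<Integer>()
var z = new DataflowVariable<Integer>()

// The consumer is declared first; reading val blocks until a value is bound
task { z << x.val + y.val }

// The producers are declared in the opposite order to the earlier example
task { y << 5 }
task { x << 10 }

assert z.val == 15
```

Reading val is what makes the consumer wait for its inputs, so the order in which the tasks are declared does not affect the result.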

Conclusion

We have had a quick glimpse at using virtual threads with Groovy and GPars. It is still early days with virtual threads, so expect much more to emerge as JDK21 becomes more mainstream.