Using Gatherers with Groovy
Author: Paul King
Published: 2023-12-09 03:30PM (Last updated: 2024-01-18 10:00PM)
An interesting feature being previewed in JDK22 is Gatherers (JEP 461). This blog looks at using that feature with Groovy. The examples in this blog were tested with Groovy 4.0.16 using JDK version 22-ea+27-2262. As the JDK version we used is still in early access status, you should read the disclaimers to understand that this JDK feature is subject to change before final release. If and when the feature becomes final, it looks like Groovy will automatically support it without needing any additional tweaks.
Understanding Gatherers
Java developers are by now very familiar with streams.
A stream is a potentially unbounded sequence of values supporting lazy computation.
Processing streams is done via a stream pipeline which consists of three parts:
a source of elements, zero or more intermediate operations (like filter and map), and a terminal operation.
This framework is very powerful and efficient and offers some extensibility via a customisable terminal operation. The set of available intermediate operations, however, is fixed, and while the built-in ones are very useful, some complex tasks cannot easily be expressed as stream pipelines. Enter gatherers. Gatherers provide the ability to customize intermediate operations.
With gatherers, the stream API is updated to support a gather intermediate operation which takes a gatherer and returns a transformed stream.
Let’s dive into a few more details of gatherers.
A gatherer is defined by four pieces of functionality:
- The optional initializer is just a Supplier which returns some (initial) state.
- The integrator is typically the most important part. It satisfies the following interface:
  interface Integrator<A, T, R> { boolean integrate(A state, T element, Downstream<? super R> downstream); }
  where state is some state (we’ll use a list as state in a few of the upcoming examples, but it could just as easily be an instance of some other class or record), element is the next element in the current stream to be processed, and downstream is a hook for creating the elements that will be processed in the next stage of the stream pipeline.
- The optional finisher has access to the state and downstream pipeline hook. It performs any last step actions which might be needed.
- The optional combiner is used to evaluate the gatherer in parallel when processing an input stream in parallel. The examples we’ll look at in this blog post are inherently ordered in nature and thus cannot be parallelized, so we won’t discuss this aspect further here.
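To make the roles of the initializer, integrator and finisher concrete, here is a minimal sketch of a custom gatherer. The runs helper is hypothetical (it is not part of the JDK or Groovy); it groups consecutive equal elements into lists. It assumes a JDK where the Gatherer API is available (with preview features enabled if the API is still in preview).

```groovy
import java.util.stream.Gatherer

// Hypothetical helper for illustration: groups consecutive equal elements.
def runs() {
    Gatherer.ofSequential(
        () -> [],                                                  // initializer: an empty list holds the current run
        Gatherer.Integrator.ofGreedy { run, element, downstream -> // integrator: called once per element
            if (run && run.last() != element) {
                downstream.push(List.copyOf(run))                  // the run has ended, emit it downstream
                run.clear()
            }
            run << element
            true                                                   // keep consuming elements
        },
        (run, downstream) -> {                                     // finisher: emit whatever is still buffered
            if (run) downstream.push(List.copyOf(run))
        }
    )
}

assert [1, 1, 2, 3, 3, 3].stream().gather(runs()).toList() == [[1, 1], [2], [3, 3, 3]]
```

The state is only ever touched by the integrator and the finisher, which is why a plain mutable list is enough for a sequential gatherer like this one.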
Over and above the Gatherer API, there are a number of built-in gatherers like windowFixed, windowSliding, and fold, among others.
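The snippets in this post call the built-in gatherers by their simple names and do not show their imports. A plausible set of imports (an assumption, since the post does not list them) together with a small smoke test might look like this:

```groovy
// Assumed imports for the snippets in this post (not shown in the original).
import java.util.stream.Gatherer              // needed when writing custom gatherers
import static java.util.stream.Gatherers.*    // windowFixed, windowSliding, fold, scan, ...

// Built-in gatherers plug into a pipeline via the gather intermediate operation.
assert (1..4).stream().gather(windowFixed(2)).toList() == [[1, 2], [3, 4]]
```

While the feature is in preview, the JVM also needs preview features enabled for calls to gather to work.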
Before getting into functionality where gatherers will become essential, let’s start off by looking at accessing collections where functionality is well provided for in both the collection and stream APIs and related extension methods.
Accessing parts of a collection
Groovy provides very flexible indexing variants to select specific elements from a collection:
assert (1..8)[0..2] == [1, 2, 3] // index by closed range
assert (1..8)[3<..<6] == [5, 6] // index by open range
assert (1..8)[0..2,3..4,5] == [1, 2, 3, 4, 5, 6] // index by multiple ranges
assert (1..8)[0..2,3..-1] == 1..8 // ditto
assert (1..8)[0,2,4,6] == [1,3,5,7] // select odd numbers
assert (1..8)[1,3,5,7] == [2,4,6,8] // select even numbers
You can also pick out a window of elements using take and drop:
assert (1..8).take(3) == [1, 2, 3] // same as [0..2]
assert (1..8).drop(2).take(3) == [3, 4, 5] // same as [2..4]
Stream users might do the same thing using skip and limit:
assert (1..8).stream().limit(3).toList() == [1, 2, 3]
assert (1..8).stream().skip(2).limit(3).toList() == [3, 4, 5]
We can see here there are stream equivalents for drop and take, but what about some of Groovy’s more elaborate mechanisms for manipulating collections? I’m glad you asked. Let’s look at stream equivalents for collate and chop.
Collate
Groovy’s collate method splits a collection into fixed size chunks:
assert (1..8).collate(3) == [[1, 2, 3], [4, 5, 6], [7, 8]]
The last chunk in this example is smaller than the chunk size. It contains the remaining elements left over after all full size chunks have been created. If we don’t want the leftover chunk, we can ask for it to be excluded using an optional boolean parameter:
assert (1..8).collate(3, false) == [[1, 2, 3], [4, 5, 6]]
Such functionality isn’t really possible with streams unless you wanted to process the stream multiple times, or you shoved all the logic in the collector, but then you’d be giving up some of the key benefits of streams. Luckily, with gatherers, we can now obtain this functionality.
The first case is so common, there is a built-in gatherer (Gatherers#windowFixed) for it:
assert (1..8).stream().gather(windowFixed(3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]
There is no exact equivalent to handle the less common case of discarding the leftover elements, but it’s easy enough to write our own gatherer:
<T> Gatherer<T, ?, List<T>> windowFixedTruncating(int windowSize) {
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        }
    )
}
We have an initializer which just returns an empty list as our initial state. The integrator keeps adding elements to the state (our list or window). Once the list is filled to the window size, we’ll output it to the downstream, and then clear the list ready for the next window.
The code here is essentially a simplified version of windowFixed. We can just leave out the finisher that windowFixed requires to output a partially-filled window at the end.
A few details:
- Our operation is sequential since it is inherently ordered, hence we used ofSequential to mark it so.
- We will also always process all elements, so we create a greedy gatherer using ofGreedy. While not strictly necessary, this allows for optimisation of the pipeline.
- We have specifically left some validation logic out of this example to focus on the new gatherer functionality. Check out how windowFixed throws IllegalArgumentException for window sizes less than 1 to see what should really also be added here if you were using this in production.
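As a rough sketch of the validation mentioned in the last point, the gatherer could reject bad window sizes before it is built. The method name windowFixedTruncatingChecked and the exception message are illustrative assumptions, not taken from the JDK source.

```groovy
import java.util.stream.Gatherer

// Same logic as windowFixedTruncating, plus an up-front argument check.
// The name and the message text are assumptions made for this sketch.
<T> Gatherer<T, ?, List<T>> windowFixedTruncatingChecked(int windowSize) {
    if (windowSize < 1) {
        throw new IllegalArgumentException('windowSize must be at least 1 but was ' + windowSize)
    }
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        }
    )
}
```

Failing at construction time means the error surfaces where the pipeline is assembled rather than part way through processing the stream.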
We’d use windowFixedTruncating like this:
assert (1..8).stream().gather(windowFixedTruncating(3)).toList() ==
    [[1, 2, 3], [4, 5, 6]]
The default when using collate is to start the next chunk/window at the element directly after the previous one, but there are overloads which also take a step size. This is used to calculate the index at which the second (and subsequent) window(s) will start. There is an optional keepRemaining boolean to handle the leftover case as well.
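To illustrate how the step size determines where each window starts, here is a small sketch in plain Groovy that computes the start indices by hand and compares the result with collate. The variable names are just for illustration.

```groovy
def items = (1..8).toList()
int size = 3
int step = 2

// Each window starts 'step' positions after the previous one.
def starts = (0..<items.size()).step(step)
assert starts == [0, 2, 4, 6]

// A window runs from its start index for up to 'size' elements.
def windows = starts.collect { i -> items.subList(i, Math.min(i + size, items.size())) }
assert windows == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]

// For this input, that matches what collate produces with a step size.
assert windows == items.collate(size, step)
```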
If we want to slide along by 1 and discard leftovers, we’d use:
assert (1..5).collate(3, 1, false) == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
This aligns with the built-in windowSliding gatherer:
assert (1..5).stream().gather(windowSliding(3)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
If we want the step size to be other than 1, or we want control over the leftovers, there is no built-in gatherer option, but we can again write one ourselves. Let’s consider some examples. We’ll look at a gatherer implementation shortly, but first Groovy’s collection variants:
assert (1..5).collate(3, 1) == [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).collate(3, 2) == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).collate(3, 2, false) == [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).collate(3, 4, false) == [[1, 2, 3], [5, 6, 7]]
assert (1..8).collate(3, 3) == [[1, 2, 3], [4, 5, 6], [7, 8]] // same as collate(3)
Now let’s write our gatherer:
<T> Gatherer<T, ?, List<T>> windowSlidingByStep(int windowSize, int stepSize, boolean keepRemaining = true) {
    int skip = 0
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            if (skip) {
                skip--
                return true
            }
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            skip = stepSize > windowSize ? stepSize - windowSize : 0
            [stepSize, windowSize].min().times { window.removeFirst() }
            downstream.push(result)
        },
        (window, downstream) -> {                                      // finisher
            if (keepRemaining) {
                while (window.size() > stepSize) {
                    downstream.push(List.copyOf(window))
                    stepSize.times { window.removeFirst() }
                }
                // avoid emitting an empty trailing window when nothing is left over
                if (window) downstream.push(List.copyOf(window))
            }
        }
    )
}
Some points:
- Our gatherer is still sequential for the same reasons as previously. We are still processing every element, so we again created a greedy gatherer.
- We have a little bit of optimization baked into the code. If our step size is bigger than the window size, we can do no further processing in our gatherer for the elements in between our windows. We could simplify the code and store those elements only to throw them away later, but it’s not too much effort to make the algorithm as efficient as possible.
- We also need a finisher here which handles the leftover chunk(s) when required.
- As per the previous example, we chose to elide some argument validation logic.
And we’d use it like this:
assert (1..5).stream().gather(windowSlidingByStep(3, 1)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2, false)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 4, false)).toList() ==
    [[1, 2, 3], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]
Before leaving this section, let’s look at a few examples using Groovy’s language integrated query capabilities as an alternative way to manipulate collections.
Firstly, the equivalent of what we saw with take / limit:
assert GQL {
    from n in 1..8
    limit 3
    select n
} == [1, 2, 3]
Then, the equivalent if we added in drop / skip:
assert GQL {
    from n in 1..8
    limit 2, 3
    select n
} == [3, 4, 5]
Finally, a sliding window equivalent:
assert GQL {
    from ns in (
        from n in 1..8
        select n, (lead(n) over(orderby n)), (lead(n, 2) over(orderby n))
    )
    limit 3
    select ns
}*.toList() == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
Chop
A related collection extension method in Groovy is chop. For this method, we also create chunks from the original collection but rather than specifying a fixed size that applies to all chunks, we specify the size we want for each chunk. We give a list of sizes, and each size is only used once. The special size of -1 indicates that we want the remainder of the collection as the last chunk.
assert (1..8).chop(3) == [[1, 2, 3]]
assert (1..8).chop(3, 2, 1) == [[1, 2, 3], [4, 5], [6]]
assert (1..8).chop(3, -1) == [[1, 2, 3], [4, 5, 6, 7, 8]]
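To make the semantics concrete, here is a rough sketch of how the same results could be produced with take and drop. The chopSketch helper is hypothetical and is not how Groovy implements chop; it has only been checked against the three examples above.

```groovy
// Hypothetical helper for illustration only, not Groovy's implementation of chop.
List<List> chopSketch(List items, int... sizes) {
    var rest = items
    sizes.toList().collect { size ->
        // -1 means "take everything that is left"
        var chunk = size == -1 ? rest : rest.take(size)
        rest = rest.drop(chunk.size())
        chunk
    }
}

assert chopSketch((1..8).toList(), 3) == [[1, 2, 3]]
assert chopSketch((1..8).toList(), 3, 2, 1) == [[1, 2, 3], [4, 5], [6]]
assert chopSketch((1..8).toList(), 3, -1) == [[1, 2, 3], [4, 5, 6, 7, 8]]
```

Each size consumes a prefix of whatever is left, which is the same bookkeeping a gatherer has to do one element at a time.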
There is no existing stream operation or built-in gatherer for this functionality. We’ll write our own:
<T> Gatherer<T, ?, List<T>> windowMultiple(int... steps) {
    var remaining = steps.toList()
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.of { window, element, downstream ->
            if (!remaining) {
                return false
            }
            window << element
            if (remaining[0] != -1) remaining[0]--
            if (remaining[0]) return true
            remaining.removeFirst()
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        },
        (window, downstream) -> {
            if (window) {
                var result = List.copyOf(window)
                downstream.push(result)
            }
        }
    )
}
Some points:
- This is also an ordered algorithm, so we use ofSequential again.
- This is similar to what we used for collate, but we have a different sized window for each chunk size as we process the elements.
- Once we hit the last chunk, we don’t want to process further elements unless we see the special -1 marker, so we won’t create a greedy gatherer.
- We do need a finisher to potentially output elements that have been stored but not yet pushed downstream.
We’d use windowMultiple like this:
assert (1..8).stream().gather(windowMultiple(3)).toList() ==
    [[1, 2, 3]]
assert (1..8).stream().gather(windowMultiple(3, 2, 1)).toList() ==
    [[1, 2, 3], [4, 5], [6]]
assert (1..8).stream().gather(windowMultiple(3, -1)).toList() ==
    [[1, 2, 3], [4, 5, 6, 7, 8]]
Inject, fold and scan
Groovy’s inject is a little different to the stream API’s reduce terminal operation. In its most commonly used forms, the latter expects a binary operator which restricts the types of the elements being consumed and produced. The inject method can have different types for its arguments as shown here:
assert (1..5).inject(''){ string, number -> string + number } == '12345'
The fold built-in gatherer allows us to write the equivalent functionality for stream processing as shown here:
assert (1..5).stream()
    .gather(fold(() -> '', (string, number) -> string + number))
    .findFirst()
    .get() == '12345'
Let’s look at another inject example. This time cumulative sum. If we have a sequence of numbers, the cumulative sum is another sequence whose value at any index is determined by accumulating all the numbers from the original sequence up to and including the index in question, e.g. the cumulative sum of [1, 2, 3, 4] is [1, 3, 6, 10]. This is again a good fit for Groovy’s inject:
assert (1..5).inject([]) { acc, next ->
    acc + [acc ? acc.last() + next : next]
} == [1, 3, 6, 10, 15]
Groovy has a number of alternatives to achieve this functionality. Here is one using inits:
assert (1..5).inits().grep().reverse()*.sum() == [1, 3, 6, 10, 15]
inits is a list processing function which we cover in more detail in the next section.
Before examining gatherer equivalents, we should note that this particular operation is deemed useful enough that Java actually has a built-in library function for it when working with arrays:
Integer[] nums = 1..5
Arrays.parallelPrefix(nums, Integer::sum)
assert nums == [1, 3, 6, 10, 15]
Cumulative sum isn’t well suited to traditional streams, but now with gatherers, we can use the scan built-in gatherer to have similar functionality when processing streams:
assert (1..5).stream()
    .gather(scan(() -> 0, Integer::sum))
    .toList() == [1, 3, 6, 10, 15]
Testing for a subsequence (fun with inits and tails)
As a final example, let’s have a look at how we might test if one list occurs as a contiguous subsequence of another.
We’ll start with a list of words, and a list containing the ordered search terms:
var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()
It turns out that this is solved already in the JDK for collections:
assert Collections.indexOfSubList(words, search) != -1
Let’s have a look at some possible stream implementations.
But first a diversion. For any functional programmers who might have dabbled with Haskell, you may have seen the book Learn You a Haskell for Great Good!. It sets an interesting exercise for finding a "Needle in the Haystack" using inits and tails. So what are inits and tails? They are built-in functions in Haskell and Groovy:
assert (1..6).inits() == [[1, 2, 3, 4, 5, 6],
                          [1, 2, 3, 4, 5],
                          [1, 2, 3, 4],
                          [1, 2, 3],
                          [1, 2],
                          [1],
                          []]
assert (1..6).tails() == [[1, 2, 3, 4, 5, 6],
                          [2, 3, 4, 5, 6],
                          [3, 4, 5, 6],
                          [4, 5, 6],
                          [5, 6],
                          [6],
                          []]
Once we know about these methods, we can paraphrase the "Needle in the Haystack" solution for collections in Groovy as follows:
var found = words.tails().any{ subseq -> subseq.inits().contains(search) }
assert found
It may not be the most efficient implementation of this functionality, but it has a nice symmetry. Let’s now explore some stream-based solutions.
We can start off with a tails gatherer:
<T> Gatherer<T, ?, List<T>> tails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails().each(downstream::push)
        }
    )
}
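In the integrator, we just store away all the elements, and in the finisher we do all the work. This works but doesn’t really take advantage of the element-at-a-time nature of the stream pipeline, since nothing is pushed downstream until the whole input has been consumed.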
We can check it works as follows:
assert search.stream().gather(tails()).toList() ==
    [['brown', 'fox'], ['fox'], []]
We could continue with this approach to create an initsOfTails gatherer:
<T> Gatherer<T, ?, List<T>> initsOfTails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails()*.inits().sum().each(downstream::push)
        }
    )
}
Again, all the work is in the finisher, and we haven’t really made use of the power of the stream pipeline.
It still works of course:
assert words.stream().gather(initsOfTails()).anyMatch { it == search }
But it might have been more efficient to have collected the stream as a list and used Groovy’s built-in inits and tails on that.
But all is not lost. If we are willing to tweak the algorithm slightly, we could make better use of the stream pipeline. For example, if we don’t mind getting the inits results in the reverse order, we could define the following gatherer for inits:
<T> Gatherer<T, ?, List<T>> inits() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            downstream.push(List.copyOf(state))
            state << element
            return true
        },
        (state, downstream) -> {
            downstream.push(state)
        }
    )
}
Which we’d use like this:
assert search.stream().gather(inits()).toList() ==
    [[], ['brown'], ['brown', 'fox']]
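Another stream-based option, offered here as a sketch rather than as part of the exercise above, is to reuse the built-in windowSliding gatherer: slide a window the same length as the search list across the words and stop at the first match. It assumes the search list is non-empty, since windowSliding rejects window sizes less than 1.

```groovy
import java.util.stream.Gatherers

var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

// Each window is a list of search.size() consecutive words; anyMatch
// short-circuits, so later windows are never built once a match is found.
boolean found = words.stream()
    .gather(Gatherers.windowSliding(search.size()))
    .anyMatch { it == search }
assert found
```

Unlike the finisher-heavy gatherers above, this version emits windows as the elements arrive, so it benefits from the lazy nature of the pipeline.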
Conclusion
It is great that Groovy has a rich set of methods that work with collections. Some of these methods have stream equivalents, and now we see that using gatherers with Groovy, we can emulate more of the methods. Not all algorithms need or benefit from using streams, but it’s great to know that with gatherers, we can likely pick whichever style makes sense.
We are still in the early days of gatherers being available, so expect much more to emerge as this feature becomes more mainstream. We look forward to it advancing past preview status.