Using Gatherers with Groovy

Author: Paul King
Published: 2023-12-09 03:30PM (Last updated: 2024-01-18 10:00PM)


An interesting feature being previewed in JDK22 is Gatherers (JEP 461). This blog looks at using that feature with Groovy. The examples in this blog were tested with Groovy 4.0.16 using JDK version 22-ea+27-2262. As the JDK version we used is still in early access status, you should read the disclaimers to understand that this JDK feature is subject to change before final release. If and when the feature becomes final, it looks like Groovy will automatically support it without needing any additional tweaks.

Understanding Gatherers

Java developers are by now very familiar with streams. A stream is a potentially unbounded sequence of values supporting lazy computation. Processing streams is done via a stream pipeline which consists of three parts: a source of elements, zero or more intermediate operations (like filter and map), and a terminal operation.
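As a quick illustration of those three parts, here is a minimal sketch using only the classic stream API (the numbers are just sample data):

```groovy
// source: the range 1..10, viewed as a stream
// intermediate operations: filter and map (evaluated lazily)
// terminal operation: toList, which triggers the processing
var squaresOfEvens = (1..10).stream()
                            .filter(n -> n % 2 == 0)
                            .map(n -> n * n)
                            .toList()

assert squaresOfEvens == [4, 16, 36, 64, 100]
```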

This framework is very powerful and efficient and offers some extensibility via a customizable terminal operation. The set of available intermediate operations is fixed, however, and while the built-in ones are very useful, some complex tasks cannot easily be expressed as stream pipelines. Enter gatherers. Gatherers provide the ability to define custom intermediate operations.

With gatherers, the stream API is updated to support a gather intermediate operation which takes a gatherer and returns a transformed stream. Let’s dive into a few more details of gatherers.

A gatherer is defined by four pieces of functionality:

  • The optional initializer is just a Supplier which returns some (initial) state.

  • The integrator is typically the most important part. It satisfies the following interface:

    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Downstream<? super R> downstream);
    }

    where state is some state (we’ll use a list as state in a few of the upcoming examples, but it could just as easily be an instance of some other class or record), element is the next element in the current stream to be processed, and downstream is a hook for creating the elements that will be processed in the next stage of the stream pipeline. The boolean result signals whether the integrator wants to receive further elements.

  • The optional finisher has access to the state and downstream pipeline hook. It performs any last step actions which might be needed.

  • The optional combiner is used to evaluate the gatherer in parallel when processing an input stream in parallel. The examples we’ll look at in this blog post are inherently ordered in nature and thus cannot be parallelized, so we won’t discuss this aspect further here.
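To make these pieces concrete before we move on, here is a small sketch of a gatherer with an initializer, an integrator and a finisher. The runLengths name and its behaviour are made up purely for illustration: it collapses each run of equal elements into an [element, count] pair. It assumes the same JDK setup as the other gatherer examples in this post.

```groovy
import java.util.stream.Gatherer

// Hypothetical gatherer: collapses runs of equal elements into [element, count] pairs
<T> Gatherer<T, ?, List> runLengths() {
    Gatherer.ofSequential(
        () -> [],                                                   // initializer: state is the current run
        Gatherer.Integrator.ofGreedy { run, element, downstream ->  // integrator
            if (!run || run[0] == element) {                        // first element, or the run continues
                run << element
                return true
            }
            var result = [run[0], run.size()]                       // the run that just ended
            run.clear()
            run << element                                          // start the next run
            downstream.push(result)
        },
        (run, downstream) -> {                                      // finisher: emit the last run, if any
            if (run) downstream.push([run[0], run.size()])
        }
    )
}

assert ['a', 'a', 'b', 'c', 'c', 'c'].stream().gather(runLengths()).toList() ==
    [['a', 2], ['b', 1], ['c', 3]]
```

The finisher matters here because the final run is still sitting in the state when the input ends.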

Over and above the Gatherer API, there are a number of built-in gatherers such as windowFixed, windowSliding, and fold, among others.

Before getting into functionality where gatherers will become essential, let’s start off by looking at accessing collections where functionality is well provided for in both the collection and stream APIs and related extension methods.

Accessing parts of a collection

Groovy provides very flexible indexing variants to select specific elements from a collection:

assert (1..8)[0..2] == [1, 2, 3]                   // index by closed range
assert (1..8)[3<..<6] == [5, 6]                    // index by open range
assert (1..8)[0..2,3..4,5] == [1, 2, 3, 4, 5, 6]   // index by multiple ranges
assert (1..8)[0..2,3..-1] == 1..8                  // ditto
assert (1..8)[0,2,4,6] == [1,3,5,7]                // select odd numbers
assert (1..8)[1,3,5,7] == [2,4,6,8]                // select even numbers

You can also pick out a window of elements using take and drop:

assert (1..8).take(3) == [1, 2, 3]                 // same as [0..2]
assert (1..8).drop(2).take(3) == [3, 4, 5]         // same as [2..4]

Stream users might do the same thing using skip and limit:

assert (1..8).stream().limit(3).toList() == [1, 2, 3]
assert (1..8).stream().skip(2).limit(3).toList() == [3, 4, 5]

We can see here there are stream equivalents for drop and take, but what about some of Groovy’s more elaborate mechanisms for manipulating collections? I’m glad you asked. Let’s look at stream equivalents for collate and chop.

Collate

Groovy’s collate method splits a collection into fixed size chunks:

assert (1..8).collate(3) == [[1, 2, 3], [4, 5, 6], [7, 8]]

The last chunk in this example is smaller than the chunk size. It contains the remaining elements left over after all full size chunks have been created. If we don’t want the leftover chunk, we can ask for it to be excluded using an optional boolean parameter:

assert (1..8).collate(3, false) == [[1, 2, 3], [4, 5, 6]]

Such functionality isn’t really possible with plain streams unless you process the stream multiple times or push all of the logic into the collector, and either approach gives up some of the key benefits of streams. Luckily, with gatherers, we can now obtain this functionality.

The first case is so common, there is a built-in gatherer (Gatherers#windowFixed) for it:

assert (1..8).stream().gather(windowFixed(3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

There is no exact equivalent to handle the less common case of discarding the leftover elements, but it’s easy enough to write our own gatherer:

<T> Gatherer<T, ?, List<T>> windowFixedTruncating(int windowSize) {
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        }
    )
}

We have an initializer which just returns an empty list as our initial state. The integrator keeps adding elements to the state (our list or window). Once the list is filled to the window size, we’ll output it to the downstream, and then clear the list ready for the next window.

The code here is essentially a simplified version of windowFixed. We can just leave out the finisher that windowFixed needs in order to output a partially-filled window at the end.

A few details:

  • Our operation is sequential since it is inherently ordered, hence we used ofSequential to mark it so.

  • We will also always process all elements, so we create a greedy gatherer using ofGreedy. While not strictly necessary, this allows for optimisation of the pipeline.

  • We have deliberately left some validation logic out of this example to focus on the new gatherer functionality. Check out how windowFixed throws IllegalArgumentException for window sizes less than 1 to see what should also be added here if you were using this in production.
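For the curious, a sketch of what that validation might look like is shown below. The windowFixedTruncatingChecked name and the exception message are our own inventions for illustration. The rest of the body simply repeats the gatherer from above so that the snippet can be run on its own.

```groovy
import java.util.stream.Gatherer

// Hypothetical variant: same gatherer as before, plus an up-front argument check
<T> Gatherer<T, ?, List<T>> windowFixedTruncatingChecked(int windowSize) {
    if (windowSize < 1) {
        throw new IllegalArgumentException('windowSize must be at least 1 but was ' + windowSize)
    }
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        }
    )
}

assert (1..8).stream().gather(windowFixedTruncatingChecked(3)).toList() ==
    [[1, 2, 3], [4, 5, 6]]

try {
    windowFixedTruncatingChecked(0)
    assert false : 'expected an IllegalArgumentException'
} catch (IllegalArgumentException expected) {
    assert expected.message.contains('windowSize')
}
```

Failing fast when the gatherer is created, rather than when the stream runs, keeps the error close to the call that caused it.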

We’d use windowFixedTruncating like this:

assert (1..8).stream().gather(windowFixedTruncating(3)).toList() ==
    [[1, 2, 3], [4, 5, 6]]

The default when using collate is to start the next chunk/window at the element directly after the previous one, but there are overloads which also take a step size. This is used to calculate the index at which the second (and subsequent) window(s) will start. There is an optional keepRemaining boolean to handle the leftover case as well. If we want to slide along by 1 and discard leftovers, we’d use:

assert (1..5).collate(3, 1, false) == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

This aligns with the built-in windowSliding gatherer:

assert (1..5).stream().gather(windowSliding(3)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

If we want the step size to be other than 1, or we want control over the leftovers, there is no built-in gatherer option, but we can again write one ourselves. Let’s consider some examples. We’ll look at a gatherer implementation shortly, but first Groovy’s collection variants:

assert (1..5).collate(3, 1) == [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).collate(3, 2) == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).collate(3, 2, false) == [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).collate(3, 4, false) == [[1, 2, 3], [5, 6, 7]]
assert (1..8).collate(3, 3) == [[1, 2, 3], [4, 5, 6], [7, 8]]  // same as collate(3)

Now let’s write our gatherer:

<T> Gatherer<T, ?, List<T>> windowSlidingByStep(int windowSize, int stepSize, boolean keepRemaining = true) {
    int skip = 0
    Gatherer.ofSequential(
        () -> [],                                                      // initializer
        Gatherer.Integrator.ofGreedy { window, element, downstream ->  // integrator
            if (skip) {
                skip--
                return true
            }
            window << element
            if (window.size() < windowSize) return true
            var result = List.copyOf(window)
            skip = stepSize > windowSize ? stepSize - windowSize : 0
            [stepSize, windowSize].min().times { window.removeFirst() }
            downstream.push(result)
        },
        (window, downstream) -> {                                      // finisher
            if (keepRemaining) {
                while(window.size() > stepSize) {
                    downstream.push(List.copyOf(window))
                    stepSize.times{ window.removeFirst() }
                }
                if (window) downstream.push(List.copyOf(window))  // skip an empty leftover, as collate does
            }
        }
    )
}

Some points:

  • Our gatherer is still sequential for the same reasons as previously. We are still processing every element, so we again created a greedy gatherer.

  • We have a little bit of optimization baked into the code. If our step size is bigger than the window size, we can do no further processing in our gatherer for the elements in between our windows. We could simplify the code and store those elements only to throw them away later, but it’s not too much effort to make the algorithm as efficient as possible.

  • We also need a finisher here which handles the leftover chunk(s) when required.

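  • As per the previous example, we chose to elide some argument validation logic.

  • The skip counter is a local variable captured by the integrator rather than part of the gatherer’s state, so each gatherer returned by windowSlidingByStep should be used with a single stream only.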

And we’d use it like this:

assert (1..5).stream().gather(windowSlidingByStep(3, 1)).toList() ==
    [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7, 8]]
assert (1..8).stream().gather(windowSlidingByStep(3, 2, false)).toList() ==
    [[1, 2, 3], [3, 4, 5], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 4, false)).toList() ==
    [[1, 2, 3], [5, 6, 7]]
assert (1..8).stream().gather(windowSlidingByStep(3, 3)).toList() ==
    [[1, 2, 3], [4, 5, 6], [7, 8]]

Before leaving this section, let’s look at a few examples using Groovy’s language integrated query capabilities as an alternative way to manipulate collections.

Firstly, the equivalent of what we saw with take / limit:

assert GQL {
    from n in 1..8
    limit 3
    select n
} == [1, 2, 3]

Then, the equivalent if we added in drop / skip:

assert GQL {
    from n in 1..8
    limit 2, 3
    select n
} == [3, 4, 5]

Finally, a sliding window equivalent:

assert GQL {
    from ns in (
        from n in 1..8
        select n, (lead(n) over(orderby n)), (lead(n, 2) over(orderby n))
    )
    limit 3
    select ns
}*.toList() == [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

Chop

A related collection extension method in Groovy is chop. For this method, we also create chunks from the original collection but rather than specifying a fixed size that applies to all chunks, we specify the size we want for each chunk. We give a list of sizes, and each size is only used once. The special size of -1 indicates that we want the remainder of the collection as the last chunk.

assert (1..8).chop(3) == [[1, 2, 3]]
assert (1..8).chop(3, 2, 1) == [[1, 2, 3], [4, 5], [6]]
assert (1..8).chop(3, -1) == [[1, 2, 3], [4, 5, 6, 7, 8]]

There is no built-in stream operation or pre-built gatherer for this functionality. We’ll write our own:

<T> Gatherer<T, ?, List<T>> windowMultiple(int... steps) {
    var remaining = steps.toList()
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.of { window, element, downstream ->
            if (!remaining) {
                return false
            }
            window << element
            if (remaining[0] != -1) remaining[0]--
            if (remaining[0]) return true
            remaining.removeFirst()
            var result = List.copyOf(window)
            window.clear()
            downstream.push(result)
        },
        (window, downstream) -> {
            if (window) {
                var result = List.copyOf(window)
                downstream.push(result)
            }
        }
    )
}

Some points:

  • This is also an ordered algorithm, so we use ofSequential again.

  • This is similar to what we used for collate, but the window size changes from chunk to chunk as we process the elements.

  • Once we hit the last chunk, we don’t want to process further elements unless we see the special -1 marker, so we won’t create a greedy gatherer.

  • We do need a finisher to potentially output elements that have been stored but not yet pushed downstream.
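To see the effect of a non-greedy integrator in isolation, here is a minimal sketch. The firstN gatherer is a made-up example and not part of the JDK: once its integrator returns false, no more elements are requested, so even an infinite stream terminates.

```groovy
import java.util.stream.Gatherer
import java.util.stream.Stream

// Hypothetical gatherer: passes the first n elements through and then stops
<T> Gatherer<T, ?, T> firstN(int n) {
    Gatherer.ofSequential(
        () -> [0],                                            // state: one-element list used as a counter
        Gatherer.Integrator.of { count, element, downstream ->
            if (count[0] >= n) return false                   // no further elements wanted
            count[0]++
            downstream.push(element)
        }
    )
}

// the source is infinite, but the pipeline still completes
assert Stream.iterate(1, n -> n + 1).gather(firstN(3)).toList() == [1, 2, 3]
```

This is the same idea as the early return false in windowMultiple once all the requested chunk sizes are used up.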

We’d use windowMultiple like this:

assert (1..8).stream().gather(windowMultiple(3)).toList() ==
    [[1, 2, 3]]
assert (1..8).stream().gather(windowMultiple(3, 2, 1)).toList() ==
    [[1, 2, 3], [4, 5], [6]]
assert (1..8).stream().gather(windowMultiple(3, -1)).toList() ==
    [[1, 2, 3], [4, 5, 6, 7, 8]]

Inject, fold and scan

Groovy’s inject is a little different to the stream API’s reduce terminal operation. The commonly used forms of the latter expect a binary operator, which forces the accumulated result to have the same type as the elements being consumed.
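A small sketch of that restriction, using the two-argument form of reduce (the values are just sample data):

```groovy
// with a binary operator, the accumulated value and the elements share one type
assert (1..5).stream().reduce(0, (a, b) -> a + b) == 15

// to build a String this way, the elements have to be mapped to strings first
assert (1..5).stream()
             .map(n -> n.toString())
             .reduce('', (a, b) -> a + b) == '12345'
```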

The inject method can have different types for its arguments as shown here:

assert (1..5).inject(''){ string, number -> string + number } == '12345'

The fold built-in gatherer allows us to write the equivalent functionality for stream processing as shown here:

assert (1..5).stream()
             .gather(fold(() -> '', (string, number) -> string + number))
             .findFirst()
             .get() == '12345'

Let’s look at another inject example, this time computing a cumulative sum. If we have a sequence of numbers, the cumulative sum is another sequence whose value at any index is determined by accumulating all the numbers from the original sequence up to and including the index in question, e.g. the cumulative sum of [1, 2, 3, 4] is [1, 3, 6, 10].

This is again a good fit for Groovy’s inject:

assert (1..5).inject([]) { acc, next ->
    acc + [acc ? acc.last() + next : next]
} == [1, 3, 6, 10, 15]

Groovy has a number of alternatives to achieve this functionality. Here is one using inits:

assert (1..5).inits().grep().reverse()*.sum() == [1, 3, 6, 10, 15]

inits is a list processing function which we cover in more detail in the next section.

Before examining gatherer equivalents, we should note that this particular operation is deemed useful enough that Java actually has a built-in library function for arrays:

Integer[] nums = 1..5
Arrays.parallelPrefix(nums, Integer::sum)
assert nums == [1, 3, 6, 10, 15]

Cumulative sum isn’t well suited to traditional streams, but now with gatherers, we can use the scan built-in gatherer to have similar functionality when processing streams:

assert (1..5).stream()
             .gather(scan(() -> 0, Integer::sum))
             .toList() == [1, 3, 6, 10, 15]

Testing for a subsequence (fun with inits and tails)

As a final example, let’s have a look at how we might test if one list appears as a contiguous subsequence of another.

We’ll start with a list of words, and a list containing the ordered search terms:

var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

It turns out that this is solved already in the JDK for collections:

assert Collections.indexOfSubList(words, search) != -1

Let’s have a look at some possible stream implementations. But first a diversion. For any functional programmers who might have dabbled with Haskell, you may have seen the book Learn You a Haskell for Great Good!. It sets an interesting exercise for finding a "Needle in the Haystack" using inits and tails. So what are inits and tails? They are built-in functions in Haskell and Groovy:

assert (1..6).inits() == [[1, 2, 3, 4, 5, 6],
                          [1, 2, 3, 4, 5],
                          [1, 2, 3, 4],
                          [1, 2, 3],
                          [1, 2],
                          [1],
                          []]

assert (1..6).tails() == [[1, 2, 3, 4, 5, 6],
                             [2, 3, 4, 5, 6],
                                [3, 4, 5, 6],
                                   [4, 5, 6],
                                      [5, 6],
                                         [6],
                                          []]

Once we know about these methods, we can paraphrase the "Needle in the Haystack" solution for collections in Groovy as follows:

var found = words.tails().any{ subseq -> subseq.inits().contains(search) }
assert found

It may not be the most efficient implementation of this functionality, but it has a nice symmetry. Let’s now explore some stream-based solutions.

We can start off with a tails gatherer:

<T> Gatherer<T, ?, List<T>> tails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails().each(downstream::push)
        }
    )
}

In the integrator, we just store away all the elements, and in the finisher we do all the work. This works but isn’t really properly leveraging the stream pipeline nature.

We can check it works as follows:

assert search.stream().gather(tails()).toList() ==
    [['brown', 'fox'], ['fox'], []]

We could continue with this approach to create an initsOfTails gatherer:

<T> Gatherer<T, ?, List<T>> initsOfTails() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            state << element
            return true
        },
        (state, downstream) -> {
            state.tails()*.inits().sum().each(downstream::push)
        }
    )
}

Again, all the work is in the finisher, and we haven’t really made use of the power of the stream pipeline.

It still works of course:

assert words.stream().gather(initsOfTails()).anyMatch { it == search }

But it might have been more efficient to have collected the stream as a list and used Groovy’s built-in inits and tails on that.
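That alternative might look something like the following sketch, which repeats the sample data so it can be run on its own:

```groovy
var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

// collect the stream once, then use the collection-based inits and tails
var collected = words.stream().toList()
assert collected.tails().any { tail -> tail.inits().contains(search) }
```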

But all is not lost. If we are willing to tweak the algorithm slightly, we could make better use of the stream pipeline. For example, if we don’t mind getting the inits results in the reverse order, we could define the following gatherer for inits:

<T> Gatherer<T, ?, List<T>> inits() {
    Gatherer.ofSequential(
        () -> [],
        Gatherer.Integrator.ofGreedy { state, element, downstream ->
            downstream.push(List.copyOf(state))
            state << element
            return true
        },
        (state, downstream) -> {
            downstream.push(state)
        }
    )
}

Which we’d use like this:

assert search.stream().gather(inits()).toList() ==
    [[], ['brown'], ['brown', 'fox']]
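Another tweak, offered here as just one possible approach, is to skip inits and tails altogether for the search itself. Every candidate match has the same length as the search list, so the built-in windowSliding gatherer can produce the candidates one at a time, and anyMatch can stop at the first hit. The sketch below repeats the sample data so it can be run on its own and assumes a non-empty search list, since windowSliding rejects window sizes less than 1.

```groovy
import static java.util.stream.Gatherers.windowSliding

var words = 'the quick brown fox jumped over the lazy dog'.split().toList()
var search = 'brown fox'.split().toList()

// each window is a candidate match with the same length as the search list
assert words.stream()
            .gather(windowSliding(search.size()))
            .anyMatch { window -> window == search }

// words that are present but not adjacent are not reported as a match
assert !words.stream()
             .gather(windowSliding(2))
             .anyMatch { window -> window == ['quick', 'fox'] }
```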

Conclusion

It is great that Groovy has a rich set of methods that work with collections. Some of these methods have stream equivalents, and now we see that using gatherers with Groovy, we can emulate more of the methods. Not all algorithms need or benefit from using streams, but it’s great to know that with gatherers, we can likely pick whichever style makes sense.

We are still in the early days of gatherers being available, so expect much more to emerge as this feature becomes more mainstream. We look forward to it advancing past preview status.

Update history

18/Jan/2024: Updated with a scan/cumulative sum example.