Fanning out across a Fleet with Apache MINA SSHD and Groovy™ 6

Author: Paul King

Published: 2026-05-07 12:00PM


In Part 1 we built a streaming ticker hub on Apache MINA, refactored around Groovy 6’s async toolkit. The data source was a stub. Time to wire up a real fleet of broker boxes via Apache MINA SSHD and let Groovy 6’s parallel collections do the fan-out.

In the previous post we built a streaming ticker hub on Apache MINA and refactored its IoHandler around Groovy 6’s async/await, AsyncChannel, BroadcastChannel, @ActiveObject, and AsyncScope. The last loose end was honest: the prices came from a synthetic FakeTickSource running locally.

This post wires the hub up to a real-feeling source: a small fleet of "broker boxes" that each speak SSH. Every tick interval, we fan out across the fleet in parallel, ask each box for the latest price for every symbol, aggregate the responses, and publish to the same BroadcastChannel-backed registry from Part 1. The hub does not change at all — the new feed is a drop-in replacement.

Three Groovy 6 features carry this episode:

  • Apache MINA SSHD as both server (the broker boxes) and client (the fleet driver)

  • Parallel collection methods — collectParallel here, and a quick tour of its siblings

  • Pool.virtual() and ParallelScope.withPool { } for binding the right execution pool to the right kind of work

Examples in this post use Apache MINA SSHD 2.17.1 alongside the same MINA 2.2.6, Apache Commons RNG Simple 1.7, and Groovy 6 on JDK 21+ from Part 1. The full source is available on GitHub (the new files sit alongside Part 1’s, so you can run either entry point).

What we are building

Conceptually, one tick of the new feed looks like this:

hub                       ParallelScope            broker-1, broker-2, broker-3
 │                              │                          │
 ├─FleetTickSource (async)─────►│                          │
 │                              ├─collectParallel──────────┼─►quote AAPL,GOOG,MSFT,TSLA
 │                              │                          │   (one virtual thread per host)
 │                              │◄──── per-host quotes ────┤
 │                              │                          │
 │◄──aggregate (mean) + publish─┤                          │
 │                                                         │
 ├─BroadcastChannel.send  → all subscribed sessions        │

Each broker box is an Apache MINA SSHD server running locally on its own port, backed by a custom Command implementation that handles a single shell command: quote SYM1,SYM2,SYM3. It returns SYM PRICE lines (one per symbol) and exits.

A 60-second tour of Apache MINA SSHD

Apache MINA SSHD is a full SSH stack — both server and client, plus SFTP, SCP and port forwarding — built on the same NIO foundations as MINA itself. For our purposes there are only a handful of moving parts.

Server side (BrokerBoxServer):

  • SshServer.setUpDefaultServer() — a server pre-configured with sensible ciphers, MACs, key-exchange algorithms, and signatures.

  • SimpleGeneratorHostKeyProvider — generates an in-memory host keypair on first use; no files written.

  • AcceptAllPasswordAuthenticator.INSTANCE — accepts any password (we are demonstrating, not shipping).

  • CommandFactory — given a command line, returns a Command whose start(channel, env) runs the work. The framework supplies it with stdin/stdout/stderr streams and an ExitCallback.

Client side (Fleet):

  • SshClient.setUpDefaultClient() — the client mirror of the server.

  • AcceptAllServerKeyVerifier.INSTANCE — skip host-key verification (again, demonstration).

  • client.connect(user, host, port).verify(timeout).session — open a session.

  • session.addPasswordIdentity(…​) then session.auth().verify(timeout) — log in.

  • session.createExecChannel("quote AAPL") — run a non-interactive command. Set an output stream, open().verify(timeout), then waitFor(EnumSet.of(ClientChannelEvent.CLOSED), timeout).

That is the entire SSHD surface area for this post. Now the fun part: making each round-trip happen across the whole fleet at once.

The broker box

BrokerBoxServer boots an SSHD server with a one-line CommandFactory. The custom Command holds the wire streams the framework injects into it, parses the requested command on start, and writes one SYM PRICE line per requested symbol:

class BrokerBoxServer {
    String name
    int    port
    Map<String, BigDecimal> basePrices

    void start() {
        sshd = SshServer.setUpDefaultServer()
        sshd.host = '127.0.0.1'
        sshd.port = port
        sshd.keyPairProvider     = new SimpleGeneratorHostKeyProvider()
        sshd.passwordAuthenticator = AcceptAllPasswordAuthenticator.INSTANCE
        sshd.commandFactory = { ChannelSession ch, String line ->
            new QuoteCommand(commandLine: line, basePrices: basePrices, rng: rng)
        } as CommandFactory
        sshd.start()
    }

    private static class QuoteCommand implements Command {
        String commandLine
        Map<String, BigDecimal> basePrices
        UniformRandomProvider rng
        OutputStream out, err
        ExitCallback callback
        // ... setOutputStream / setErrorStream / setExitCallback ...

        @Override
        void start(ChannelSession channel, Environment env) {
            async {
                var parts = commandLine.trim().split(/\s+/, 2)
                if (parts[0] != 'quote' || parts.length < 2) {
                    err.write("usage: quote SYM1,SYM2,...\n".getBytes('UTF-8'))
                    callback.onExit(2); return
                }
                parts[1].split(',')*.trim().each { sym ->
                    var base = basePrices[sym]
                    if (base == null) return
                    var jitter = (rng.nextInt(40) - 20) / 100.0
                    var price = base + jitter
                    out.write("$sym $price\n".getBytes('UTF-8'))
                }
                out.flush()
                callback.onExit(0)
            }
        }
    }
}

Two small things worth noting:

  • The work happens inside async { …​ }, not on the SSHD framework thread that called start. SSHD expects start to return promptly so the framework can keep handling other sessions; spawning the work onto a virtual thread is the modern fix.

  • Each broker carries a small per-call price jitter, so different brokers report slightly different prices for the same symbol — giving us something interesting to aggregate.

The fleet — parallel fan-out via SSH

Here is the heart of Part 2:

class Fleet implements AutoCloseable {
    List<HostSpec> hosts
    private final SshClient client = SshClient.setUpDefaultClient()

    Fleet() {
        client.serverKeyVerifier = AcceptAllServerKeyVerifier.INSTANCE
        client.start()
    }

    Map<String, List<BigDecimal>> sweep(List<String> symbols) {
        // Pool.virtual() = one virtual thread per host. SSH I/O is overwhelmingly
        // blocking and parking, exactly what virtual threads were designed for.
        var perHost = ParallelScope.withPool(Pool.virtual()) { scope ->
            hosts.collectParallel { host ->
                try {
                    fetchQuotes(host, symbols)
                } catch (Throwable t) {
                    System.err.println "WARN ${host.name}: ${t.message}"
                    [:]
                }
            }
        }
        // Pivot List<Map<sym,price>> into Map<sym, List<price>>.
        symbols.collectEntries { sym ->
            [(sym): perHost*.get(sym)]
        }
    }

    private Map<String, BigDecimal> fetchQuotes(HostSpec host, List<String> symbols) {
        var session = client.connect(host.user, host.host, host.port)
                .verify(Duration.ofSeconds(2)).session
        try {
            session.addPasswordIdentity(host.password)
            session.auth().verify(Duration.ofSeconds(2))
            var out = new ByteArrayOutputStream()
            var channel = session.createExecChannel("quote ${symbols.join(',')}")
            try {
                channel.out = out
                channel.open().verify(Duration.ofSeconds(2))
                channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), Duration.ofSeconds(5))
            } finally {
                channel.close(false)
            }
            parseQuotes(out.toString('UTF-8'))
        } finally {
            session.close(false)
        }
    }
}

Three things deserve attention.

hosts.collectParallel { …​ }. Groovy 6 adds a family of parallel extension methods to Collection: collectParallel, findAllParallel, eachParallel, sumParallel, injectParallel, groupByParallel. They are direct parallels of collect, findAll, each, etc., but each iteration runs in its own task on a pool. We use collectParallel because every host returns a quote map and we need them all back. If we only cared about side effects (e.g., kicking off a deploy on every host) eachParallel would be the right choice; if we wanted to shed slow hosts, findAllParallel with a quick health probe would do.

ParallelScope.withPool(Pool.virtual()) { scope → …​ }. The pool factory (Pool.virtual(), Pool.io(), Pool.cpu(), Pool.fixed(n)) decides what kind of executor runs each parallel task. We bind it for the scope of the closure; inside, every parallel collection method picks up the bound pool. SSH I/O is blocking I/O, so Pool.virtual() is the right choice on JDK 21+. The full picture is below.

Per-host failure isolation. A try/catch around fetchQuotes returns an empty map on failure for that one host. The other hosts' results still come through, the pivot still works, and the broken host gets logged. Without this, a single hung or unauthenticated box would poison the whole sweep.

Picking the right pool

Groovy 6 ships four pool factories. The choice matters more than people think:

Factory When to use it

Pool.virtual()

Blocking I/O on JDK 21+. SSH, JDBC, blocking HTTP, file I/O. The carrier thread parks cheaply; you can have thousands of in-flight tasks for negligible cost. The default for async { } already chooses this on JDK 21+, so ParallelScope.withPool(Pool.virtual()) just makes it explicit for parallel collections.

Pool.io()

Blocking I/O on JDK 17—​20, where virtual threads aren’t available. A bounded cached pool sized to handle moderate I/O concurrency without exhausting OS threads.

Pool.cpu()

CPU-bound work that wants nCores parallelism: image transforms, encoding, parsing hot paths, anything that pegs cores. Exceeding nCores here just adds context-switch cost.

Pool.fixed(n)

When you need a strict cap (Pool.fixed(8) to throttle external load on a downstream service, for instance), or when the default sizing doesn’t fit.

For our SSH fleet sweep, Pool.virtual() is the obvious pick. Each host fetch spends most of its life parked in verify(…​) or waitFor(…​); we never want to limit ourselves to nCores of those.

Wiring it into the hub

Here is the entire change to wire the fleet feed into Part 1’s MINA hub. Compare it with the equivalent block in Part 1’s TickerServer.groovy:

// Three brokers, each on its own ephemeral port
var brokers = (1..3).collect { i ->
    new BrokerBoxServer(name: "broker-$i", port: freePort(), basePrices: basePrices).tap { start() }
}
var hosts = brokers.collect { b ->
    new HostSpec(name: b.name, host: '127.0.0.1', port: b.port, user: 'demo', password: 'demo')
}

var fleet    = new Fleet(hosts: hosts)
var registry = new TickerRegistry()
var feed     = new FleetTickSource(registry: registry, fleet: fleet, symbols: symbols, intervalMillis: 500)
feed.start()

// ----- everything below this line is unchanged from Part 1 -----
var acceptor = new NioSocketAcceptor()
acceptor.filterChain.with {
    addLast 'codec',  new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8))
    addLast 'logger', new LoggingFilter()
}
acceptor.handler = new TickerHandler(registry: registry)
acceptor.bind(new InetSocketAddress(0))

FleetTickSource is the only new class wired in, and it has the same shape as FakeTickSource from Part 1: same start() and stop(), same job of publishing to the registry. The whole protocol stack — TickerHandler, TickerProtocol, TickerRegistry, the per-session AsyncChannel and per-symbol BroadcastChannel — runs unchanged.

FleetTickSource itself is small:

class FleetTickSource {
    TickerRegistry registry
    Fleet fleet
    List<String> symbols
    long intervalMillis = 1000
    private final running = new AtomicBoolean(false)

    void start() {
        running.set(true)
        async {
            while (running.get()) {
                var quotes = fleet.sweep(symbols)
                quotes.each { sym, prices ->
                    if (prices) {
                        var mean = prices.sum() / prices.size()
                        registry.publish(sym, mean)
                    }
                }
                sleep intervalMillis
            }
        }
    }

    void stop() { running.set(false) }
}

Each tick, sweep the fleet, average the per-broker prices, publish. The whole feed is a seven-line async { } loop — and inside that one fleet.sweep(…​) call lurks a fan-out across every host on a virtual thread per host.

Trying it

The repository ships a FleetTickerServer.groovy script alongside Part 1’s TickerServer that boots three SSHD broker servers on ephemeral ports, builds the fleet, runs the same in-process socket client demo, and asserts the output. A typical run prints something like:

Started brokers on ports [54113, 54114, 54115]
Ticker hub (fleet-backed) listening on port 54116
--- received ---
SUBSCRIBED AAPL
SUBSCRIBED MSFT
TICK AAPL 173.42
TICK MSFT 412.05
TICK AAPL 173.39
TICK MSFT 412.13
TICK AAPL 173.45
QUOTE AAPL 173.45
QUOTE GOOG 198.18
QUOTE MSFT 412.13
QUOTE TSLA 215.55
UNSUBSCRIBED AAPL
TICK MSFT 412.10
BYE
OK

Three things changed compared to Part 1’s run:

  • The startup banner mentions broker ports.

  • The TICK and QUOTE prices are now consensus values across three brokers, not a single RNG walk — they cluster more tightly around the base price because the per-broker jitter averages out.

  • The QUOTE reply for GOOG returns a real number instead of NA: even though no client ever subscribed to it, the fleet sweep populates the last-tick cache for every symbol on every cycle.

Recap

Part 2 reused everything we built in Part 1 — the IoHandler shuttle, the for await protocol loop, the @ActiveObject registry, the BroadcastChannel`s, the `AsyncScope fan-out for QUOTE — and slotted in a fleet-backed feed in front of it. The producer side is now real; the consumer side did not move.

Each new Groovy 6 feature earned its place:

  • collectParallel (and its siblings eachParallel, findAllParallel, sumParallel, injectParallel, groupByParallel) gives the fan-out an obvious shape: it is a collect, just done in parallel.

  • ParallelScope.withPool { } binds a pool for a block; every parallel collection method inside that block uses it.

  • Pool.virtual() picks the right executor for blocking I/O on JDK 21+. The same code falls back to Pool.io() semantics on JDK 17—​20 if you swap the factory.

And we added a third Apache project to the mix: Apache MINA SSHD as both server (the broker boxes) and client (the fleet driver), with the entire wire-level dance hidden behind session.createExecChannel(cmd) and a few verify(…​) calls.

Coming up

Two threads naturally hang off the end of this post:

  • Async file I/O on Path. textAsync, bytesAsync, writeAsync, and writeBytesAsync make per-broker log capture a one-liner; we deliberately kept the sweep clean here, but a follow-up could thread that in.

  • A real CLI shell on top of the broker. SSHD also supports setShellFactory for an interactive shell. Pairing that with Groovy’s async/await and its new groovy-http-builder would let a broker proxy real-time market data from a backend HTTP source. That is its own post.

Conclusion

The producer side of a streaming system is often where the real-world messiness lives: multiple data sources, blocking network calls, partial failures, aggregation. Groovy 6’s parallel collection methods, pool factories, and ParallelScope — combined with Apache MINA SSHD as a first-class SSH client — make that messiness fit into a handful of small, readable classes. And because the producer is the only thing that changed from Part 1, the hub, the protocol, the channels, and every async primitive we built last time slot in unmodified.

Update history

07/May/2026: Initial version.