Fanning out across a Fleet with Apache MINA SSHD and Groovy™ 6
Published: 2026-05-07 12:00PM
In Part 1 we built a streaming ticker hub on Apache MINA, refactored around Groovy 6’s async toolkit. The data source was a stub. Time to wire up a real fleet of broker boxes via Apache MINA SSHD and let Groovy 6’s parallel collections do the fan-out.
In the previous post we built a streaming ticker hub on
Apache MINA and refactored its IoHandler around Groovy 6’s
async/await, AsyncChannel, BroadcastChannel, @ActiveObject, and AsyncScope. The
last loose end was honest: the prices came from a synthetic FakeTickSource running locally.
This post wires the hub up to a real-feeling source: a small fleet of "broker boxes" that
each speak SSH. Every tick interval, we fan out across the fleet in parallel, ask each box
for the latest price for every symbol, aggregate the responses, and publish to the same
BroadcastChannel-backed registry from Part 1. The hub does not change at all — the new
feed is a drop-in replacement.
Three ingredients carry this episode:

- Apache MINA SSHD as both server (the broker boxes) and client (the fleet driver)
- Parallel collection methods — collectParallel here, and a quick tour of its siblings
- Pool.virtual() and ParallelScope.withPool { } for binding the right execution pool to the right kind of work
Examples in this post use Apache MINA SSHD 2.17.1 alongside the same MINA 2.2.6, Apache Commons RNG Simple 1.7, and Groovy 6 on JDK 21+ from Part 1. The full source is available on GitHub (the new files sit alongside Part 1’s, so you can run either entry point).
What we are building
Conceptually, one tick of the new feed looks like this:
hub                        ParallelScope             broker-1, broker-2, broker-3
 │                              │                               │
 ├─FleetTickSource (async)─────►│                               │
 │                              ├─collectParallel──────────────►│  quote AAPL,GOOG,MSFT,TSLA
 │                              │   (one virtual thread per host)
 │                              │◄──── per-host quotes ─────────┤
 │◄─aggregate (mean) + publish──┤                               │
 │                              │                               │
 ├─BroadcastChannel.send → all subscribed sessions
Each broker box is an Apache MINA SSHD server running locally on its own port, backed by a
custom Command implementation that handles a single exec command:
quote SYM1,SYM2,SYM3. It returns one SYM PRICE line per requested symbol and exits.
A 60-second tour of Apache MINA SSHD
Apache MINA SSHD is a full SSH stack — both server and client, plus SFTP, SCP and port forwarding — built on the same NIO foundations as MINA itself. For our purposes there are only a handful of moving parts.
Server side (BrokerBoxServer):
- SshServer.setUpDefaultServer() — a server pre-configured with sensible ciphers, MACs, key-exchange algorithms, and signatures.
- SimpleGeneratorHostKeyProvider — generates an in-memory host keypair on first use; no files written.
- AcceptAllPasswordAuthenticator.INSTANCE — accepts any password (we are demonstrating, not shipping).
- CommandFactory — given a command line, returns a Command whose start(channel, env) runs the work. The framework supplies it with stdin/stdout/stderr streams and an ExitCallback.
Client side (Fleet):
- SshClient.setUpDefaultClient() — the client mirror of the server.
- AcceptAllServerKeyVerifier.INSTANCE — skip host-key verification (again, demonstration).
- client.connect(user, host, port).verify(timeout).session — open a session.
- session.addPasswordIdentity(…) then session.auth().verify(timeout) — log in.
- session.createExecChannel("quote AAPL") — run a non-interactive command. Set an output stream, open().verify(timeout), then waitFor(EnumSet.of(ClientChannelEvent.CLOSED), timeout).
That is the entire SSHD surface area for this post. Now the fun part: making each round-trip happen across the whole fleet at once.
The broker box
BrokerBoxServer boots an SSHD server with a one-line CommandFactory. The custom Command
holds the wire streams the framework injects into it, parses the requested command on start,
and writes one SYM PRICE line per requested symbol:
class BrokerBoxServer {
    String name
    int port
    Map<String, BigDecimal> basePrices
    private SshServer sshd
    // Apache Commons RNG Simple provider for the per-call price jitter
    private final UniformRandomProvider rng = RandomSource.XO_RO_SHI_RO_128_PP.create()

    void start() {
        sshd = SshServer.setUpDefaultServer()
        sshd.host = '127.0.0.1'
        sshd.port = port
        sshd.keyPairProvider = new SimpleGeneratorHostKeyProvider()
        sshd.passwordAuthenticator = AcceptAllPasswordAuthenticator.INSTANCE
        sshd.commandFactory = { ChannelSession ch, String line ->
            new QuoteCommand(commandLine: line, basePrices: basePrices, rng: rng)
        } as CommandFactory
        sshd.start()
    }

    private static class QuoteCommand implements Command {
        String commandLine
        Map<String, BigDecimal> basePrices
        UniformRandomProvider rng
        OutputStream out, err
        ExitCallback callback
        // ... setOutputStream / setErrorStream / setExitCallback ...

        @Override
        void start(ChannelSession channel, Environment env) {
            async {
                var parts = commandLine.trim().split(/\s+/, 2)
                if (parts.length < 2 || parts[0] != 'quote') {
                    err.write("usage: quote SYM1,SYM2,...\n".getBytes('UTF-8'))
                    callback.onExit(2); return
                }
                parts[1].split(',')*.trim().each { sym ->
                    var base = basePrices[sym]
                    if (base == null) return
                    var jitter = (rng.nextInt(40) - 20) / 100.0
                    var price = base + jitter
                    out.write("$sym $price\n".getBytes('UTF-8'))
                }
                out.flush()
                callback.onExit(0)
            }
        }
    }
}
Two small things worth noting:

- The work happens inside async { … }, not on the SSHD framework thread that called start. SSHD expects start to return promptly so the framework can keep handling other sessions; spawning the work onto a virtual thread is the modern fix.
- Each broker carries a small per-call price jitter, so different brokers report slightly different prices for the same symbol — giving us something interesting to aggregate.
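The jitter arithmetic is worth pinning down: rng.nextInt(40) yields 0..39, so (n - 20) / 100.0 spans -0.20 to +0.19 in one-cent steps. A plain-Java check of those bounds, using java.util.Random as a stand-in for the Commons RNG provider:

```java
import java.util.Random;

public class JitterBounds {
    // Stand-in for the broker's per-call jitter. nextInt(40) yields 0..39,
    // so the result spans -0.20 .. +0.19 inclusive, in steps of 0.01.
    static double jitter(Random rng) {
        return (rng.nextInt(40) - 20) / 100.0;
    }
}
```

With base prices in the hundreds, a spread of at most twenty cents is enough to make the brokers visibly disagree without looking like noise.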
The fleet — parallel fan-out via SSH
Here is the heart of Part 2:
class Fleet implements AutoCloseable {
    List<HostSpec> hosts
    private final SshClient client = SshClient.setUpDefaultClient()

    Fleet() {
        client.serverKeyVerifier = AcceptAllServerKeyVerifier.INSTANCE
        client.start()
    }

    Map<String, List<BigDecimal>> sweep(List<String> symbols) {
        // Pool.virtual() = one virtual thread per host. SSH I/O is overwhelmingly
        // blocking and parking, exactly what virtual threads were designed for.
        var perHost = ParallelScope.withPool(Pool.virtual()) { scope ->
            hosts.collectParallel { host ->
                try {
                    fetchQuotes(host, symbols)
                } catch (Throwable t) {
                    System.err.println "WARN ${host.name}: ${t.message}"
                    [:]
                }
            }
        }
        // Pivot List<Map<sym,price>> into Map<sym, List<price>>, dropping
        // nulls from hosts that failed or did not report that symbol.
        symbols.collectEntries { sym ->
            [(sym): perHost*.get(sym).findAll { it != null }]
        }
    }

    private Map<String, BigDecimal> fetchQuotes(HostSpec host, List<String> symbols) {
        var session = client.connect(host.user, host.host, host.port)
                            .verify(Duration.ofSeconds(2)).session
        try {
            session.addPasswordIdentity(host.password)
            session.auth().verify(Duration.ofSeconds(2))
            var out = new ByteArrayOutputStream()
            var channel = session.createExecChannel("quote ${symbols.join(',')}")
            try {
                channel.out = out
                channel.open().verify(Duration.ofSeconds(2))
                channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), Duration.ofSeconds(5))
            } finally {
                channel.close(false)
            }
            parseQuotes(out.toString('UTF-8'))
        } finally {
            session.close(false)
        }
    }

    @Override
    void close() {
        client.stop()   // shut down the shared SSH client
    }
}
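fetchQuotes ends with a call to parseQuotes, which the post does not show. The wire format is one SYM PRICE line per symbol, so the helper is only a few lines; here is a plain-Java sketch (the repo's Groovy version may differ in detail):

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

public class QuoteParser {
    // Parse "SYM PRICE" lines (one per symbol) into an ordered map.
    // Blank or malformed lines are skipped rather than failing the
    // whole host's result.
    static Map<String, BigDecimal> parseQuotes(String body) {
        Map<String, BigDecimal> quotes = new LinkedHashMap<>();
        for (String line : body.split("\n")) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) continue;
            try {
                quotes.put(parts[0], new BigDecimal(parts[1]));
            } catch (NumberFormatException ignored) {
                // tolerate one garbled price instead of aborting the sweep
            }
        }
        return quotes;
    }
}
```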
Three things deserve attention.
hosts.collectParallel { … }. Groovy 6 adds a family of parallel extension methods to
Collection: collectParallel, findAllParallel, eachParallel, sumParallel,
injectParallel, groupByParallel. They are direct parallels of collect, findAll,
each, etc., but each iteration runs in its own task on a pool. We use collectParallel
because every host returns a quote map and we need them all back. If we only cared about
side effects (e.g., kicking off a deploy on every host) eachParallel would be the right
choice; if we wanted to shed slow hosts, findAllParallel with a quick health probe
would do.
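If you want a mental model for what collectParallel does, a rough plain-JDK equivalent (a hypothetical helper, not the Groovy 6 implementation) is "submit one task per element, then get() the futures in source order":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class CollectParallel {
    // One task per element; collecting via get() preserves source order
    // no matter which task finishes first.
    static <T, R> List<R> collectParallel(List<T> items, Function<T, R> fn) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T item : items) futures.add(pool.submit(() -> fn.apply(item)));
            List<R> results = new ArrayList<>();
            for (Future<R> f : futures) results.add(f.get());
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

The Groovy version also binds the pool from the enclosing ParallelScope rather than creating one per call, which is what makes the pool choice in the next section composable.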
ParallelScope.withPool(Pool.virtual()) { scope → … }. The pool factory
(Pool.virtual(), Pool.io(), Pool.cpu(), Pool.fixed(n)) decides what kind of executor
runs each parallel task. We bind it for the scope of the closure; inside, every parallel
collection method picks up the bound pool. SSH I/O is blocking I/O, so Pool.virtual() is
the right choice on JDK 21+. The full picture is below.
Per-host failure isolation. A try/catch around fetchQuotes returns an empty map on
failure for that one host. The other hosts' results still come through, the pivot still
works, and the broken host gets logged. Without this, a single hung or unauthenticated box
would poison the whole sweep.
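The isolation pattern is easy to verify on its own: wrap each task's body in try/catch, degrade the failing element to an empty map, and the other results survive. A plain-Java sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class IsolatedSweep {
    // Fetch quotes from each host in parallel; a throwing host yields an
    // empty map instead of poisoning the whole sweep.
    static List<Map<String, Double>> sweep(List<String> hosts,
                                           Function<String, Map<String, Double>> fetch)
            throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<Map<String, Double>>> futures = new ArrayList<>();
            for (String host : hosts) {
                futures.add(pool.submit(() -> {
                    try {
                        return fetch.apply(host);
                    } catch (Throwable t) {
                        System.err.println("WARN " + host + ": " + t.getMessage());
                        return Map.of();   // degrade, don't fail the sweep
                    }
                }));
            }
            List<Map<String, Double>> results = new ArrayList<>();
            for (Future<Map<String, Double>> f : futures) results.add(f.get());
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```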
Picking the right pool
Groovy 6 ships four pool factories. The choice matters more than people think:
| Factory | When to use it |
|---|---|
| Pool.virtual() | Blocking I/O on JDK 21+. SSH, JDBC, blocking HTTP, file I/O. The carrier thread parks cheaply; you can have thousands of in-flight tasks for negligible cost. |
| Pool.io() | Blocking I/O on JDK 17–20, where virtual threads aren’t available. A bounded cached pool sized to handle moderate I/O concurrency without exhausting OS threads. |
| Pool.cpu() | CPU-bound work that wants nCores threads. |
| Pool.fixed(n) | When you need a strict cap (n threads, no more). |
For our SSH fleet sweep, Pool.virtual() is the obvious pick. Each host fetch spends most of
its life parked in verify(…) or waitFor(…); we never want to limit ourselves to
nCores of those.
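The payoff is measurable: on a thread-per-task pool, N blocking calls overlap instead of serializing, so wall time stays near one call's latency rather than N times it. A plain-Java sketch using a cached pool as a stand-in (Pool.virtual() buys the same shape with far cheaper threads on JDK 21+):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OverlappingBlocking {
    // Run `tasks` simulated blocking calls of `blockMillis` each, one thread
    // per task, and report elapsed wall-clock time in milliseconds. The
    // sleeps overlap, so the total stays near blockMillis, not tasks * blockMillis.
    static long sweepMillis(int tasks, long blockMillis) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            long start = System.nanoTime();
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    try { Thread.sleep(blockMillis); } catch (InterruptedException ignored) { }
                }));
            }
            for (Future<?> f : futures) f.get();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }
}
```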
Wiring it into the hub
Here is the entire change to wire the fleet feed into Part 1’s MINA hub. Compare it with
the equivalent block in Part 1’s TickerServer.groovy:
// Three brokers, each on its own ephemeral port
var brokers = (1..3).collect { i ->
    new BrokerBoxServer(name: "broker-$i", port: freePort(), basePrices: basePrices).tap { start() }
}
var hosts = brokers.collect { b ->
    new HostSpec(name: b.name, host: '127.0.0.1', port: b.port, user: 'demo', password: 'demo')
}
var fleet = new Fleet(hosts: hosts)

var registry = new TickerRegistry()
var feed = new FleetTickSource(registry: registry, fleet: fleet, symbols: symbols, intervalMillis: 500)
feed.start()

// ----- everything below this line is unchanged from Part 1 -----
var acceptor = new NioSocketAcceptor()
acceptor.filterChain.with {
    addLast 'codec', new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8))
    addLast 'logger', new LoggingFilter()
}
acceptor.handler = new TickerHandler(registry: registry)
acceptor.bind(new InetSocketAddress(0))
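freePort() is not shown in the excerpt; the usual trick (a sketch — the repo's helper may differ) is to bind a throwaway ServerSocket on port 0 and let the OS hand back a free ephemeral port:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class Ports {
    // Bind to port 0 so the OS assigns a free ephemeral port, then release it.
    // There is a small race between closing the socket and reusing the port,
    // which is acceptable for a local demo.
    static int freePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}
```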
FleetTickSource is the only new class wired in, and it has the same shape as
FakeTickSource from Part 1: same start() and stop(), same job of publishing to the
registry. The whole protocol stack — TickerHandler, TickerProtocol, TickerRegistry,
the per-session AsyncChannel and per-symbol BroadcastChannel — runs unchanged.
FleetTickSource itself is small:
class FleetTickSource {
    TickerRegistry registry
    Fleet fleet
    List<String> symbols
    long intervalMillis = 1000
    private final running = new AtomicBoolean(false)

    void start() {
        running.set(true)
        async {
            while (running.get()) {
                var quotes = fleet.sweep(symbols)
                quotes.each { sym, prices ->
                    if (prices) {
                        var mean = prices.sum() / prices.size()
                        registry.publish(sym, mean)
                    }
                }
                sleep intervalMillis
            }
        }
    }

    void stop() { running.set(false) }
}
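The aggregation inside the loop is a plain arithmetic mean over whatever brokers answered; hosts that failed simply contribute fewer samples, and a symbol with no samples at all is skipped rather than published. That step in isolation, as a plain-Java sketch:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MeanQuotes {
    // Average each symbol's per-broker prices to two decimal places.
    // Symbols with no prices (every broker failed) are dropped.
    static Map<String, BigDecimal> means(Map<String, List<BigDecimal>> pivoted) {
        Map<String, BigDecimal> out = new LinkedHashMap<>();
        pivoted.forEach((sym, prices) -> {
            if (prices == null || prices.isEmpty()) return;
            BigDecimal sum = prices.stream().reduce(BigDecimal.ZERO, BigDecimal::add);
            out.put(sym, sum.divide(BigDecimal.valueOf(prices.size()), 2, RoundingMode.HALF_UP));
        });
        return out;
    }
}
```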
Each tick, sweep the fleet, average the per-broker prices, publish. The whole feed is one
compact async { } loop — and inside that single fleet.sweep(…) call lurks a
fan-out across every host on a virtual thread per host.
Trying it
The repository ships a FleetTickerServer.groovy script alongside Part 1’s TickerServer
that boots three SSHD broker servers on ephemeral ports, builds the fleet, runs the same
in-process socket client demo, and asserts the output. A typical run prints something like:
Started brokers on ports [54113, 54114, 54115]
Ticker hub (fleet-backed) listening on port 54116
--- received ---
SUBSCRIBED AAPL
SUBSCRIBED MSFT
TICK AAPL 173.42
TICK MSFT 412.05
TICK AAPL 173.39
TICK MSFT 412.13
TICK AAPL 173.45
QUOTE AAPL 173.45
QUOTE GOOG 198.18
QUOTE MSFT 412.13
QUOTE TSLA 215.55
UNSUBSCRIBED AAPL
TICK MSFT 412.10
BYE
OK
Three things changed compared to Part 1’s run:

- The startup banner mentions broker ports.
- The TICK and QUOTE prices are now consensus values across three brokers, not a single RNG walk — they cluster more tightly around the base price because the per-broker jitter averages out.
- The QUOTE reply for GOOG returns a real number instead of NA: even though no client ever subscribed to it, the fleet sweep populates the last-tick cache for every symbol on every cycle.
Recap
Part 2 reused everything we built in Part 1 — the IoHandler shuttle, the for await
protocol loop, the @ActiveObject registry, the BroadcastChannels, the AsyncScope
fan-out for QUOTE — and slotted in a fleet-backed feed in front of it. The producer side
is now real; the consumer side did not move.
Each new Groovy 6 feature earned its place:
- collectParallel (and its siblings eachParallel, findAllParallel, sumParallel, injectParallel, groupByParallel) gives the fan-out an obvious shape: it is a collect, just done in parallel.
- ParallelScope.withPool { } binds a pool for a block; every parallel collection method inside that block uses it.
- Pool.virtual() picks the right executor for blocking I/O on JDK 21+. The same code falls back to Pool.io() semantics on JDK 17–20 if you swap the factory.
And we added a third Apache project to the mix: Apache MINA SSHD as both server (the
broker boxes) and client (the fleet driver), with the entire wire-level dance hidden behind
session.createExecChannel(cmd) and a few verify(…) calls.
Coming up
Two threads naturally hang off the end of this post:
- Async file I/O on Path. textAsync, bytesAsync, writeAsync, and writeBytesAsync make per-broker log capture a one-liner; we deliberately kept the sweep clean here, but a follow-up could thread that in.
- A real CLI shell on top of the broker. SSHD also supports setShellFactory for an interactive shell. Pairing that with Groovy’s async/await and its new groovy-http-builder would let a broker proxy real-time market data from a backend HTTP source. That is its own post.
Conclusion
The producer side of a streaming system is often where the real-world messiness lives:
multiple data sources, blocking network calls, partial failures, aggregation. Groovy 6’s
parallel collection methods, pool factories, and ParallelScope — combined with Apache
MINA SSHD as a first-class SSH client — make that messiness fit into a handful of small,
readable classes. And because the producer is the only thing that changed from Part 1, the
hub, the protocol, the channels, and every async primitive we built last time slot in
unmodified.