Streaming Servers with Apache MINA and Groovy™ 6
Published: 2026-04-30 12:00PM
Apache MINA gives us a battle-tested asynchronous network framework with a callback-driven programming model.
Apache MINA is "a network application framework which helps users develop high
performance and high scalability network applications easily". It wraps Java NIO behind a small set of
event-driven abstractions: an acceptor, a filter chain (codecs, logging, SSL), a per-connection session,
and an IoHandler you implement to react to lifecycle and message events.
That callback shape is the same shape that motivated async/await to exist in the first place.
Once a handler needs to do anything beyond echo bytes — talk to a database, fan out to several
backends, broadcast to other sessions — the linear flow disappears into a tangle of synchronized
sets, futures, and follow-on callbacks.
Groovy 6 adds a unified concurrency toolkit that lets us refactor those handlers into top-to-bottom async code:
- Native `async { }`/`await` running on virtual threads (JDK 21+)
- `AsyncChannel` and `BroadcastChannel` for Go-style and pub/sub communication
- `for await` for iterating async sources (channels, generators, reactive streams)
- `@ActiveObject`/`@ActiveMethod` for race-free shared state with ordinary class syntax
- `AsyncScope` for structured concurrency
In this post we put them to work together. We will build a small streaming "ticker hub":
clients subscribe to symbols and receive a stream of price ticks; they can also issue a one-shot
multi-symbol QUOTE that fans out concurrently. The result is a MINA server whose entire protocol
fits in one method that you can read straight through.
Examples in this post use the freshly released Apache MINA 2.2.6 and Apache Commons RNG Simple 1.7, together with Groovy 6 on JDK 21+. The full source is available on GitHub.
What we are building
A simple line-based protocol over TCP:
SUBSCRIBE AAPL,GOOG    -> server starts pushing TICK lines for those symbols
UNSUBSCRIBE GOOG       -> server stops pushing for that symbol
QUOTE AAPL,MSFT,TSLA   -> server returns one snapshot line per symbol concurrently
QUIT                   -> server replies BYE and closes the connection
Server pushes look like:
SUBSCRIBED AAPL
TICK AAPL 173.42
TICK AAPL 174.10
QUOTE AAPL 174.10
QUOTE MSFT 412.55
BYE
The ticks themselves are produced by a stub FakeTickSource for now. In a follow-up post we will
replace that stub with real data fetched in parallel from a fleet of sources — but the protocol,
broadcast machinery, and per-session async logic will not need to change.
A 60-second tour of MINA
A MINA server only needs five concepts:
- `NioSocketAcceptor`: the TCP listener (UDP, in-VM and serial transports also exist).
- The filter chain: a pipeline through which bytes flow in both directions. We add three: a codec filter (turns bytes into decoded messages), a logging filter, and, in production, an `SslFilter`.
- A `ProtocolCodecFilter` with a `TextLineCodecFactory`: turns the byte stream into newline-delimited `String` messages and back. Our handler will see `String`s, not raw `IoBuffer`s.
- `IoHandlerAdapter`: the extension point. Override the lifecycle callbacks you care about: `sessionOpened`, `messageReceived`, `messageSent`, `sessionIdle`, `sessionClosed`, `exceptionCaught`.
- `IoSession`: one per connected client. Use `session.write(obj)` to send (the codec encodes for you), `session.setAttribute(key, value)` for per-connection scratch state, and `session.closeOnFlush()` to disconnect.
The entire server boot is about ten lines:
var acceptor = new NioSocketAcceptor()
acceptor.filterChain.with {
addLast 'codec', new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8))
addLast 'logger', new LoggingFilter()
}
acceptor.handler = new TickerHandler(registry: registry)
acceptor.bind(new InetSocketAddress(0)) // ephemeral port
So far, so MINA. The interesting bit is what we put inside TickerHandler.
The classic shape, and why we want a different one
A MINA IoHandler traditionally does all of its work inside messageReceived. The chat example shipped with MINA is a good representative: it carries the entire chat protocol inside one big switch over the parsed command, with shared state held in synchronized sets:
public class ChatProtocolHandler extends IoHandlerAdapter {
private final Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<>());
private final Set<String> users = Collections.synchronizedSet(new HashSet<>());
@Override
public void messageReceived(IoSession session, Object message) {
String[] result = ((String) message).split(" ", 2);
switch (ChatCommand.valueOf(result[0]).toInt()) {
case ChatCommand.LOGIN:
/* register session, broadcast join, ... */
break;
case ChatCommand.BROADCAST:
/* iterate sessions, write to each ... */
break;
case ChatCommand.QUIT:
session.write("QUIT OK");
session.closeNow();
break;
}
}
public void broadcast(String message) {
synchronized (sessions) {
for (IoSession s : sessions) {
if (s.isConnected()) s.write("BROADCAST OK " + message);
}
}
}
}
That works, and for many years it was the right shape. But every concurrent feature — broadcast,
shared user list, session lifecycle — ends up either as synchronized collections or as more
callbacks. Adding a downstream call (e.g., look up the user’s permissions over HTTP) means
either blocking a precious NIO thread or chaining yet more callbacks.
We will refactor along a single principle: the handler does nothing except shuttle events into a
channel, and the actual protocol runs as one straight-line async task per session.
The bridging pattern
Here is the entire MINA-facing surface of our server:
class TickerHandler extends IoHandlerAdapter {
static final String INBOX = 'inbox'
TickerRegistry registry
@Override
void sessionOpened(IoSession session) {
var inbox = AsyncChannel.create(64) // bounded => back-pressure
session.setAttribute(INBOX, inbox)
async {
new TickerProtocol(session: session, inbox: inbox, registry: registry).run()
}
}
@Override
void messageReceived(IoSession session, message) {
session.getAttribute(INBOX).send(message.toString())
}
@Override
void sessionClosed(IoSession session) {
session.getAttribute(INBOX)?.close()
}
@Override
void exceptionCaught(IoSession session, Throwable cause) {
cause.printStackTrace()
session.getAttribute(INBOX)?.close()
session.closeNow()
}
}
That is everything we will ever write against IoHandler. The handler now contains zero
protocol logic. Each connected client gets a per-session AsyncChannel (a Go-style
buffered channel) and a single async { } task that drives the protocol from start to finish.
A few things worth noting:
- `AsyncChannel.create(64)` is bounded. If the protocol task ever falls behind, `send` will exert back-pressure on MINA. We size the buffer generously enough that the NIO thread never blocks in normal operation.
- `async { }` launches the per-session task on a virtual thread (on JDK 21+). On JDK 17-20 it falls back to a cached thread pool. Either way, there is no `ExecutorFilter` and no SEDA tuning: virtual threads handle the I/O parking for us.
- `sessionClosed` closes the channel. The `for await` loop reading from it will then exit cleanly, and the per-session async task will return.
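The bridging pattern itself is not Groovy-specific. Here is a rough JDK-only sketch under stated assumptions: a `BlockingQueue` stands in for the `AsyncChannel`, a `QUIT` sentinel stands in for closing the channel, and `Thread.ofVirtual()` plays the role of `async { }`. The names are ours, not MINA's or Groovy's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the bridging principle: the handler side only enqueues lines;
// one virtual thread per session runs the protocol as a straight-line loop.
public class BridgeSketch {
    static List<String> runSession(List<String> inputs) {
        BlockingQueue<String> inbox = new ArrayBlockingQueue<>(64); // bounded => back-pressure
        List<String> replies = new ArrayList<>();
        Thread protocol = Thread.ofVirtual().start(() -> {
            try {
                String line;
                while (!(line = inbox.take()).equals("QUIT")) { // sentinel models channel close
                    replies.add("ECHO " + line);
                }
                replies.add("BYE");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            for (String line : inputs) inbox.put(line); // the "messageReceived" side: enqueue and return
            protocol.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(runSession(List.of("hello", "world", "QUIT")));
    }
}
```

The real `TickerHandler` above does exactly this shuttle, with the channel and virtual thread supplied by the Groovy 6 toolkit instead of hand-rolled.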
The protocol, top-to-bottom
With the bridging in place, here is the entire protocol — all the conversation logic that the bundled MINA chat handler scattered across callbacks now lives in one method:
void run() {
AsyncScope.withScope {
for (String line in inbox) {
var parts = line.trim().split(/\s+/, 2)
var cmd = parts[0].toUpperCase()
var args = parts.length > 1 ? parts[1] : ''
switch (cmd) {
case 'SUBSCRIBE' -> handleSubscribe(args)
case 'UNSUBSCRIBE' -> handleUnsubscribe(args)
case 'QUOTE' -> handleQuote(args)
case 'QUIT' -> session.with{ write('BYE'); closeOnFlush() }
default -> session.write("ERROR unknown command: $cmd")
}
}
}
}
A plain for loop consumes the inbox as it is filled by messageReceived — AsyncChannel
exposes itself as a blocking iterable, so the loop reads top-to-bottom and exits when the
channel is closed. The whole loop runs inside AsyncScope.withScope { … } so any
subscription tasks we launch underneath (see the next two sections) inherit a structured
lifetime: when the loop exits — on QUIT, on the client disconnecting, or on any exception — the scope guarantees every child task has finished or been cancelled before run() returns.
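What the structured lifetime buys us can be approximated with plain JDK pieces: since JDK 19, `ExecutorService` is `AutoCloseable` and `close()` waits for submitted tasks, so try-with-resources gives children a bounded lifetime. This is only a partial stand-in (it does not cancel siblings on failure the way the scope does), but the "no task outlives the block" guarantee is the same:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Approximation of a structured scope: try-with-resources on an executor
// guarantees every submitted child has finished before the block is exited.
public class ScopeSketch {
    static int countCompleted(int tasks) {
        AtomicInteger done = new AtomicInteger();
        try (ExecutorService scope = Executors.newVirtualThreadPerTaskExecutor()) { // JDK 21+
            for (int i = 0; i < tasks; i++) {
                scope.submit(() -> { Thread.sleep(5); return done.incrementAndGet(); });
            }
        } // close() blocks here until every child task has completed
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(countCompleted(8)); // all 8 children finished before we got here
    }
}
```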
QUIT is one line: send BYE, ask MINA to close on flush, and the resulting sessionClosed
callback closes the inbox channel, which terminates the for loop. Compare that with calling
session.closeNow() from deep inside a callback chain.
Shared state with @ActiveObject
The price feeds and last-tick cache are shared across every session. The traditional answer
is Collections.synchronizedMap or some other mutex dance. Groovy 6’s @ActiveObject
turns ordinary class methods into messages on a serialised mailbox, so we get race-free state
without writing synchronized once:
@ActiveObject
class TickerRegistry {
private final Map<String, BroadcastChannel> feeds = [:]
private final Map<String, BigDecimal> latest = [:]
@ActiveMethod
Flow.Publisher<BigDecimal> subscribe(String symbol) {
feeds.computeIfAbsent(symbol) { BroadcastChannel.create() }.asPublisher()
}
@ActiveMethod
void publish(String symbol, BigDecimal price) {
latest[symbol] = price
feeds[symbol]?.send(price)
}
@ActiveMethod
BigDecimal lastTick(String symbol) {
latest[symbol]
}
}
@ActiveMethod calls are routed through the registry’s internal actor, so two threads can
freely call publish and subscribe at the same time without us thinking about locks.
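A useful mental model for the serialised mailbox, sketched with the JDK only (this is our reading of the pattern, not Groovy 6 internals): every call becomes a task on a single-threaded executor, so the state is confined to one thread and needs no locks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hand-rolled active object: the maps are only ever touched by the mailbox
// thread, so plain HashMap is safe despite concurrent callers.
public class RegistrySketch {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Map<String, Double> latest = new HashMap<>(); // confined to the mailbox thread

    void publish(String symbol, double price) {
        mailbox.execute(() -> latest.put(symbol, price)); // fire-and-forget message
    }

    Double lastTick(String symbol) {
        try {
            return mailbox.submit(() -> latest.get(symbol)).get(); // request/reply message
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() { mailbox.shutdown(); }

    public static void main(String[] args) {
        var reg = new RegistrySketch();
        reg.publish("AAPL", 173.42);
        System.out.println(reg.lastTick("AAPL"));
        reg.shutdown();
    }
}
```

`@ActiveObject` generates this plumbing for us and keeps the ordinary class syntax.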
BroadcastChannel is the other star of this section. Unlike a plain AsyncChannel (which
delivers each value to one consumer), a broadcast delivers every value to every subscriber.
Calling asPublisher() adapts it to the JDK java.util.concurrent.Flow.Publisher interface,
which for await consumes natively. The same plumbing works against any reactive stream — a Reactor Flux (via the groovy-reactor module) or an RxJava Observable (via groovy-rxjava)
would slot in unchanged.
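The `Flow` side of this is plain JDK. As a stdlib stand-in for `BroadcastChannel.asPublisher()`, `java.util.concurrent.SubmissionPublisher` delivers every submitted item to every subscriber over the same `Flow` protocol (the collecting subscriber here is ours, for illustration):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Demonstrates broadcast semantics: every submitted tick reaches every subscriber.
public class BroadcastSketch {
    static Flow.Subscriber<String> collectInto(List<String> sink) {
        return new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { sink.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    static boolean everySubscriberSeesEveryTick() {
        List<String> a = new CopyOnWriteArrayList<>();
        List<String> b = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<String> feed = new SubmissionPublisher<>()) {
            feed.subscribe(collectInto(a));
            feed.subscribe(collectInto(b));
            feed.submit("TICK AAPL 173.42"); // fans out to both subscribers
            feed.submit("TICK AAPL 174.10");
        } // close() completes the stream; delivery itself is asynchronous
        for (int i = 0; i < 1000 && (a.size() < 2 || b.size() < 2); i++) {
            try { Thread.sleep(2); } catch (InterruptedException e) { break; }
        }
        return a.equals(b) && a.size() == 2;
    }

    public static void main(String[] args) {
        System.out.println(everySubscriberSeesEveryTick());
    }
}
```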
Per-session subscription with for await
Each SUBSCRIBE spawns a small async task that consumes the broadcast for the requested
symbol and writes ticks to the session:
private void handleSubscribe(String args) {
symbolsOf(args).each { sym ->
if (subs.containsKey(sym)) return
var cancel = new AtomicBoolean(false)
subs[sym] = cancel
async {
for await (price in registry.subscribe(sym)) {
if (cancel.get()) break
session.write("TICK $sym $price")
}
}
session.write("SUBSCRIBED $sym")
}
}
private void handleUnsubscribe(String args) {
symbolsOf(args).each { sym ->
subs.remove(sym)?.set(true)
session.write("UNSUBSCRIBED $sym")
}
}
UNSUBSCRIBE flips an AtomicBoolean; the per-symbol loop notices on its next tick, breaks,
and for await cancels the underlying subscription on the way out.
Two pieces of the new toolkit are doing the heavy lifting here:
- `for await (price in registry.subscribe(sym))` consumes the per-symbol `Flow.Publisher` as if it were a regular collection, while the underlying subscriber/back-pressure protocol is hidden.
- The bounded `AsyncChannel` we created for the inbox in `sessionOpened` provides the back-pressure path between the network and our async tasks. If the client cannot drain ticks fast enough, MINA’s write queue eventually pushes back, the broadcast publisher slows its delivery, and nothing falls over.
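The "bounded buffer means back-pressure" claim is easy to see with a queue in isolation. This toy uses capacity 2 rather than 64, and a timed `offer` in place of a blocking send, so the throttling shows up as a return value; it is a stand-in, not MINA's actual write queue:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Once the bounded buffer is full, the producer is throttled until the
// consumer drains a slot; that is all "back-pressure" means here.
public class BackPressureSketch {
    static List<Boolean> demo() {
        try {
            BlockingQueue<String> inbox = new ArrayBlockingQueue<>(2);
            boolean first = inbox.offer("tick-1");                              // buffered
            boolean second = inbox.offer("tick-2");                             // buffered
            boolean whileFull = inbox.offer("tick-3", 10, TimeUnit.MILLISECONDS); // full: rejected
            inbox.take();                                                       // consumer drains one slot...
            boolean afterDrain = inbox.offer("tick-3");                         // ...producer proceeds
            return List.of(first, second, whileFull, afterDrain);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [true, true, false, true]
    }
}
```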
Structured fan-out for QUOTE
QUOTE AAPL,GOOG,MSFT,TSLA asks for one snapshot per symbol. The natural shape is to fire all
the lookups in parallel and join on the results — which is exactly what AsyncScope is for:
private void handleQuote(String args) {
var syms = symbolsOf(args)
AsyncScope.withScope {
var lookups = syms.collect { sym -> async { [sym, registry.lastTick(sym)] } }
lookups.each { l ->
var (sym, price) = await(l)
session.write("QUOTE $sym ${price ?: 'NA'}")
}
}
}
The scope owns the lifetime of the per-symbol tasks. If any single lookup blows up, the others
are cancelled before the scope exits — no leaked tasks, no half-finished work, no manual
CompletableFuture.allOf plus exception-handling boilerplate. With virtual threads, fanning
out over hundreds of symbols costs essentially nothing.
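For comparison, here is roughly the `CompletableFuture.allOf` plumbing that the scope replaces, with a stubbed `lastTick` (the stub is ours; only the fan-out/join shape matters):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// The manual version of the QUOTE fan-out: one future per symbol,
// allOf to join, then a second pass to read the results.
public class QuoteFanOut {
    static double lastTick(String symbol) { return 100.0 + symbol.length(); } // stub lookup

    static Map<String, Double> quote(List<String> symbols) {
        var lookups = symbols.stream()
            .collect(Collectors.toMap(
                sym -> sym,
                sym -> CompletableFuture.supplyAsync(() -> lastTick(sym))));
        CompletableFuture.allOf(lookups.values().toArray(CompletableFuture[]::new)).join();
        return lookups.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()));
    }

    public static void main(String[] args) {
        System.out.println(quote(List.of("AAPL", "MSFT")));
    }
}
```

Note what is missing: nothing here cancels the surviving lookups if one of them fails, which is exactly the gap the structured scope closes.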
A simulated tick source
For now the ticks come from a stub: one async task per symbol drips a randomly walked price into the registry every 250 ms (the default intervalMillis).
class FakeTickSource {
TickerRegistry registry
List<String> symbols
long intervalMillis = 250
private final running = new AtomicBoolean(false)
private final rng = RandomSource.XO_RO_SHI_RO_128_PP.create(42L)
void start() {
running.set(true)
symbols.each { sym ->
var price = 50.0G + rng.nextInt(200)
async {
while (running.get()) {
var delta = (rng.nextInt(200) - 100) / 100.0
price = (price + delta).max(0.01g)
registry.publish(sym, price)
sleep intervalMillis
}
}
}
}
void stop() { running.set(false) }
}
We pulled the random number generator from
Apache Commons RNG Simple 1.7 (also a fresh
release): UniformRandomProvider is a drop-in replacement for java.util.Random with the same
nextInt(n) shape, but backed by faster, statistically stronger algorithms — in this case the
recommended general-purpose XO_RO_SHI_RO_128_PP.
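The walk itself is independent of the RNG library. A sketch with `java.util.Random` standing in for `UniformRandomProvider` (both expose the same `nextInt(n)` shape; the clamp value matches the stub above):

```java
import java.util.Random;

// One step of the stub's random walk: move by up to about +/-1.00,
// clamped so the price can never go non-positive.
public class RandomWalkSketch {
    static double step(Random rng, double price) {
        double delta = (rng.nextInt(200) - 100) / 100.0; // uniform step in [-1.00, 0.99]
        return Math.max(price + delta, 0.01);            // clamp at 0.01
    }

    public static void main(String[] args) {
        Random rng = new Random(42L);
        double price = 50.0 + rng.nextInt(200);
        for (int i = 0; i < 5; i++) {
            price = step(rng, price);
            System.out.printf("TICK DEMO %.2f%n", price);
        }
    }
}
```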
In a follow-up post we will replace this stub with real prices fetched in parallel from a fleet
of sources — using Apache MINA SSHD as a client, parallel collections, and Pool.virtual().
The consumer side — everything we built above — does not change at all.
Trying it
The repository ships a TickerServer.groovy script that boots the acceptor on an ephemeral
port, opens an in-process socket client, exercises the protocol, and asserts the output.
A typical run prints something like:
Ticker hub listening on port 56432
--- received ---
SUBSCRIBED AAPL
SUBSCRIBED MSFT
TICK AAPL 173.42
TICK MSFT 412.10
TICK AAPL 174.18
TICK MSFT 411.80
TICK AAPL 173.95
QUOTE AAPL 173.95
QUOTE GOOG NA
QUOTE MSFT 411.80
QUOTE TSLA 215.55
UNSUBSCRIBED AAPL
TICK MSFT 412.45
BYE
OK
You can also try it interactively. Start the server on a fixed port (9999 below), open another terminal, and use nc:
$ nc localhost 9999
SUBSCRIBE AAPL,MSFT
SUBSCRIBED AAPL
SUBSCRIBED MSFT
TICK AAPL 173.42
TICK MSFT 412.10
...
QUOTE GOOG,TSLA
QUOTE GOOG 198.20
QUOTE TSLA 215.55
UNSUBSCRIBE AAPL
UNSUBSCRIBED AAPL
QUIT
BYE
Recap
We started with the standard MINA pattern (a messageReceived callback containing the
whole protocol switch, plus synchronised collections for shared state) and ended up with
a small IoHandler whose only job is to forward inbound messages into a channel,
plus a single run() method that reads top-to-bottom.
Each Groovy 6 feature earned its place:
- `async { }` and `for await` flatten the per-session loop, with virtual threads doing the parking on blocking I/O.
- `AsyncChannel` bridges between MINA’s callback-driven world and the linear protocol loop, with bounded buffers providing natural back-pressure.
- `BroadcastChannel` and its `asPublisher()` view give us pub/sub broadcast for free, with `for await` consuming reactive streams as if they were collections.
- `@ActiveObject` makes shared state race-free by routing methods through a serialised mailbox, without a single `synchronized` keyword.
- `AsyncScope` provides structured lifetimes, both for the protocol task itself and for the parallel `QUOTE` fan-out.
Coming up
Our hub trusts whoever feeds it. Next time, we will wire a fleet of broker boxes
to feed the `BroadcastChannel`s we built today, using Apache MINA SSHD as a client,
parallel collection methods (`collectParallel`, `eachParallel`), `Pool.virtual()` for the
I/O pool, and async file I/O on `Path` for log capture. The producer side gets all the
attention; the consumer side (everything we built in this post) stays unchanged.
Conclusion
Apache MINA’s filter chain and IoHandler model still give us a clean, well-understood
place to plug in a network protocol. What changes with Groovy 6 is what we put inside
the handler: instead of cramming protocol logic into callbacks and synchronising on shared
collections, we let the handler shuttle events into a channel and write the protocol as
one straightforward async method. The new toolkit — async/await, channels, broadcast,
@ActiveObject, and AsyncScope — composes naturally with MINA, and the result is
network code you can read top-to-bottom.