Streaming Servers with Apache MINA and Groovy™ 6

Author: Paul King

Published: 2026-04-30 12:00PM


Apache MINA gives us a battle-tested asynchronous network framework with a callback-driven IoHandler model. Groovy 6 adds async/await, channels, broadcast streams, and @ActiveObject — exactly the toolkit needed to flatten that callback model into protocol code that reads top-to-bottom.

Apache MINA is "a network application framework which helps users develop high performance and high scalability network applications easily". It wraps Java NIO behind a small set of event-driven abstractions: an acceptor, a filter chain (codecs, logging, SSL), a per-connection session, and an IoHandler you implement to react to lifecycle and message events.

That callback shape is the same shape that motivated async/await to exist in the first place. Once a handler needs to do anything beyond echo bytes — talk to a database, fan out to several backends, broadcast to other sessions — the linear flow disappears into a tangle of synchronized sets, futures, and follow-on callbacks.

Groovy 6 adds a unified concurrency toolkit that lets us refactor those handlers into top-to-bottom async code:

  • Native async { } / await running on virtual threads (JDK 21+)

  • AsyncChannel and BroadcastChannel for Go-style and pub/sub communication

  • for await for iterating async sources (channels, generators, reactive streams)

  • @ActiveObject / @ActiveMethod for race-free shared state with ordinary class syntax

  • AsyncScope for structured concurrency

In this post we put them to work together. We will build a small streaming "ticker hub": clients subscribe to symbols and receive a stream of price ticks; they can also issue a one-shot multi-symbol QUOTE that fans out concurrently. The result is a MINA server whose entire protocol fits in one method that you can read straight through.

Examples in this post use the freshly released Apache MINA 2.2.6 and Apache Commons RNG Simple 1.7, together with Groovy 6 on JDK 21+. The full source is available on GitHub.

What we are building

A simple line-based protocol over TCP:

SUBSCRIBE AAPL,GOOG     -> server starts pushing TICK lines for those symbols
UNSUBSCRIBE GOOG        -> server stops pushing for that symbol
QUOTE AAPL,MSFT,TSLA    -> server returns one snapshot line per symbol concurrently
QUIT                    -> server replies BYE and closes the connection

Server pushes look like:

SUBSCRIBED AAPL
TICK AAPL 173.42
TICK AAPL 174.10
QUOTE AAPL 174.10
QUOTE MSFT 412.55
BYE

The ticks themselves are produced by a stub FakeTickSource for now. In a follow-up post we will replace that stub with real data fetched in parallel from a fleet of sources — but the protocol, broadcast machinery, and per-session async logic will not need to change.

A 60-second tour of MINA

A MINA server only needs five concepts:

  • NioSocketAcceptor — the TCP listener (UDP, in-VM and serial transports also exist).

  • The filter chain — a pipeline through which bytes flow in both directions. We add three: a codec filter (turns bytes into decoded messages), a logging filter, and — in production — an SslFilter.

  • A ProtocolCodecFilter with a TextLineCodecFactory — turns the byte stream into newline-delimited String messages and back. Our handler will see `String`s, not raw `IoBuffer`s.

  • IoHandlerAdapter — the extension point. Override the lifecycle callbacks you care about: sessionOpened, messageReceived, messageSent, sessionIdle, sessionClosed, exceptionCaught.

  • IoSession — one per connected client. Use session.write(obj) to send (the codec encodes for you), session.setAttribute(key, value) for per-connection scratch state, and session.closeOnFlush() to disconnect.

The entire server boot is about ten lines:

var acceptor = new NioSocketAcceptor()
acceptor.filterChain.with {
    addLast 'codec',  new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8))
    addLast 'logger', new LoggingFilter()
}
acceptor.handler = new TickerHandler(registry: registry)
acceptor.bind(new InetSocketAddress(0))    // ephemeral port

So far, so MINA. The interesting bit is what we put inside TickerHandler.

The classic shape, and why we want a different one

A MINA IoHandler traditionally does all of its work inside messageReceived. The chat example shipped with MINA is a good representative — it carries the entire chat protocol inside one big switch over the parsed command, with shared state held in synchronized sets:

public class ChatProtocolHandler extends IoHandlerAdapter {
    private final Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<>());
    private final Set<String>    users    = Collections.synchronizedSet(new HashSet<>());

    @Override
    public void messageReceived(IoSession session, Object message) {
        String[] result = ((String) message).split(" ", 2);
        switch (ChatCommand.valueOf(result[0]).toInt()) {
            case ChatCommand.LOGIN:
                /* register session, broadcast join, ... */
                break;
            case ChatCommand.BROADCAST:
                /* iterate sessions, write to each ... */
                break;
            case ChatCommand.QUIT:
                session.write("QUIT OK");
                session.closeNow();
                break;
        }
    }

    public void broadcast(String message) {
        synchronized (sessions) {
            for (IoSession s : sessions) {
                if (s.isConnected()) s.write("BROADCAST OK " + message);
            }
        }
    }
}

That works, and for many years it was the right shape. But every concurrent feature — broadcast, shared user list, session lifecycle — ends up either as synchronized collections or as more callbacks. Adding a downstream call (e.g., look up the user’s permissions over HTTP) means either blocking a precious NIO thread or chaining yet more callbacks.

We will refactor along a single principle: the handler does nothing except shuttle events into a channel, and the actual protocol runs as one straight-line async task per session.

The bridging pattern

Here is the entire MINA-facing surface of our server:

class TickerHandler extends IoHandlerAdapter {
    static final String INBOX = 'inbox'
    TickerRegistry registry

    @Override
    void sessionOpened(IoSession session) {
        var inbox = AsyncChannel.create(64)            // bounded => back-pressure
        session.setAttribute(INBOX, inbox)
        async {
            new TickerProtocol(session: session, inbox: inbox, registry: registry).run()
        }
    }

    @Override
    void messageReceived(IoSession session, message) {
        session.getAttribute(INBOX).send(message.toString())
    }

    @Override
    void sessionClosed(IoSession session) {
        session.getAttribute(INBOX)?.close()
    }

    @Override
    void exceptionCaught(IoSession session, Throwable cause) {
        cause.printStackTrace()
        session.getAttribute(INBOX)?.close()
        session.closeNow()
    }
}

That is everything we will ever write against IoHandler. The handler now contains zero protocol logic. Each connected client gets a per-session AsyncChannel (a Go-style buffered channel) and a single async { } task that drives the protocol from start to finish.

A few things worth noting:

  • AsyncChannel.create(64) is bounded. If the protocol task ever falls behind, send will exert back-pressure on MINA. We size the buffer generously enough that the NIO thread never blocks in normal operation.

  • async { } launches the per-session task on a virtual thread (on JDK 21+). On JDK 17—​20 it falls back to a cached thread pool. Either way, there is no ExecutorFilter and no SEDA tuning: virtual threads handle the I/O parking for us.

  • sessionClosed closes the channel. The for await loop reading from it will then exit cleanly, and the per-session async task will return.

The protocol, top-to-bottom

With the bridging in place, here is the entire protocol — all the conversation logic that the bundled MINA chat handler scattered across callbacks now lives in one method:

void run() {
    AsyncScope.withScope {
        for (String line in inbox) {
            var parts = line.trim().split(/\s+/, 2)
            var cmd  = parts[0].toUpperCase()
            var args = parts.length > 1 ? parts[1] : ''
            switch (cmd) {
                case 'SUBSCRIBE'   -> handleSubscribe(args)
                case 'UNSUBSCRIBE' -> handleUnsubscribe(args)
                case 'QUOTE'       -> handleQuote(args)
                case 'QUIT'        -> session.with{ write('BYE'); closeOnFlush() }
                default            -> session.write("ERROR unknown command: $cmd")
            }
        }
    }
}

A plain for loop consumes the inbox as it is filled by messageReceived — AsyncChannel exposes itself as a blocking iterable, so the loop reads top-to-bottom and exits when the channel is closed. The whole loop runs inside AsyncScope.withScope { …​ } so any subscription tasks we launch underneath (see the next two sections) inherit a structured lifetime: when the loop exits — on QUIT, on the client disconnecting, or on any exception — the scope guarantees every child task has finished or been cancelled before run() returns.

QUIT is one line: send BYE, ask MINA to close on flush, and the resulting sessionClosed callback closes the inbox channel, which terminates the for loop. Compare that with calling session.closeNow() from deep inside a callback chain.

Shared state with @ActiveObject

The price feeds and last-tick cache are shared across every session. The traditional answer is Collections.synchronizedMap or some other mutex dance. Groovy 6’s @ActiveObject turns ordinary class methods into messages on a serialised mailbox, so we get race-free state without writing synchronized once:

@ActiveObject
class TickerRegistry {
    private final Map<String, BroadcastChannel> feeds = [:]
    private final Map<String, BigDecimal>       latest = [:]

    @ActiveMethod
    Flow.Publisher<BigDecimal> subscribe(String symbol) {
        feeds.computeIfAbsent(symbol) { BroadcastChannel.create() }.asPublisher()
    }

    @ActiveMethod
    void publish(String symbol, BigDecimal price) {
        latest[symbol] = price
        feeds[symbol]?.send(price)
    }

    @ActiveMethod
    BigDecimal lastTick(String symbol) {
        latest[symbol]
    }
}

@ActiveMethod calls are routed through the registry’s internal actor, so two threads can freely call publish and subscribe at the same time without us thinking about locks.

BroadcastChannel is the other star of this section. Unlike a plain AsyncChannel (which delivers each value to one consumer), a broadcast delivers every value to every subscriber. Calling asPublisher() adapts it to the JDK java.util.concurrent.Flow.Publisher interface, which for await consumes natively. The same plumbing works against any reactive stream — a Reactor Flux (via the groovy-reactor module) or an RxJava Observable (via groovy-rxjava) would slot in unchanged.

Tip

Why @ActiveObject here and not Agent? Agent wraps a single value updated through a function — great for a counter, a single config snapshot, or any "one mutable thing". When the unit of state has multiple coordinated operations on related fields (as our registry does), @ActiveObject reads more naturally.

Per-session subscription with for await

Each SUBSCRIBE spawns a small async task that consumes the broadcast for the requested symbol and writes ticks to the session:

private void handleSubscribe(String args) {
    symbolsOf(args).each { sym ->
        if (subs.containsKey(sym)) return
        var cancel = new AtomicBoolean(false)
        subs[sym] = cancel
        async {
            for await (price in registry.subscribe(sym)) {
                if (cancel.get()) break
                session.write("TICK $sym $price")
            }
        }
        session.write("SUBSCRIBED $sym")
    }
}

private void handleUnsubscribe(String args) {
    symbolsOf(args).each { sym ->
        subs.remove(sym)?.set(true)
        session.write("UNSUBSCRIBED $sym")
    }
}

UNSUBSCRIBE flips an AtomicBoolean; the per-symbol loop notices on its next tick, breaks, and for await cancels the underlying subscription on the way out.

Two pieces of the new toolkit are doing the heavy lifting here:

  • for await (price in registry.subscribe(sym)) consumes the per-symbol Flow.Publisher as if it were a regular collection, while the underlying subscriber/back-pressure protocol is hidden.

  • The bounded AsyncChannel we created for the inbox in sessionOpened provides the back-pressure path between the network and our async tasks. If the client cannot drain ticks fast enough, MINA’s write queue eventually pushes back, the broadcast publisher slows its delivery, and nothing falls over.

Structured fan-out for QUOTE

QUOTE AAPL,GOOG,MSFT,TSLA asks for one snapshot per symbol. The natural shape is to fire all the lookups in parallel and join on the results — which is exactly what AsyncScope is for:

private void handleQuote(String args) {
    var syms = symbolsOf(args)
    AsyncScope.withScope {
        var lookups = syms.collect { sym -> async { [sym, registry.lastTick(sym)] } }
        lookups.each { l ->
            var (sym, price) = await(l)
            session.write("QUOTE $sym ${price ?: 'NA'}")
        }
    }
}

The scope owns the lifetime of the per-symbol tasks. If any single lookup blows up, the others are cancelled before the scope exits — no leaked tasks, no half-finished work, no manual CompletableFuture.allOf plus exception-handling boilerplate. With virtual threads, fanning out over hundreds of symbols costs essentially nothing.

A simulated tick source

For now the ticks come from a stub: one async task per symbol drips a randomly walked price into the registry every 200 ms.

class FakeTickSource {
    TickerRegistry registry
    List<String> symbols
    long intervalMillis = 250

    private final running = new AtomicBoolean(false)
    private final rng = RandomSource.XO_RO_SHI_RO_128_PP.create(42L)

    void start() {
        running.set(true)
        symbols.each { sym ->
            var price = 50.0G + rng.nextInt(200)
            async {
                while (running.get()) {
                    var delta = (rng.nextInt(200) - 100) / 100.0
                    price = (price + delta).max(0.01g)
                    registry.publish(sym, price)
                    sleep intervalMillis
                }
            }
        }
    }

    void stop() { running.set(false) }
}

We pulled the random number generator from Apache Commons RNG Simple 1.7 (also a fresh release): UniformRandomProvider is a drop-in replacement for java.util.Random with the same nextInt(n) shape, but backed by faster, statistically stronger algorithms — in this case the recommended general-purpose XO_RO_SHI_RO_128_PP.

In a follow-up post we will replace this stub with real prices fetched in parallel from a fleet of sources — using Apache MINA SSHD as a client, parallel collections, and Pool.virtual(). The consumer side — everything we built above — does not change at all.

Trying it

The repository ships a TickerServer.groovy script that boots the acceptor on an ephemeral port, opens an in-process socket client, exercises the protocol, and asserts the output. A typical run prints something like:

Ticker hub listening on port 56432
--- received ---
SUBSCRIBED AAPL
SUBSCRIBED MSFT
TICK AAPL 173.42
TICK MSFT 412.10
TICK AAPL 174.18
TICK MSFT 411.80
TICK AAPL 173.95
QUOTE AAPL 173.95
QUOTE GOOG NA
QUOTE MSFT 411.80
QUOTE TSLA 215.55
UNSUBSCRIBED AAPL
TICK MSFT 412.45
BYE
OK

You can also try it interactively. Start the server, open another terminal, and use nc:

$ nc localhost 9999
SUBSCRIBE AAPL,MSFT
SUBSCRIBED AAPL
SUBSCRIBED MSFT
TICK AAPL 173.42
TICK MSFT 412.10
...
QUOTE GOOG,TSLA
QUOTE GOOG 198.20
QUOTE TSLA 215.55
UNSUBSCRIBE AAPL
UNSUBSCRIBED AAPL
QUIT
BYE

Recap

We started with the standard MINA pattern — a messageReceived callback containing the whole protocol switch, plus synchronised collections for shared state — and ended up with one ten-line IoHandler whose only job is to forward inbound messages into a channel, plus a single runProtocol method that reads top-to-bottom.

Each Groovy 6 feature earned its place:

  • async { } and for await flatten the per-session loop, with virtual threads doing the parking on blocking I/O.

  • AsyncChannel bridges between MINA’s callback-driven world and the linear protocol loop, with bounded buffers providing natural back-pressure.

  • BroadcastChannel and its asPublisher() view give us pub/sub broadcast for free, with for await consuming reactive streams as if they were collections.

  • @ActiveObject makes shared state race-free by routing methods through a serialised mailbox, without a single synchronized keyword.

  • AsyncScope provides structured lifetimes — both for the protocol task itself and for the parallel QUOTE fan-out.

Coming up

Our hub trusts whoever feeds it. Next time, we will wire a fleet of broker boxes to feed the BroadcastChannel`s we built today — using Apache MINA SSHD as a client, parallel collection methods (`collectParallel, eachParallel), Pool.virtual() for the I/O pool, and async file I/O on Path for log capture. The producer side gets all the attention; the consumer side — everything we built in this post — stays unchanged.

Conclusion

Apache MINA’s filter chain and IoHandler model still give us a clean, well-understood place to plug in a network protocol. What changes with Groovy 6 is what we put inside the handler: instead of cramming protocol logic into callbacks and synchronising on shared collections, we let the handler shuttle events into a channel and write the protocol as one straightforward async method. The new toolkit — async/await, channels, broadcast, @ActiveObject, and AsyncScope — composes naturally with MINA, and the result is network code you can read top-to-bottom.

Update history

30/Apr/2026: Initial version.