Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Implementing TCP Servers with Spring Boot Using BIO, NIO, and Netty

Tech 3

BIO (Blocking I/O)

Blocking I/O dedicates a thread to each connection. Accept, read, and write operations block the calling thread until data is available or an operation completes. Under high concurrency this approach consumes many threads and context switches.

BIO echo server

BioEchoServer.java

package demo.bio;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BioEchoServer {
    private static final Logger log = Logger.getLogger(BioEchoServer.class.getName());

    public static void main(String[] args) throws IOException {
        int port = 10002;
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            ExecutorService pool = Executors.newCachedThreadPool();
            log.info("BIO server listening on port " + port);
            while (true) {
                Socket client = serverSocket.accept();
                pool.submit(new ClientTask(client));
            }
        }
    }

    static class ClientTask implements Runnable {
        private final Socket socket;

        ClientTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String remote = socket.getRemoteSocketAddress().toString();
            log.info("Connected: " + remote);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))) {

                String line = reader.readLine();
                log.info("Received from " + remote + ": " + line);
                writer.write(line + "\n");
                writer.flush();
            } catch (IOException e) {
                log.log(Level.WARNING, "I/O error for " + remote, e);
            } finally {
                try { socket.close(); } catch (IOException ignored) {}
            }
        }
    }
}

BIO client

BioEchoClient.java

package demo.bio;

import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;

public class BioEchoClient {
    private static final Logger log = Logger.getLogger(BioEchoClient.class.getName());

    public static void main(String[] args) throws IOException {
        try (Socket socket = new Socket("127.0.0.1", 10002);
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {

            String payload = "BIO client -> server test";
            writer.write(payload + "\n");
            writer.flush();

            String echoed = reader.readLine();
            log.info("Server replied: " + echoed);
        }
    }
}

NIO (Non-blocking I/O)

Non-blocking I/O uses channels, selectros, and buffers to let a small number of threads manage many connections. Channels are registered with a Selector and readiness events drive I/O.

NIO echo server

NioReactorServer.java

package demo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NioReactorServer {
    private static final Logger log = Logger.getLogger(NioReactorServer.class.getName());
    private static final int PORT = 10002;
    private static final int BUF_SIZE = 1024;

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(PORT));
        server.register(selector, SelectionKey.OP_ACCEPT);
        log.info("NIO server listening on " + PORT);

        while (true) {
            selector.select();
            Set<SelectionKey> ready = selector.selectedKeys();
            Iterator<SelectionKey> it = ready.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                try {
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        handleAccept(key, selector);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }
                } catch (IOException ex) {
                    log.log(Level.WARNING, "Channel error", ex);
                    closeQuietly(key);
                }
            }
        }
    }

    private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel client = ssc.accept();
        if (client == null) return;
        client.configureBlocking(false);
        // Allocate a per-connection buffer as attachment
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(BUF_SIZE));
        log.info("Accepted " + client.getRemoteAddress());
    }

    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.clear();
        int read = ch.read(buf);
        if (read == -1) {
            log.info("Closed by peer: " + ch.getRemoteAddress());
            ch.close();
            key.cancel();
            return;
        }
        buf.flip();
        byte[] data = new byte[buf.remaining()];
        buf.get(data);
        String msg = new String(data).trim();
        log.info("Received: " + msg);

        String response = msg + " | echo @ " + System.currentTimeMillis();
        key.attach(ByteBuffer.wrap(response.getBytes()));
        key.interestOps(SelectionKey.OP_WRITE);
    }

    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer out = (ByteBuffer) key.attachment();
        ch.write(out);
        if (!out.hasRemaining()) {
            out.compact();
            key.attach(ByteBuffer.allocate(BUF_SIZE));
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    private static void closeQuietly(SelectionKey key) {
        try { key.channel().close(); } catch (IOException ignored) {}
        key.cancel();
    }
}

NIO client

NioClient.java

package demo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

public class NioClient {
    private static final Logger log = Logger.getLogger(NioClient.class.getName());
    private static final int BUF_SIZE = 1024;

    public static void main(String[] args) throws IOException, InterruptedException {
        try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 10002))) {
            List<String> lines = Arrays.asList("Acme", "Globex", "Initech", "Umbrella", "Wayne");
            for (String s : lines) {
                channel.write(ByteBuffer.wrap(s.getBytes()));

                ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
                int n = channel.read(buf);
                buf.flip();
                byte[] data = new byte[n];
                buf.get(data);
                log.info("Reply: " + new String(data).trim());
                Thread.sleep(1500);
            }
        }
    }
}

Netty-based NIO

Direct NIO is powerful but intricate: mastering selectors, channel lifecycles, buffer management, and error handling is non-trivial. Netty builds on NIO with an event-driven pipeline, robust transports, backpressure, and rich codec support.

Typical benefits:

  • High concurrency via non-blocking event loops
  • Fast I/O with minimized copying and batching
  • Encapsulation of low-level details with a pluggable pipeline

Maven configuration

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>demo</groupId>
    <artifactId>netty-tcp</artifactId>
    <version>0.0.1</version>
    <name>netty-tcp</name>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.1.9.RELEASE</spring-boot.version>
        <netty.version>4.1.43.Final</netty.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Netty server

TcpNettyServer.java

package demo.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

@Component
@Slf4j
public class TcpNettyServer {
    private final EventLoopGroup acceptGroup = new NioEventLoopGroup();
    private final EventLoopGroup ioGroup = new NioEventLoopGroup();

    @Value("${netty.port:10002}")
    private int port;

    @PostConstruct
    public void boot() throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap();
        b.group(acceptGroup, ioGroup)
         .channel(NioServerSocketChannel.class)
         .localAddress(new InetSocketAddress(port))
         .option(ChannelOption.SO_BACKLOG, 1024)
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .childOption(ChannelOption.TCP_NODELAY, true)
         .childHandler(new TcpServerInitializer());

        ChannelFuture f = b.bind().sync();
        if (f.isSuccess()) {
            log.info("Netty TCP server started on {}", port);
        }
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        acceptGroup.shutdownGracefully().sync();
        ioGroup.shutdownGracefully().sync();
        log.info("Netty TCP server stopped");
    }
}

TcpServerInitializer.java

package demo.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast("decoder", new StringDecoder(CharsetUtil.UTF_8))
          .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
          .addLast(new TcpServerHandler());
    }
}

TcpServerHandler.java

package demo.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("Client online: {}", ctx.channel().remoteAddress());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        log.info("Server got: {}", msg);
        ctx.writeAndFlush("ack: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Server error", cause);
        ctx.close();
    }
}

Netty client

TcpNettyClient.java

package demo.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class TcpNettyClient {
    private final EventLoopGroup group = new NioEventLoopGroup();

    @Value("${netty.host:127.0.0.1}")
    private String host;

    @Value("${netty.port:10002}")
    private int port;

    private volatile Channel channel;

    public void send(String msg) {
        Channel ch = channel;
        if (ch != null && ch.isActive()) {
            ch.writeAndFlush(msg);
        } else {
            log.warn("Channel not active, drop: {}", msg);
        }
    }

    @PostConstruct
    public void connect() {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.SO_KEEPALIVE, true)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new TcpClientInitializer());

        ChannelFuture future = b.connect(host, port);
        future.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                channel = f.channel();
                log.info("Connected to {}:{}", host, port);
            } else {
                log.info("Connect failed, retrying in 10s");
                f.channel().eventLoop().schedule(this::connect, 10, TimeUnit.SECONDS);
            }
        });
    }
}

TcpClientInitializer.java

package demo.netty.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class TcpClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast("decoder", new StringDecoder(CharsetUtil.UTF_8))
          .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
          .addLast(new TcpClientHandler());
    }
}

TcpClientHandler.java

package demo.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("Client active: {}", ctx.channel().localAddress());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        log.info("Client received: {}", msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Client error", cause);
        ctx.close();
    }
}

Example runtime output

Server (Netty):

... INFO  TcpNettyServer - Netty TCP server started on 10002
... INFO  TcpServerHandler - Client online: /127.0.0.1:54321
... INFO  TcpServerHandler - Server got: hello

Client (Netty):

... INFO  TcpNettyClient - Connected to 127.0.0.1:10002
... INFO  TcpClientHandler - Client active: /127.0.0.1:54321
... INFO  TcpClientHandler - Client received: ack: hello

Netty core components

  • Bootstrap/ServerBootstrap: Fluent builders that assemble clients and servers and wire up transports, event loops, and pipelines.
  • Future/ChannelFuture: Asynchronous result placeholders. Register listeners to be invoked on completion, failure, or cancellation.
  • Channel: The primary network I/O abstraction. Exposes connection state, configuration, and non-blocking operations (connect, bind, read, write). Implementations include NioSocketChannel, NioServerSocketChannel, NioDatagramChannel.
  • Selector (under the hood): Netty multiplexes many channels on few threads via the JDK selector mechanisms.
  • NioEventLoop: Single-threaded executor bound to a selector, running I/O events and scheduled/non-I/O tasks; time between I/O and tasks is balanced via configuration.
  • NioEventLoopGroup: Manages a set of EventLoops (threads). A Channel is registered to one EventLoop for its lifetime.
  • ChannelHandler: Units of work in the pipeline that handle inbound events or intercept outbound operations.
  • ChannelHandlerContext: Binds a handler to its pipeline and provides operations to propagate events.
  • ChannelPipeline: Ordered chain of handlers that process inbound and outbound events, allowing fine-grained composition and flow control.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.