Fading Coder

One Final Commit for the Last Sprint

Java NIO Network Programming: From Blocking I/O to Selector Multiplexing

Tech May 7 4

Blocking Mode Network Communication

In blocking mode, a single thread can only handle one client connection at a time. When a socket operation encounters no available data or connection, the thread suspends execution and releases the CPU.

Server Implementation in Blocking Mode

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BlockingServer {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(8080));
        
        List<SocketChannel> connectedClients = new ArrayList<>();
        
        while (true) {
            System.out.println("Waiting for incoming connection...");
            SocketChannel clientChannel = serverChannel.accept();
            System.out.println("Client connected: " + clientChannel);
            connectedClients.add(clientChannel);
            
            for (SocketChannel channel : connectedClients) {
                System.out.println("Attempting to read from: " + channel);
                channel.read(buffer);
                buffer.flip();
                System.out.println("Received data: " + StandardCharsets.UTF_8.decode(buffer));
                buffer.clear();
            }
        }
    }
}

Client Implementation

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class SimpleClient {
    public static void main(String[] args) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.connect(new InetSocketAddress("localhost", 8080));
        
        channel.write(StandardCharsets.UTF_8.encode("First message"));
        channel.write(StandardCharsets.UTF_8.encode("Second message"));
    }
}

Blocking Mode Characteristics

  • Thread Suspension: Both ServerSocketChannel.accept() and SocketChannel.read() halt thread execution until the expected event occurs
  • CPU Idle Time: While blocked, the thread consumes no CPU resources but remains idle
  • Single Connection Limitation: One thread services one channel at a time; while it is blocked in accept() it cannot read from clients that are already connected, and while it is blocked in read() it cannot accept new connections
  • Thread Starvation: If Client B connects while the thread is blocked reading from Client A, Client B is not accepted until Client A sends data

Problems with Blocking Mode

  1. Resource Inefficiency: A blocked thread does no useful work, yet it still holds its stack and other OS resources
  2. Scalability Issues: Serving clients concurrently requires a dedicated thread per connection
  3. Memory Consumption: A 64-bit JVM typically reserves about 1 MB of stack per thread by default, so thousands of connections need gigabytes of memory
  4. Context Switching: Too many threads degrade performance due to frequent CPU context switches

Non-Blocking Mode Network Communication

Non-blocking mode eliminates thread suspension by returning immediately when no event is ready.

Server Implementation in Non-Blocking Mode

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class NonBlockingServer {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8080));
        
        List<SocketChannel> clients = new ArrayList<>();
        
        while (true) {
            SocketChannel client = serverChannel.accept();
            if (client != null) {
                client.configureBlocking(false);
                System.out.println("New connection: " + client);
                clients.add(client);
            }
            
            for (SocketChannel ch : clients) {
                int bytesRead = ch.read(buffer);
                if (bytesRead > 0) {
                    buffer.flip();
                    System.out.println("Data received: " + StandardCharsets.UTF_8.decode(buffer));
                    buffer.clear();
                }
            }
        }
    }
}

Non-Blocking Mode Behavior

  • ServerSocketChannel.accept() returns null instead of blocking when no connection arrives
  • SocketChannel.read() returns 0 instead of blocking when no data is available
  • One thread can handle multiple connections by continuously polling
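
The first point is easy to check in isolation. The following is a minimal sketch, not part of the servers above: the class name NonBlockingAcceptDemo is hypothetical, and binding to port 0 on the loopback address (so the OS picks a free port) is an assumption made only to keep the demo self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingAcceptDemo {
    // Returns whatever accept() yields when no client has connected.
    static SocketChannel acceptWithoutClient() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port (an assumption for this demo).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // In non-blocking mode this call returns at once instead of waiting.
            return server.accept();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept() returned: " + acceptWithoutClient());
    }
}
```

With configureBlocking(false) removed, the same call would wait until a client connects.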

Drawback of Non-Blocking Mode

The thread continuously executes the loop even when no events occur, resulting in wasteful CPU busy-waiting. The ideal behavior is: do work when events occur, release CPU when idle. Selector multiplexing solves this problem.

Selector-Based Multiplexing

A Selector enables a single thread to monitor multiple Channel events. The thread only processes when actual events occur, eliminating busy-waiting.

Event Types

Event        Description
OP_ACCEPT    Server socket receives a connection request
OP_CONNECT   Client establishes connection to server
OP_READ      Channel has readable data
OP_WRITE     Channel can accept write data
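
Each event type is a single bit in an int, so interest sets are combined and trimmed with bitwise operators. A small sketch of that idea follows; the class InterestOpsDemo and its describe helper are hypothetical names introduced for illustration, while the OP_* constants are the real ones from SelectionKey.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class InterestOpsDemo {
    // Lists the event names whose bits are set in the given interest set.
    static List<String> describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("OP_ACCEPT");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT");
        if ((ops & SelectionKey.OP_READ) != 0) names.add("OP_READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");
        return names;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;   // combine two interests
        System.out.println(describe(ops));                           // [OP_READ, OP_WRITE]
        System.out.println(describe(ops & ~SelectionKey.OP_WRITE));  // [OP_READ]
    }
}
```

Clearing a bit with `& ~` is safer than subtracting the constant, because it leaves the set unchanged when the bit was not set in the first place.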

Basic Selector Usage

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public class SelectorServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        
        SelectionKey serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Registered key: " + serverKey);
        
        serverChannel.bind(new InetSocketAddress(8080));
        
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                
                if (key.isAcceptable()) {
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    System.out.println("Acceptable connection: " + ssc.accept());
                }
            }
        }
    }
}

Key Points for Selector Usage

Point 1: Remove Processed Keys

The selectedKeys collection retains keys until they are manually removed. Without iterator.remove(), the next pass over the set sees the stale key again; for a stale accept key, accept() returns null, and using that result throws a NullPointerException.

while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    // Process event
}
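
The stale-key effect can be observed directly. The sketch below is a hedged illustration rather than production code: StaleKeyDemo is a hypothetical class, it binds to a free loopback port (an assumption), and it deliberately skips the removal step to show what is left behind.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class StaleKeyDemo {
    // Returns true if a key left in the selected set is still reported as
    // acceptable although no further connection is pending.
    static boolean staleKeyStillAcceptable() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(2000);                  // wait for the pending connection
                SocketChannel accepted = server.accept();
                if (accepted != null) {
                    accepted.close();
                }
                // The key is deliberately NOT removed from selectedKeys() here.

                selector.selectNow();                   // nothing new is ready
                boolean stale = selector.selectedKeys().contains(serverKey)
                        && serverKey.isAcceptable();
                // A second accept() finds no pending connection and returns null.
                return stale && server.accept() == null;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("stale key still acceptable: " + staleKeyStillAcceptable());
    }
}
```

In a real event loop, that null would reach client.configureBlocking(false) and fail, which is why the key is removed right after iterator.next().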

Point 2: Handle Client Disconnection

  • Normal disconnection: read() returns -1; call key.cancel() to unregister
  • Abnormal disconnection: IOException thrown; catch the exception and call key.cancel()

Handling Connection and Read Events

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class FullSelectorServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8080));
        
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                
                if (key.isAcceptable()) {
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel client = ssc.accept();
                    client.configureBlocking(false);
                    SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ);
                    System.out.println("Client registered: " + clientKey);
                }
                
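                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer tempBuffer = ByteBuffer.allocate(16);
                    try {
                        int bytesRead = client.read(tempBuffer);
                        
                        if (bytesRead == -1) {
                            // Orderly shutdown by the peer
                            System.out.println("Client disconnected normally: " + key);
                            key.cancel();
                            client.close();
                        } else {
                            tempBuffer.flip();
                            System.out.println("Message: " + StandardCharsets.UTF_8.decode(tempBuffer));
                        }
                    } catch (IOException e) {
                        // Abnormal disconnection, for example a connection reset
                        System.out.println("Client disconnected abnormally: " + key);
                        key.cancel();
                        client.close();
                    }
                }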
            }
        }
    }
}

Message Boundary Problems

The Issue

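TCP is a byte-stream protocol and preserves no message boundaries. When the buffer size differs from the message length, data may be split across reads or several messages may be combined in one read.

Example: Client sends two 6-byte messages ("中国", "万岁"; each character takes 3 bytes in UTF-8), but the server buffer holds only 4 bytes. If all 12 bytes have already arrived, every read cuts through a character:

First read: 中 plus the first byte of 国 (corrupted)
Second read: the last two bytes of 国 plus the first two bytes of 万 (corrupted)
Third read: the last byte of 万 plus 岁 (corrupted)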
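
The corruption can be reproduced without a network by slicing the encoded bytes into fixed-size chunks and decoding each chunk on its own. This is only a sketch: SplitDecodeDemo and decodeInChunks are hypothetical names, the chunk sizes 4 and 3 are assumptions chosen for the demo, and the four characters are written as Unicode escapes so the source compiles under any file encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitDecodeDemo {
    // Decodes the stream chunk by chunk, the way a server with a tiny buffer
    // would if it decoded the result of every read() independently.
    static List<String> decodeInChunks(byte[] stream, int chunkSize) {
        List<String> parts = new ArrayList<>();
        for (int from = 0; from < stream.length; from += chunkSize) {
            int to = Math.min(from + chunkSize, stream.length);
            byte[] chunk = Arrays.copyOfRange(stream, from, to);
            // Malformed byte sequences are replaced with U+FFFD by this constructor.
            parts.add(new String(chunk, StandardCharsets.UTF_8));
        }
        return parts;
    }

    public static void main(String[] args) {
        // The two messages from the example, sent back to back (12 bytes in UTF-8).
        byte[] stream = "\u4e2d\u56fd\u4e07\u5c81".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeInChunks(stream, 4)); // every chunk cuts through a character
        System.out.println(decodeInChunks(stream, 3)); // chunks happen to align with characters
    }
}
```

Even the 3-byte case only works by coincidence of alignment; the server still cannot tell where one message ends and the next begins.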

Common Solutions

  1. Fixed Length: Server reads predetermined byte counts
  2. Delimiter-Based: Use special characters like \n to mark message boundaries
  3. TLV Format: Include Type, Length, and Value fields in each message
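
As an illustration of the third approach, here is a minimal sketch of a length-prefixed frame, which is TLV without the Type field. The frame layout (a 4-byte big-endian length followed by a UTF-8 payload) and the names LengthPrefixFraming, encode, and decode are assumptions made for this example, not a format prescribed by the article.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFraming {
    // Builds one frame: 4-byte length, then the UTF-8 payload.
    static ByteBuffer encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(Integer.BYTES + payload.length);
        frame.putInt(payload.length).put(payload);
        frame.flip();
        return frame;
    }

    // Extracts every complete frame from a buffer in write mode and leaves
    // any trailing partial frame in place for the next read.
    static List<String> decode(ByteBuffer source) {
        List<String> messages = new ArrayList<>();
        source.flip();
        while (source.remaining() >= Integer.BYTES) {
            int length = source.getInt(source.position()); // peek without consuming
            if (source.remaining() < Integer.BYTES + length) {
                break; // payload not fully received yet
            }
            source.getInt(); // consume the length field
            byte[] payload = new byte[length];
            source.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        source.compact(); // back to write mode, partial frame kept
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer incoming = ByteBuffer.allocate(64);
        ByteBuffer second = encode("world");
        incoming.put(encode("hello"));
        incoming.put(second.get());           // only one byte of the second frame arrives
        System.out.println(decode(incoming)); // [hello]
        incoming.put(second);                 // the rest arrives later
        System.out.println(decode(incoming)); // [world]
    }
}
```

Because the receiver knows the payload length up front, it can size its buffer exactly instead of scanning for a delimiter. A real protocol would also validate the length field before allocating.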

Delimiter-Based Message Handling

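import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DelimiterServer {
    // Expects source in write mode (just filled by channel.read). Prints every
    // complete newline-terminated message and keeps any partial tail for the next read.
    private static void extractMessages(ByteBuffer source) {
        source.flip();
        int limit = source.limit();
        
        for (int i = 0; i < limit; i++) {
            if (source.get(i) == '\n') {    // absolute get: position is not moved
                int size = i + 1 - source.position();
                ByteBuffer message = ByteBuffer.allocate(size);
                source.limit(i + 1);        // expose only this message to put()
                message.put(source);
                message.flip();
                System.out.println(StandardCharsets.UTF_8.decode(message));
                source.limit(limit);        // restore the real end of the data
            }
        }
        source.compact();                   // move the unread tail to the front
    }
}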
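
To see how this scanning logic behaves when a message is split across two reads, the following sketch feeds a buffer in two steps. DelimiterSplitDemo and split are hypothetical names; the method follows the same idea as extractMessages above but returns the messages (without the trailing newline) instead of printing them, so the result can be inspected.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DelimiterSplitDemo {
    // Expects source in write mode; returns all complete messages and keeps
    // the unfinished tail in the buffer.
    static List<String> split(ByteBuffer source) {
        List<String> messages = new ArrayList<>();
        source.flip();
        int limit = source.limit();
        for (int i = 0; i < limit; i++) {
            if (source.get(i) == '\n') {
                byte[] body = new byte[i - source.position()];
                source.get(body);   // copy the message body
                source.get();       // skip the delimiter itself
                messages.add(new String(body, StandardCharsets.UTF_8));
            }
        }
        source.compact();
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(32);
        buf.put("Hello\nWor".getBytes(StandardCharsets.UTF_8)); // first read ends mid-message
        System.out.println(split(buf));                          // [Hello]
        buf.put("ld\nBye\n".getBytes(StandardCharsets.UTF_8));  // second read completes it
        System.out.println(split(buf));                          // [World, Bye]
    }
}
```

The compact() call is what carries "Wor" over to the second pass, which is also why each connection needs its own buffer.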

Handling Large Messages with Buffer Expansion

Problem

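When a single message is longer than the buffer, the buffer fills up before the delimiter arrives. With a fixed-size buffer the message can never be completed, so its data is lost.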

Solution: Attach Buffer to SelectionKey

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ExpandableBufferServer {
    private static void extractMessages(ByteBuffer source) {
        source.flip();
        int limit = source.limit();         // end of the data actually read
        for (int i = 0; i < limit; i++) {
            if (source.get(i) == '\n') {
                ByteBuffer msg = ByteBuffer.allocate(i + 1 - source.position());
                source.limit(i + 1);
                msg.put(source);
                msg.flip();
                System.out.println(StandardCharsets.UTF_8.decode(msg));
                source.limit(limit);        // restore the data limit, not the capacity
            }
        }
        source.compact();
    }
    
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8080));
        
        SelectionKey serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                
                if (key.isAcceptable()) {
                    SocketChannel client = serverChannel.accept();
                    client.configureBlocking(false);
                    ByteBuffer initialBuffer = ByteBuffer.allocate(8);
                    client.register(selector, SelectionKey.OP_READ, initialBuffer);
                }
                
                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    int read = client.read(buf);
                    
                    if (read == -1) {
                        key.cancel();
                    } else {
                        extractMessages(buf);
                        
                        if (buf.position() == buf.limit()) {
                            ByteBuffer expanded = ByteBuffer.allocate(buf.capacity() * 2);
                            buf.flip();
                            expanded.put(buf);
                            key.attach(expanded);
                            System.out.println("Buffer expanded to: " + expanded.capacity());
                        }
                    }
                }
            }
        }
    }
}

Buffer Design Considerations

  1. Channel Isolation: Each channel requires its own buffer since messages may span multiple reads
  2. Dynamic Sizing: Fixed large buffers waste memory; implement resizable buffers
  3. Expansion Strategy: Double buffer capacity when full, copy existing data to new buffer
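
The expansion strategy can be isolated into a small helper. This is a sketch under stated assumptions: BufferGrowthDemo, grow, and the MAX_CAPACITY limit are hypothetical additions; the cap is included because a client that never sends a delimiter would otherwise force the buffer to double indefinitely.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferGrowthDemo {
    // Hypothetical upper bound on a single message; the value is an assumption.
    static final int MAX_CAPACITY = 1 << 20;

    // Takes a full buffer in write mode and returns a buffer of twice the
    // capacity, also in write mode, holding the same bytes.
    static ByteBuffer grow(ByteBuffer full) {
        int newCapacity = full.capacity() * 2;
        if (newCapacity > MAX_CAPACITY) {
            throw new IllegalStateException("message exceeds " + MAX_CAPACITY + " bytes");
        }
        ByteBuffer expanded = ByteBuffer.allocate(newCapacity);
        full.flip();            // switch the old buffer to read mode
        expanded.put(full);     // copy everything received so far
        return expanded;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put("abcdefgh".getBytes(StandardCharsets.US_ASCII)); // full, no delimiter yet
        buf = grow(buf);
        System.out.println(buf.capacity() + " bytes capacity, " + buf.position() + " bytes kept");
        // prints "16 bytes capacity, 8 bytes kept"
    }
}
```

In a selector loop the returned buffer would replace the old attachment via key.attach(...), as in ExpandableBufferServer.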

Write Events and Non-Blocking Writes

The Problem

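When writing a large amount of data, a non-blocking write() may send only part of the buffer, and it returns 0 once the socket's send buffer is full. Looping until the buffer is drained therefore spins the thread while it waits for the network, which defeats the purpose of non-blocking I/O.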

while (buffer.hasRemaining()) {
    int written = channel.write(buffer);
}

Solution: Monitor Write Readiness

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

public class NonBlockingWriteServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverChannel.bind(new InetSocketAddress(8080));
        
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                
                if (key.isAcceptable()) {
                    SocketChannel client = serverChannel.accept();
                    client.configureBlocking(false);
                    SelectionKey clientKey = client.register(selector, 0);
                    
                    String data = "a".repeat(8_000_000);
                    ByteBuffer buffer = Charset.defaultCharset().encode(data);
                    
                    int initialWrite = client.write(buffer);
                    System.out.println("Initial write: " + initialWrite + " bytes");
                    
                    if (buffer.hasRemaining()) {
                        clientKey.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
                        clientKey.attach(buffer);
                    }
                }
                
                if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    if (buffer != null) {
                        SocketChannel client = (SocketChannel) key.channel();
                        int written = client.write(buffer);
                        System.out.println("Write event write: " + written + " bytes");
                        
                        if (!buffer.hasRemaining()) {
                            key.attach(null);
                            // Clear only the OP_WRITE bit; other interests stay registered
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                        }
                    }
                }
                
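                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // Incoming data is discarded; on end-of-stream, unregister and
                    // close so the key does not keep firing
                    if (client.read(buffer) == -1) {
                        key.cancel();
                        client.close();
                    }
                }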
            }
        }
    }
}

Selector API Summary

Creation

Selector selector = Selector.open();

Registration

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Registration Rules:

  • Channel must be non-blocking
  • FileChannel cannot use selectors (no non-blocking mode)
  • Multiple event types: interestOps(OP_READ | OP_WRITE)
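
The first and third rules can be checked with a short experiment. The sketch below uses hypothetical names (RegistrationRulesDemo and its two methods); the channels are never bound or connected, since registration itself does not require that.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class RegistrationRulesDemo {
    // Channels start in blocking mode; registering one as-is is rejected.
    static boolean blockingRegistrationRejected() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            try {
                channel.register(selector, SelectionKey.OP_ACCEPT);
                return false;
            } catch (IllegalBlockingModeException e) {
                return true;
            }
        }
    }

    // Registers a non-blocking channel for two event types at once and
    // returns the interest set stored in the key.
    static int combinedInterestOps() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(
                    selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return key.interestOps();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking channel rejected: " + blockingRegistrationRejected());
        System.out.println("interest ops: " + combinedInterestOps());
    }
}
```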

Event Monitoring

Method           Behavior
select()         Blocks until at least one event occurs
select(timeout)  Blocks until event or timeout
selectNow()      Returns immediately
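
The two non-indefinite variants can be tried on a selector with no registered channels. This is a minimal sketch; SelectVariantsDemo is a hypothetical class and the 50 ms timeout is an arbitrary value chosen for the demo.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class SelectVariantsDemo {
    // Polls a selector that has nothing registered and returns the two counts.
    static int[] pollEmptySelector() throws IOException {
        try (Selector selector = Selector.open()) {
            int now = selector.selectNow();   // never blocks
            int timed = selector.select(50);  // gives up after roughly 50 ms
            return new int[] { now, timed };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] counts = pollEmptySelector();
        System.out.println("selectNow: " + counts[0] + ", select(50): " + counts[1]);
    }
}
```

Both calls report 0 ready keys here. The return value is the number of keys whose ready set was updated, so callers should handle a result of 0.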

When select() Stops Blocking

  • Event occurs on registered channel
  • selector.wakeup() called
  • selector.close() invoked
  • Thread interrupted
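
The wakeup() case is the one most often used in practice, typically so that another thread can hand work to the selector thread. A minimal single-threaded sketch follows; WakeupDemo is a hypothetical name, and calling wakeup() before select() relies on the documented behavior that a wakeup issued while no selection is in progress applies to the next one.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // Returns the result of a select() that was released by an earlier wakeup().
    static int selectAfterWakeup() throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();        // remembered until the next selection operation
            // Nothing is registered, so without the wakeup this call would block forever.
            return selector.select();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("select() returned " + selectAfterWakeup() + " ready keys");
    }
}
```

Since select() can return with no ready keys in this case, the event loop should not assume that selectedKeys() is non-empty.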

Network I/O Evolution Path

Blocking I/O → Non-Blocking I/O → Selector Multiplexing

Selector multiplexing enables a single thread to efficiently manage thousands of connections by only processing active events, eliminating both busy-waiting and thread-per-connection overhead.
