Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Implementing Automatic Reconnection and Subscription Recovery in Eclipse Paho MQTT Java Client

Tech May 10 4

The Eclipse Paho Java client handles MQTT communication through three distinct background threads responsible for sending, processing, and receiving messages. A critical behavior to note is that if an unhandled runtime exception occurs during message processing, the client will forcibly disconnect. The library offers two primary client implementations: MqttClient for synchronous blocking operations and MqttAsyncClient for non-blocking operations. While MqttClient appears synchronous, it internally wraps asynchronous logic to maintain API compatibility.

Message handling relies on a callback pattern. Developers register listeners to track delivery progress and incoming messages. For message persistence, specifically for QoS 1 and 2 delivery guarantees, the MqttClientPersistence interface provides implementations like MemoryPersistence for in-memory storage and MqttDefaultFilePersistence for disk-based storage.

Reconnection Strategies

Since MQTT is designed for unreliable networks, handling connection loss is essential. Two primary approaches exist for managing reconnections:

  1. Manually implementing reconnection logic within the connectionLost callback method.
  2. Leveraging the built-in automatic reconnect feature via MqttConnectOptions.setAutomaticReconnect(true).

The second approach is generally preferred as it delegates complexity to the library. Below is an implementation demonstrating a custom client wrapper with topic caching for re-subscription:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.ArrayList;
import java.util.List;

public class MqttConnectionManager {
    
    private final MqttClient mqttClient;
    private final MqttConnectOptions options;
    private final List<String> subscribedTopics = new ArrayList<>();

    public MqttConnectionManager(String brokerUri, String clientId) throws MqttException {
        this.mqttClient = new MqttClient(brokerUri, clientId, new MemoryPersistence());
        this.options = new MqttConnectOptions();
        this.options.setAutomaticReconnect(true);
        this.options.setCleanSession(true);
        this.options.setConnectionTimeout(30);
        this.mqttClient.setCallback(new ReconnectCallback(this));
    }

    public void connect() throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.connect(options);
        }
    }

    public void subscribe(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
        subscribedTopics.add(topic);
    }

    public void publish(String topic, String payload, int qos) throws MqttException {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(qos);
        mqttClient.publish(topic, message);
    }

    public boolean isConnected() {
        return mqttClient.isConnected();
    }

    public void terminate() throws MqttException {
        mqttClient.disconnect();
        mqttClient.close();
    }

    public void resubscribeAll() {
        subscribedTopics.forEach(topic -> {
            try {
                mqttClient.subscribe(topic, 1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });
    }
}

The corresponding callback handler implements MqttCallbackExtended, which provides the connectComplete hook for re-subscription:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class ReconnectCallback implements MqttCallbackExtended {

    private final MqttConnectionManager manager;

    public ReconnectCallback(MqttConnectionManager manager) {
        this.manager = manager;
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        if (reconnect) {
            System.out.println("Reconnected to broker: " + serverURI);
            manager.resubscribeAll();
        } else {
            System.out.println("Connected to broker: " + serverURI);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("Connection lost: " + cause.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received: " + new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Delivery confirmation logic
    }
}

Re-subscription After Reconnection

Restoring topic subscriptions after a reconnect can be handled in three ways:

  1. Persistent Sessions: Set MqttConnectOptions.setCleanSession(false). The broker retains subscriptions, but this depends heavily on broker implementation and resource limits.
  2. Extended Callback: Use MqttCallbackExtended and re-subscribe inside connectComplete. This pairs naturally with setAutomaticReconnect(true).
  3. Client Wrapper Cache: Maintain a local list of subscribed topics within a wrapper class and re-subscribe iteratively upon successful reconnection.
Tags: Java

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.