Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Implementing MQTT Message Reception in Spring Boot Applications

Tech 1

MQTT is a lightweight, publish-subscribe network protocol designed for constrained devices and low-bandwidth, high-latency networks. It is a popular choice for Internet of Things (IoT) applications due to its low power consumption and reliability.

Dependencies for Spring Boot MQTT Integration

To begin, add the following dependencies to your pom.xml file:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

External Configuration Properties

Define MQTT connection parameters in your application.yml or application.properties file:

mqtt:
  broker:
    url: tcp://your-broker-host:1883
    client-id: spring-client-001
    username: your_username
    password: your_password
  connection:
    clean-session: false
    timeout-seconds: 3
    keep-alive-interval: 60
    automatic-reconnect: true
  topics:
    default: device/status
    subscriptions:
      - sensor/temperature
      - sensor/humidity
      - actuator/command

Configuration Properties Bean

Create a class to bind these external properties:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConnectionSettings {
    private Broker broker = new Broker();
    private Connection connection = new Connection();
    private Topics topics = new Topics();

    public static class Broker {
        private String url;
        private String clientId;
        private String username;
        private String password;
        // getters and setters
    }
    public static class Connection {
        private boolean cleanSession;
        private int timeoutSeconds;
        private int keepAliveInterval;
        private boolean automaticReconnect;
        // getters and setters
    }
    public static class Topics {
        private String defaultValue;
        private List<String> subscriptions;
        // getters and setters
    }
    // getters and setters for main fields
}

Configuring the MQTT Client and Message Flow

The following configuration class sets up the client factory, inbound adapter, and message channels.

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttClientConfiguration {

    @Autowired
    private MqttConnectionSettings settings;
    @Autowired
    private MessageHandler inboundMessageProcessor;

    @Bean
    public MqttConnectOptions connectionOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{settings.getBroker().getUrl()});
        options.setUserName(settings.getBroker().getUsername());
        options.setPassword(settings.getBroker().getPassword().toCharArray());
        options.setCleanSession(settings.getConnection().isCleanSession());
        options.setConnectionTimeout(settings.getConnection().getTimeoutSeconds());
        options.setKeepAliveInterval(settings.getConnection().getKeepAliveInterval());
        options.setAutomaticReconnect(settings.getConnection().isAutomaticReconnect());
        return options;
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(connectionOptions());
        return factory;
    }

    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inboundAdapter() {
        String inboundClientId = settings.getBroker().getClientId() + "-subscriber";
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                inboundClientId,
                clientFactory(),
                settings.getTopics().getSubscriptions().toArray(new String[0])
        );
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler messageHandler() {
        return inboundMessageProcessor;
    }
}

Processing Inbound Messages

Implement a component that acts as the message handler for processing received MQTT messages.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageProcessor implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(MqttMessageProcessor.class);

    @Override
    public void handleMessage(Message<?> inboundMessage) throws MessagingException {
        String topic = (String) inboundMessage.getHeaders().get("mqtt_receivedTopic");
        String payload = inboundMessage.getPayload().toString();
        log.info("Received MQTT message. Topic: {}, Payload: {}", topic, payload);
        // Add your business logic here to process the message.
    }
}

Sending Messages via MQTT

To publish messages, configure an outbound gateway.

Add an outbound message handler to the configuration class:

// Inside MqttClientConfiguration class
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outboundHandler() {
    String outboundClientId = settings.getBroker().getClientId() + "-publisher";
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(outboundClientId, clientFactory());
    handler.setAsync(true);
    handler.setDefaultTopic(settings.getTopics().getDefaultValue());
    handler.setDefaultQos(1);
    return handler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

Create a service to facilitate sending messages:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class MqttPublisherService {

    @Autowired
    private MessageChannel mqttOutboundChannel;

    public void publish(String topic, String payload) {
        Message<String> message = MessageBuilder.withPayload(payload)
                .setHeader("mqtt_topic", topic)
                .build();
        mqttOutboundChannel.send(message);
    }
}

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.