Implementing MQTT Message Reception in Spring Boot Applications
MQTT is a lightweight, publish-subscribe network protocol designed for constrained devices and low-bandwidth, high-latency networks. It is a popular choice for Internet of Things (IoT) applications due to its low power consumption and reliability.
Dependencies for Spring Boot MQTT Integration
To begin, add the following dependencies to your pom.xml file:
<!-- Spring Boot starter for Spring Integration. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- MQTT support for Spring Integration. No version is declared here; presumably it is
     managed by the Spring Boot parent/BOM (confirm for your build). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
External Configuration Properties
Define MQTT connection parameters in your application.yml or application.properties file:
mqtt:
  # Broker endpoint and credentials.
  broker:
    url: tcp://your-broker-host:1883
    # Base client id; the configuration appends a suffix per connection.
    client-id: spring-client-001
    username: your_username
    password: your_password
  # Options copied onto the Paho connect options.
  connection:
    clean-session: false
    timeout-seconds: 3
    keep-alive-interval: 60
    automatic-reconnect: true
  topics:
    # Must be "default-value" so that it binds to the defaultValue property of the
    # settings class; a key named "default" would be left unbound.
    default-value: device/status
    # Topics the inbound adapter subscribes to.
    subscriptions:
      - sensor/temperature
      - sensor/humidity
      - actuator/command
Configuration Properties Bean
Create a class to bind these external properties:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Holder for the externalized settings found under the {@code mqtt} prefix.
 *
 * <p>Each nested class groups one sub-section of the configuration
 * ({@code broker}, {@code connection}, {@code topics}). Accessors are elided
 * in this listing and must be present for property binding to work.
 */
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConnectionSettings {

    /** Broker address, client id and credentials. */
    public static class Broker {
        private String url;
        private String clientId;
        private String username;
        private String password;
        // getters and setters
    }

    /** Session and connection tuning flags. */
    public static class Connection {
        private boolean cleanSession;
        private int timeoutSeconds;
        private int keepAliveInterval;
        private boolean automaticReconnect;
        // getters and setters
    }

    /** Default publish topic plus the list of topics to subscribe to. */
    public static class Topics {
        private String defaultValue;
        private List<String> subscriptions;
        // getters and setters
    }

    // Each section starts out as an empty instance rather than null.
    private Broker broker = new Broker();
    private Connection connection = new Connection();
    private Topics topics = new Topics();

    // getters and setters for main fields
}
Configuring the MQTT Client and Message Flow
The following configuration class sets up the client factory, inbound adapter, and message channels.
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Declares the beans needed to receive MQTT messages: the Paho connect options and
 * client factory, the inbound channel adapter, the channel it writes to, and the
 * service activator that hands each message to the application's handler.
 */
@Configuration
public class MqttClientConfiguration {

    @Autowired
    private MqttConnectionSettings settings;

    @Autowired
    private MessageHandler inboundMessageProcessor;

    /**
     * Builds the Paho connect options from the bound settings.
     *
     * @return connect options populated with the broker URL, credentials and connection flags
     */
    @Bean
    public MqttConnectOptions connectionOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{settings.getBroker().getUrl()});
        options.setUserName(settings.getBroker().getUsername());

        // The password is optional (e.g. brokers that allow anonymous access); calling
        // toCharArray() on a missing value would fail with a NullPointerException.
        String password = settings.getBroker().getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        options.setCleanSession(settings.getConnection().isCleanSession());
        options.setConnectionTimeout(settings.getConnection().getTimeoutSeconds());
        options.setKeepAliveInterval(settings.getConnection().getKeepAliveInterval());
        options.setAutomaticReconnect(settings.getConnection().isAutomaticReconnect());
        return options;
    }

    /**
     * Creates the client factory used by the MQTT adapters.
     *
     * @return a factory configured with {@link #connectionOptions()}
     */
    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(connectionOptions());
        return factory;
    }

    /**
     * Channel that carries messages from the inbound adapter to the handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the message-driven adapter that subscribes to the configured topics and
     * forwards what it receives to {@link #mqttInboundChannel()}.
     *
     * @return the inbound adapter, using the base client id with a "-subscriber" suffix
     */
    @Bean
    public MessageProducer inboundAdapter() {
        String inboundClientId = settings.getBroker().getClientId() + "-subscriber";
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                inboundClientId,
                clientFactory(),
                settings.getTopics().getSubscriptions().toArray(new String[0])
        );
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * Exposes the injected handler as the service activator for {@code mqttInboundChannel}.
     *
     * @return the injected inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler messageHandler() {
        return inboundMessageProcessor;
    }
}
Processing Inbound Messages
Implement a component that acts as the message handler for processing received MQTT messages.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 * Message handler for inbound MQTT traffic; currently it only logs what arrives.
 */
@Component
public class MqttMessageProcessor implements MessageHandler {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageProcessor.class);

    /**
     * Logs the topic header and the payload of a received message.
     *
     * @param inboundMessage the message delivered by the inbound channel
     * @throws MessagingException declared by the {@link MessageHandler} contract
     */
    @Override
    public void handleMessage(Message<?> inboundMessage) throws MessagingException {
        // The topic is read from the "mqtt_receivedTopic" message header.
        Object topicHeader = inboundMessage.getHeaders().get("mqtt_receivedTopic");
        String receivedTopic = (String) topicHeader;

        String body = inboundMessage.getPayload().toString();

        log.info("Received MQTT message. Topic: {}, Payload: {}", receivedTopic, body);
        // Add your business logic here to process the message.
    }
}
Sending Messages via MQTT
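To publish messages, configure an outbound channel adapter (an MqttPahoMessageHandler) together with the channel that feeds it.
Add the outbound message handler and its input channel to the configuration class, and import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler there: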
// Inside MqttClientConfiguration class
/**
 * Creates the handler that publishes messages arriving on {@code mqttOutboundChannel}.
 * It connects with the base client id plus a "-publisher" suffix, sends asynchronously,
 * and uses the configured default topic with QoS 1.
 *
 * @return the outbound MQTT message handler
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outboundHandler() {
    MqttPahoMessageHandler publisher = new MqttPahoMessageHandler(
            settings.getBroker().getClientId() + "-publisher",
            clientFactory());
    publisher.setDefaultQos(1);
    publisher.setDefaultTopic(settings.getTopics().getDefaultValue());
    publisher.setAsync(true);
    return publisher;
}
/**
 * Channel on which outgoing messages are placed for the outbound handler.
 *
 * @return a new {@link DirectChannel}
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    final DirectChannel outbound = new DirectChannel();
    return outbound;
}
Create a service to facilitate sending messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
 * Convenience service for sending text payloads through the outbound MQTT channel.
 */
@Service
public class MqttPublisherService {

    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Wraps the payload in a message, records the topic in the "mqtt_topic" header and
     * sends it on the outbound channel.
     *
     * @param topic   value stored in the "mqtt_topic" header
     * @param payload message body
     */
    public void publish(String topic, String payload) {
        MessageBuilder<String> draft = MessageBuilder.withPayload(payload);
        draft.setHeader("mqtt_topic", topic);
        Message<String> outgoing = draft.build();
        mqttOutboundChannel.send(outgoing);
    }
}