Building a C MQTT Client with libmosquitto: Publish/Subscribe Patterns and Topic Filtering
MQTT uses a broker-centric model: all data flows through a broker, while clients act as publishers, subscribers, or both. A single process can publish and subscribe at the same time without being a broker itself.
A common pitfall is subscribing and publishing to the same topic from one client. Many brokers will deliver your own publications back to you, which can trigger redundant processing. Typical approaches to avoid that feedback loop:
- Use distinct topics for inbound and outbound traffic (recommended for simple cases).
- Filter in the message callback by ignoring messages on your own publish topic.
- If using MQTT v5, enable the no-local subscription option to prevent receiving self-published messages.
Below are two small C programs using libmosquitto that exchange data over two different topics to avoid echoing. Each process subscribes to one topic and publishes to the other. Both read lines from stdin and publish them, and both print any messages they receive.
mqtt_server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <mosquitto.h>
#define BROKER_HOST "localhost"
#define BROKER_PORT 1883
#define KEEPALIVE_SECS 60
#define BUF_SIZE 512
/* Topic pair: server listens on DOWNSTREAM and sends to UPSTREAM */
static const char *TOPIC_DOWNSTREAM = "demo/downstream";
static const char *TOPIC_UPSTREAM = "demo/upstream";
static void on_log(struct mosquitto *m, void *ud, int level, const char *msg)
{
(void)m; (void)ud; (void)level; /* unused */
fprintf(stdout, "%s\n", msg);
}
static void on_subscribe(struct mosquitto *m, void *ud, int mid, int qos_count, const int *granted_qos)
{
(void)m; (void)ud; (void)mid;
fprintf(stdout, "Subscribed with QoS:");
for(int i = 0; i < qos_count; ++i) {
fprintf(stdout, " %d", granted_qos[i]);
}
fprintf(stdout, "\n");
}
static void on_connect(struct mosquitto *m, void *ud, int rc)
{
(void)ud;
if(rc == 0) {
/* Subscribe after successful connection */
int sret = mosquitto_subscribe(m, NULL, TOPIC_DOWNSTREAM, 1);
if(sret != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "subscribe failed: %s\n", mosquitto_strerror(sret));
}
} else {
fprintf(stderr, "connect failed: code=%d\n", rc);
}
}
static void on_message(struct mosquitto *m, void *ud, const struct mosquitto_message *msg)
{
(void)m; (void)ud;
/* Optional: ignore publications that this process might have sent
if PUB and SUB topics are the same. Not needed here because topics differ. */
/* if(strcmp(msg->topic, TOPIC_UPSTREAM) == 0) return; */
if(msg->payloadlen > 0) {
fprintf(stdout, "[%s] %.*s\n", msg->topic, msg->payloadlen, (const char *)msg->payload);
} else {
fprintf(stdout, "[%s] (empty)\n", msg->topic);
}
fflush(stdout);
}
int main(void)
{
struct mosquitto *mosq = NULL;
char line[BUF_SIZE];
bool clean_session = true;
mosquitto_lib_init();
mosq = mosquitto_new("srv-bridge", clean_session, NULL);
if(!mosq) {
fprintf(stderr, "mosquitto_new failed\n");
mosquitto_lib_cleanup();
return 1;
}
mosquitto_log_callback_set(mosq, on_log);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_message_callback_set(mosq, on_message);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
int rc = mosquitto_connect(mosq, BROKER_HOST, BROKER_PORT, KEEPALIVE_SECS);
if(rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "connect error: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 1;
}
rc = mosquitto_loop_start(mosq);
if(rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "loop start error: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 1;
}
while(fgets(line, sizeof(line), stdin) != NULL) {
size_t n = strlen(line);
if(n && line[n-1] == '\n') line[n-1] = '\0';
if(line[0] == '\0') continue;
int pret = mosquitto_publish(mosq, NULL, TOPIC_UPSTREAM, (int)strlen(line), line, 0, false);
if(pret != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "publish error: %s\n", mosquitto_strerror(pret));
}
}
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
mqtt_client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <mosquitto.h>
#define BROKER_HOST "localhost"
#define BROKER_PORT 1883
#define KEEPALIVE_SECS 60
#define BUF_SIZE 512
/* Client listens on UPSTREAM and sends to DOWNSTREAM (opposite of server) */
static const char *TOPIC_UPSTREAM = "demo/upstream";
static const char *TOPIC_DOWNSTREAM = "demo/downstream";
static void on_log(struct mosquitto *m, void *ud, int level, const char *msg)
{
(void)m; (void)ud; (void)level;
fprintf(stdout, "%s\n", msg);
}
static void on_subscribe(struct mosquitto *m, void *ud, int mid, int qos_count, const int *granted_qos)
{
(void)m; (void)ud; (void)mid;
fprintf(stdout, "Subscribed with QoS:");
for(int i = 0; i < qos_count; ++i) {
fprintf(stdout, " %d", granted_qos[i]);
}
fprintf(stdout, "\n");
}
static void on_connect(struct mosquitto *m, void *ud, int rc)
{
(void)ud;
if(rc == 0) {
int sret = mosquitto_subscribe(m, NULL, TOPIC_UPSTREAM, 1);
if(sret != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "subscribe failed: %s\n", mosquitto_strerror(sret));
}
} else {
fprintf(stderr, "connect failed: code=%d\n", rc);
}
}
static void on_message(struct mosquitto *m, void *ud, const struct mosquitto_message *msg)
{
(void)m; (void)ud;
if(msg->payloadlen > 0) {
fprintf(stdout, "[%s] %.*s\n", msg->topic, msg->payloadlen, (const char *)msg->payload);
} else {
fprintf(stdout, "[%s] (empty)\n", msg->topic);
}
fflush(stdout);
}
int main(void)
{
struct mosquitto *mosq = NULL;
char line[BUF_SIZE];
bool clean_session = true;
mosquitto_lib_init();
mosq = mosquitto_new("cli-bridge", clean_session, NULL);
if(!mosq) {
fprintf(stderr, "mosquitto_new failed\n");
mosquitto_lib_cleanup();
return 1;
}
mosquitto_log_callback_set(mosq, on_log);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_message_callback_set(mosq, on_message);
mosquitto_subscribe_callback_set(mosq, on_subscribe);
int rc = mosquitto_connect(mosq, BROKER_HOST, BROKER_PORT, KEEPALIVE_SECS);
if(rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "connect error: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 1;
}
rc = mosquitto_loop_start(mosq);
if(rc != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "loop start error: %s\n", mosquitto_strerror(rc));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 1;
}
while(fgets(line, sizeof(line), stdin) != NULL) {
size_t n = strlen(line);
if(n && line[n-1] == '\n') line[n-1] = '\0';
if(line[0] == '\0') continue;
int pret = mosquitto_publish(mosq, NULL, TOPIC_DOWNSTREAM, (int)strlen(line), line, 0, false);
if(pret != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "publish error: %s\n", mosquitto_strerror(pret));
}
}
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
Makefile
# Link with libmosquitto and pthread
CC := gcc
CFLAGS := -O2 -Wall
LDLIBS := -lmosquitto -lpthread
all: Client Server
@echo "Build completed"
Client: mqtt_client.c
$(CC) $(CFLAGS) -o Client mqtt_client.c $(LDLIBS)
Server: mqtt_server.c
$(CC) $(CFLAGS) -o Server mqtt_server.c $(LDLIBS)
clean:
-$(RM) Server Client
Notes
- The sample uses distinct topics for engress and egress to avoid processing self-published messages.
- To filter echoes when using a single topic, compare msg->topic in the message callback and ignore your own publish topic.
- For MQTT v5, the no-local option on subscription can prevent the broker from forwarding a client’s own publications back to it.