Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Building a C MQTT Client with libmosquitto: Publish/Subscribe Patterns and Topic Filtering

Tech 4

MQTT uses a broker-centric model: all data flows through a broker, while clients act as publishers, subscribers, or both. A single process can publish and subscribe at the same time without being a broker itself.

A common pitfall is subscribing and publishing to the same topic from one client. Many brokers will deliver your own publications back to you, which can trigger redundant processing. Typical approaches to avoid that feedback loop:

  • Use distinct topics for inbound and outbound traffic (recommended for simple cases).
  • Filter in the message callback by ignoring messages on your own publish topic.
  • If using MQTT v5, enable the no-local subscription option to prevent receiving self-published messages.

Below are two small C programs using libmosquitto that exchange data over two different topics to avoid echoing. Each process subscribes to one topic and publishes to the other. Both read lines from stdin and publish them, and both print any messages they receive.

mqtt_server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <mosquitto.h>

/* Broker connection settings and the stdin line buffer size. */
#define BROKER_HOST     "localhost"
#define BROKER_PORT     1883
#define KEEPALIVE_SECS  60
#define BUF_SIZE        512

/*
 * Topic pair: server listens on DOWNSTREAM and sends to UPSTREAM.
 * The pointers themselves are const so the topic names cannot be
 * re-pointed by accident at runtime.
 */
static const char *const TOPIC_DOWNSTREAM = "demo/downstream";
static const char *const TOPIC_UPSTREAM   = "demo/upstream";

/* Log callback: writes the given log text to stdout followed by a newline. */
static void on_log(struct mosquitto *m, void *ud, int level, const char *msg)
{
    /* Only the message text is used here. */
    (void)m;
    (void)ud;
    (void)level;

    fputs(msg, stdout);
    fputc('\n', stdout);
}

/*
 * Subscribe callback: prints every entry of granted_qos (qos_count of them)
 * on a single stdout line prefixed with "Subscribed with QoS:".
 */
static void on_subscribe(struct mosquitto *m, void *ud, int mid, int qos_count, const int *granted_qos)
{
    (void)m;
    (void)ud;
    (void)mid;

    fputs("Subscribed with QoS:", stdout);

    int idx = 0;
    while(idx < qos_count) {
        printf(" %d", granted_qos[idx]);
        idx++;
    }

    putchar('\n');
}

/*
 * Connect callback: on a zero result code, subscribes to TOPIC_DOWNSTREAM
 * with QoS 1; any other code is reported on stderr.
 */
static void on_connect(struct mosquitto *m, void *ud, int rc)
{
    (void)ud;

    /* Non-zero means the connection was not accepted: report and stop. */
    if(rc != 0) {
        fprintf(stderr, "connect failed: code=%d\n", rc);
        return;
    }

    /* Subscribing here means it is only attempted once the connection is up. */
    const int sub_rc = mosquitto_subscribe(m, NULL, TOPIC_DOWNSTREAM, 1);
    if(sub_rc == MOSQ_ERR_SUCCESS) {
        return;
    }

    fprintf(stderr, "subscribe failed: %s\n", mosquitto_strerror(sub_rc));
}

/*
 * Message callback: prints "[topic] payload" for each received message, or
 * "[topic] (empty)" when the payload length is not positive, then flushes
 * stdout so the line appears immediately.
 *
 * If the publish and subscribe topics were ever the same, an early return on
 * strcmp(msg->topic, TOPIC_UPSTREAM) == 0 could be added here to drop this
 * process's own publications. It is not needed while the topics differ.
 */
static void on_message(struct mosquitto *m, void *ud, const struct mosquitto_message *msg)
{
    (void)m;
    (void)ud;

    const int len = msg->payloadlen;

    if(len <= 0) {
        printf("[%s] (empty)\n", msg->topic);
    } else {
        /* The length-limited %.*s never reads past payloadlen bytes. */
        const char *text = msg->payload;
        printf("[%s] %.*s\n", msg->topic, len, text);
    }

    fflush(stdout);
}

int main(void)
{
    struct mosquitto *mosq = NULL;
    char line[BUF_SIZE];
    bool clean_session = true;

    mosquitto_lib_init();

    mosq = mosquitto_new("srv-bridge", clean_session, NULL);
    if(!mosq) {
        fprintf(stderr, "mosquitto_new failed\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    mosquitto_log_callback_set(mosq, on_log);
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_message_callback_set(mosq, on_message);
    mosquitto_subscribe_callback_set(mosq, on_subscribe);

    int rc = mosquitto_connect(mosq, BROKER_HOST, BROKER_PORT, KEEPALIVE_SECS);
    if(rc != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "connect error: %s\n", mosquitto_strerror(rc));
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    rc = mosquitto_loop_start(mosq);
    if(rc != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "loop start error: %s\n", mosquitto_strerror(rc));
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    while(fgets(line, sizeof(line), stdin) != NULL) {
        size_t n = strlen(line);
        if(n && line[n-1] == '\n') line[n-1] = '\0';
        if(line[0] == '\0') continue;

        int pret = mosquitto_publish(mosq, NULL, TOPIC_UPSTREAM, (int)strlen(line), line, 0, false);
        if(pret != MOSQ_ERR_SUCCESS) {
            fprintf(stderr, "publish error: %s\n", mosquitto_strerror(pret));
        }
    }

    mosquitto_loop_stop(mosq, true);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

mqtt_client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <mosquitto.h>

/* Broker connection settings and the stdin line buffer size. */
#define BROKER_HOST     "localhost"
#define BROKER_PORT     1883
#define KEEPALIVE_SECS  60
#define BUF_SIZE        512

/*
 * Client listens on UPSTREAM and sends to DOWNSTREAM (opposite of server).
 * The pointers themselves are const so the topic names cannot be
 * re-pointed by accident at runtime.
 */
static const char *const TOPIC_UPSTREAM   = "demo/upstream";
static const char *const TOPIC_DOWNSTREAM = "demo/downstream";

/* Log callback: echoes the supplied log text to stdout, one entry per line. */
static void on_log(struct mosquitto *m, void *ud, int level, const char *msg)
{
    /* The handle, user data and level are not needed for plain echoing. */
    (void)m;
    (void)ud;
    (void)level;

    fputs(msg, stdout);
    fputc('\n', stdout);
}

/*
 * Subscribe callback: writes "Subscribed with QoS:" followed by each of the
 * qos_count values in granted_qos, all on one stdout line.
 */
static void on_subscribe(struct mosquitto *m, void *ud, int mid, int qos_count, const int *granted_qos)
{
    (void)m;
    (void)ud;
    (void)mid;

    fputs("Subscribed with QoS:", stdout);

    int pos = 0;
    while(pos < qos_count) {
        printf(" %d", granted_qos[pos]);
        pos++;
    }

    putchar('\n');
}

/*
 * Connect callback: on a zero result code, subscribes to TOPIC_UPSTREAM
 * with QoS 1; any other code is reported on stderr.
 */
static void on_connect(struct mosquitto *m, void *ud, int rc)
{
    (void)ud;

    /* Non-zero means the connection was not accepted: report and stop. */
    if(rc != 0) {
        fprintf(stderr, "connect failed: code=%d\n", rc);
        return;
    }

    const int sub_rc = mosquitto_subscribe(m, NULL, TOPIC_UPSTREAM, 1);
    if(sub_rc == MOSQ_ERR_SUCCESS) {
        return;
    }

    fprintf(stderr, "subscribe failed: %s\n", mosquitto_strerror(sub_rc));
}

/*
 * Message callback: prints "[topic] payload" for each received message, or
 * "[topic] (empty)" when the payload length is not positive, then flushes
 * stdout so the line appears immediately.
 */
static void on_message(struct mosquitto *m, void *ud, const struct mosquitto_message *msg)
{
    (void)m;
    (void)ud;

    const int len = msg->payloadlen;

    if(len <= 0) {
        printf("[%s] (empty)\n", msg->topic);
    } else {
        /* The length-limited %.*s never reads past payloadlen bytes. */
        const char *text = msg->payload;
        printf("[%s] %.*s\n", msg->topic, len, text);
    }

    fflush(stdout);
}

int main(void)
{
    struct mosquitto *mosq = NULL;
    char line[BUF_SIZE];
    bool clean_session = true;

    mosquitto_lib_init();

    mosq = mosquitto_new("cli-bridge", clean_session, NULL);
    if(!mosq) {
        fprintf(stderr, "mosquitto_new failed\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    mosquitto_log_callback_set(mosq, on_log);
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_message_callback_set(mosq, on_message);
    mosquitto_subscribe_callback_set(mosq, on_subscribe);

    int rc = mosquitto_connect(mosq, BROKER_HOST, BROKER_PORT, KEEPALIVE_SECS);
    if(rc != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "connect error: %s\n", mosquitto_strerror(rc));
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    rc = mosquitto_loop_start(mosq);
    if(rc != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "loop start error: %s\n", mosquitto_strerror(rc));
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return 1;
    }

    while(fgets(line, sizeof(line), stdin) != NULL) {
        size_t n = strlen(line);
        if(n && line[n-1] == '\n') line[n-1] = '\0';
        if(line[0] == '\0') continue;

        int pret = mosquitto_publish(mosq, NULL, TOPIC_DOWNSTREAM, (int)strlen(line), line, 0, false);
        if(pret != MOSQ_ERR_SUCCESS) {
            fprintf(stderr, "publish error: %s\n", mosquitto_strerror(pret));
        }
    }

    mosquitto_loop_stop(mosq, true);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
}

Makefile

# Build the two demo programs; both link with libmosquitto and pthread.

CC      := gcc
CFLAGS  := -O2 -Wall
LDLIBS  := -lmosquitto -lpthread

# 'all' and 'clean' name actions, not files. Declaring them phony keeps a
# stray file called "all" or "clean" from stopping the recipe from running.
.PHONY: all clean

all: Client Server
	@echo "Build completed"

# $@ is the target being built, $< its first prerequisite (the .c file).
Client: mqtt_client.c
	$(CC) $(CFLAGS) -o $@ $< $(LDLIBS)

Server: mqtt_server.c
	$(CC) $(CFLAGS) -o $@ $< $(LDLIBS)

# Leading '-' lets make continue even if the remove command reports an error.
clean:
	-$(RM) Server Client

Notes

  • The sample uses distinct topics for ingress and egress to avoid processing self-published messages.
  • To filter echoes when using a single topic, compare msg->topic in the message callback and ignore your own publish topic.
  • For MQTT v5, the no-local option on subscription can prevent the broker from forwarding a client’s own publications back to it.
Tags: c

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong references are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improperly handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particularly useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.