Fading Coder

One Final Commit for the Last Sprint

Home > Notes > Content

Set Up a Single-Node RocketMQ Cluster with Admin Console and Java Test Utilities

Notes 1

Environment Requirements

  • Linux operating system
  • JDK 1.8 installed
  • RocketMQ vertion 4.9.4

Download RocketMQ

Retrieve the binary package from the Apache archive: https://archive.apache.org/dist/rocketmq/

Installation Steps

1. Extract the Package

unzip rocketmq-all-4.9.4-bin-release.zip

Note: If unzip is not found, install it via apt install unzip (Debian/Ubuntu) or equivalent for your distribution.

2. Adjust Memory Configuration

Default memory settings often trigger "insufficient memory" errors. Modify these files in the bin directory of your extracted folder (e.g., /home/rocketmq-all-4.9.4-bin-release/bin):

  • runserver.sh: Reduce JVM heap size parameters
  • runbroker.sh: Lower default memory allocation limits
  • tools.sh: Adjust memory settings for utility tools

3. Configure Single Master Broker

Edit conf/broker.conf and add/update these lines with your server's public/private IP:

namesrvAddr=YOUR_SERVER_IP:9876
brokerIP1=YOUR_SERVER_IP

Start RocketMQ Services

Navigate to your RocketMQ root directory (e.g., /home/rocketmq-all-4.9.4-bin-release):

1. Start Name Server

ohup sh bin/mqnamesrv > logs/mqnamesrv.out &

Verify success by checking the logs/mqnamesrv.out file.

2. Start Broker

nohup sh bin/mqbroker -n YOUR_SERVER_IP:9876 -c ./conf/broker.conf > logs/broker.out &

Confirm startup via the logs/broker.out file.

Basic Functionality Test (Official Examples)

From the RocketMQ root directory:

1. Run Producer

export NAMESRV_ADDR=YOUR_SERVER_IP:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

This sends 1000 test messages to the cluster.

2. Run Consumer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

This consumes the 1000 test messages sent by the producer.

Stop RocketMQ Services

Shut down services in reverse order of startup:

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

Troubleshooting Disk Space Errors

If you encounter service not available now, maybe disk full:

  1. Free up disk space on your server
  2. Add diskMaxUsedSpaceRatio=99 to your broker.conf to raise the allowed disk usage threshold

Deploy RocketMQ Admin Console

1. Obtain Source Code

Download the rocketmq-externals repository from GitHub: https://github.com/apache/rocketmq-externals Locate the rocketmq-console subproject.

2. Configure the Console

Edit src/main/resources/application.properties:

server.port=10003
rocketmq.config.namesrvAddr=YOUR_SERVER_IP:9876

Update pom.xml to use a stable RocketMQ version: Change <rocketmq.version>4.4.0-SNAPSHOT</rocketmq.version> to <rocketmq.version>4.4.0</rocketmq.version>

3. Build the Console

Ensure Maven is installed, then run from the rocketmq-console directory:

mvn clean package -Dmaven.test.skip=true

Note: If you get "You must specify a valid lifecycle phase", add <defaultGoal>compile</defaultGoal> inside the <build> section of pom.xml.

4. Start the Console

ohup java -jar target/rocketmq-console-ng-1.0.0.jar > console.out &

Access the web UI at http://YOUR_SERVER_IP:10003/index.html; language can be switched in the UI.

Java Test Utility Class

Maven Dependencies

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.9.4</version>
</dependency>

Utility Code

package com.example.rocketmq.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

public class RocketMQHelper {
    private static final Logger log = LoggerFactory.getLogger(RocketMQHelper.class);
    private static final int DEFAULT_TIMEOUT = 3000;

    private final String nameServerAddress;
    private String messageTag;

    public RocketMQHelper(String nameServerAddress) {
        this.nameServerAddress = nameServerAddress;
    }

    private DefaultMQProducer buildProducer(String producerGroup) {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameServerAddress);
        producer.setSendMsgTimeout(DEFAULT_TIMEOUT);
        producer.setRetryTimesWhenSendFailed(3);
        return producer;
    }

    public SendResult sendMessage(String producerGroup, String topic, String content, String messageKey) throws MQClientException, Exception {
        return sendMessage(producerGroup, topic, content, messageKey, 0);
    }

    public SendResult sendMessage(String producerGroup, String topic, String content, String messageKey, int delayLevel) throws MQClientException, Exception {
        if (StringUtils.isEmpty(content)) {
            throw new IllegalArgumentException("Message content for topic [" + topic + "] cannot be empty");
        }

        DefaultMQProducer producer = buildProducer(producerGroup);
        Message msg = new Message(topic, messageTag, messageKey, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
        if (delayLevel > 0) {
            msg.setDelayTimeLevel(delayLevel);
        }

        try {
            producer.start();
            return producer.send(msg);
        } catch (MQClientException e) {
            log.error("Failed to send message to topic [{}]", topic, e);
            throw e;
        } catch (Exception e) {
            log.error("Error sending message to topic [{}]", topic, e);
            throw e;
        } finally {
            producer.shutdown();
        }
    }

    private DefaultMQPushConsumer buildConsumer(String consumerGroup, String topic, boolean useBroadcasting) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServerAddress);
        consumer.subscribe(topic, messageTag);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        if (useBroadcasting) {
            consumer.setMessageModel(MessageModel.BROADCASTING);
        }
        return consumer;
    }

    public void startConsumer(String consumerGroup, String topic, MessageHandler handler) throws MQClientException, Exception {
        startConsumer(consumerGroup, topic, handler, false);
    }

    public void startConsumer(String consumerGroup, String topic, MessageHandler handler, boolean useBroadcasting) throws MQClientException, Exception {
        DefaultMQPushConsumer consumer = buildConsumer(consumerGroup, topic, useBroadcasting);

        try {
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    for (MessageExt msg : msgs) {
                        handler.process(msg);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.error("Error consuming messages from topic [{}]", topic, e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            consumer.start();
        } catch (Exception e) {
            log.error("Failed to start consumer for topic [{}]", topic, e);
            throw e;
        } finally {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                log.info("Shutting down consumer for topic [{}]", topic);
                consumer.shutdown();
            }));
        }
    }

    @FunctionalInterface
    public interface MessageHandler {
        void process(MessageExt message) throws IOException;
    }

    public static void main(String[] args) throws Exception {
        // Example consumer usage
        RocketMQHelper consumerHelper = new RocketMQHelper("YOUR_SERVER_IP:9876");
        consumerHelper.startConsumer("test-consumer-group", "test-topic-04", (msg) -> {
            String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
            JSONObject data = JSON.parseObject(body);
            Integer orderId = data.getInteger("orderId");
            Integer userId = data.getInteger("buyUserId");
            String orderCode = data.getString("orderCode");

            System.out.printf("Consumed order: ID=%d, User=%d, Code=%s%n", orderId, userId, orderCode);
            log.info("Consumed message - Key: {}, Tag: {}, Body: {}", msg.getKeys(), msg.getTags(), body);
        });
    }
}

Related Articles

Designing Alertmanager Templates for Prometheus Notifications

How to craft Alertmanager templates to format alert messages, improving clarity and presentation. Alertmanager uses Go’s text/template engine with additional helper functions. Alerting rules referenc...

Deploying a Maven Web Application to Tomcat 9 Using the Tomcat Manager

Tomcat 9 does not provide a dedicated Maven plugin. The Tomcat Manager interface, however, is backward-compatible, so the Tomcat 7 Maven Plugin can be used to deploy to Tomcat 9. This guide shows two...

Skipping Errors in MySQL Asynchronous Replication

When a replica halts because the SQL thread encounters an error, you can resume replication by skipping the problematic event(s). Two common approaches are available. Methods to Skip Errors 1) Skip a...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.