Set Up a Single-Node RocketMQ Cluster with Admin Console and Java Test Utilities
Environment Requirements
- Linux operating system
- JDK 1.8 installed
- RocketMQ version 4.9.4
Download RocketMQ
Retrieve the binary package from the Apache archive: https://archive.apache.org/dist/rocketmq/
Installation Steps
1. Extract the Package
unzip rocketmq-all-4.9.4-bin-release.zip
Note: If unzip is not found, install it via apt install unzip (Debian/Ubuntu) or equivalent for your distribution.
2. Adjust Memory Configuration
Default memory settings often trigger "insufficient memory" errors. Modify these files in the bin directory of your extracted folder (e.g., /home/rocketmq-all-4.9.4-bin-release/bin):
- runserver.sh: Reduce JVM heap size parameters
- runbroker.sh: Lower default memory allocation limits
- tools.sh: Adjust memory settings for utility tools
3. Configure Single Master Broker
Edit conf/broker.conf and add/update these lines with your server's public/private IP:
namesrvAddr=YOUR_SERVER_IP:9876
brokerIP1=YOUR_SERVER_IP
Start RocketMQ Services
Navigate to your RocketMQ root directory (e.g., /home/rocketmq-all-4.9.4-bin-release):
1. Start Name Server
nohup sh bin/mqnamesrv > logs/mqnamesrv.out &
Verify success by checking the logs/mqnamesrv.out file.
2. Start Broker
nohup sh bin/mqbroker -n YOUR_SERVER_IP:9876 -c ./conf/broker.conf > logs/broker.out &
Confirm startup via the logs/broker.out file.
Basic Functionality Test (Official Examples)
From the RocketMQ root directory:
1. Run Producer
export NAMESRV_ADDR=YOUR_SERVER_IP:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
This sends 1000 test messages to the cluster.
2. Run Consumer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
This consumes the 1000 test messages sent by the producer.
Stop RocketMQ Services
Shut down services in reverse order of startup:
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
Troubleshooting Disk Space Errors
If you encounter the error "service not available now, maybe disk full":
- Free up disk space on your server
- Add diskMaxUsedSpaceRatio=99 to your broker.conf to raise the allowed disk usage threshold
Deploy RocketMQ Admin Console
1. Obtain Source Code
Download the rocketmq-externals repository from GitHub: https://github.com/apache/rocketmq-externals
Locate the rocketmq-console subproject.
2. Configure the Console
Edit src/main/resources/application.properties:
server.port=10003
rocketmq.config.namesrvAddr=YOUR_SERVER_IP:9876
Update pom.xml to use a stable RocketMQ version:
Change <rocketmq.version>4.4.0-SNAPSHOT</rocketmq.version> to <rocketmq.version>4.4.0</rocketmq.version>
3. Build the Console
Ensure Maven is installed, then run from the rocketmq-console directory:
mvn clean package -Dmaven.test.skip=true
Note: If you get "You must specify a valid lifecycle phase", add <defaultGoal>compile</defaultGoal> inside the <build> section of pom.xml.
4. Start the Console
nohup java -jar target/rocketmq-console-ng-1.0.0.jar > console.out &
Access the web UI at http://YOUR_SERVER_IP:10003/index.html; language can be switched in the UI.
Java Test Utility Class
Maven Dependencies
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.9.4</version>
</dependency>
Utility Code
package com.example.rocketmq.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
public class RocketMQHelper {
private static final Logger log = LoggerFactory.getLogger(RocketMQHelper.class);
private static final int DEFAULT_TIMEOUT = 3000;
private final String nameServerAddress;
private String messageTag;
public RocketMQHelper(String nameServerAddress) {
this.nameServerAddress = nameServerAddress;
}
private DefaultMQProducer buildProducer(String producerGroup) {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServerAddress);
producer.setSendMsgTimeout(DEFAULT_TIMEOUT);
producer.setRetryTimesWhenSendFailed(3);
return producer;
}
public SendResult sendMessage(String producerGroup, String topic, String content, String messageKey) throws MQClientException, Exception {
return sendMessage(producerGroup, topic, content, messageKey, 0);
}
public SendResult sendMessage(String producerGroup, String topic, String content, String messageKey, int delayLevel) throws MQClientException, Exception {
if (StringUtils.isEmpty(content)) {
throw new IllegalArgumentException("Message content for topic [" + topic + "] cannot be empty");
}
DefaultMQProducer producer = buildProducer(producerGroup);
Message msg = new Message(topic, messageTag, messageKey, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
if (delayLevel > 0) {
msg.setDelayTimeLevel(delayLevel);
}
try {
producer.start();
return producer.send(msg);
} catch (MQClientException e) {
log.error("Failed to send message to topic [{}]", topic, e);
throw e;
} catch (Exception e) {
log.error("Error sending message to topic [{}]", topic, e);
throw e;
} finally {
producer.shutdown();
}
}
private DefaultMQPushConsumer buildConsumer(String consumerGroup, String topic, boolean useBroadcasting) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServerAddress);
consumer.subscribe(topic, messageTag);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
if (useBroadcasting) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
return consumer;
}
public void startConsumer(String consumerGroup, String topic, MessageHandler handler) throws MQClientException, Exception {
startConsumer(consumerGroup, topic, handler, false);
}
public void startConsumer(String consumerGroup, String topic, MessageHandler handler, boolean useBroadcasting) throws MQClientException, Exception {
DefaultMQPushConsumer consumer = buildConsumer(consumerGroup, topic, useBroadcasting);
try {
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
handler.process(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Error consuming messages from topic [{}]", topic, e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
} catch (Exception e) {
log.error("Failed to start consumer for topic [{}]", topic, e);
throw e;
} finally {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Shutting down consumer for topic [{}]", topic);
consumer.shutdown();
}));
}
}
@FunctionalInterface
public interface MessageHandler {
void process(MessageExt message) throws IOException;
}
public static void main(String[] args) throws Exception {
// Example consumer usage
RocketMQHelper consumerHelper = new RocketMQHelper("YOUR_SERVER_IP:9876");
consumerHelper.startConsumer("test-consumer-group", "test-topic-04", (msg) -> {
String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
JSONObject data = JSON.parseObject(body);
Integer orderId = data.getInteger("orderId");
Integer userId = data.getInteger("buyUserId");
String orderCode = data.getString("orderCode");
System.out.printf("Consumed order: ID=%d, User=%d, Code=%s%n", orderId, userId, orderCode);
log.info("Consumed message - Key: {}, Tag: {}, Body: {}", msg.getKeys(), msg.getTags(), body);
});
}
}