Zookeeper and Kafka Message Queue Overview
1. Zookeeper
1.1 Introduction
An open-source distributed service that provides coordination for distributed frameworks.
1.2 Features
① Consists of one leader and multiple followers. ② The cluster stays operational as long as more than half of its nodes are alive, so an odd number of nodes (at least three) is recommended. ③ Global data consistency: every server holds an identical copy of the data. ④ Update requests from a client are applied in the order they were sent. ⑤ Atomicity: an update either succeeds on all servers or fails on all of them.
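The "more than half" rule in ② can be made concrete with a one-line quorum calculation; this is a minimal sketch, not Zookeeper code:

```shell
# A ZooKeeper ensemble of n servers stays writable while a strict
# majority, floor(n/2) + 1 servers, is still alive.
quorum() { echo $(( $1 / 2 + 1 )); }

quorum 3   # prints 2: a 3-node ensemble tolerates 1 failure
quorum 5   # prints 3: a 5-node ensemble tolerates 2 failures
```

Note that a 4-node ensemble also needs 3 live nodes, tolerating only 1 failure, which is why odd sizes are preferred.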
1.3 Working Mode and Mechanism
1.3.1 Working Mode:
File system combined with notification mechanism.
1.3.2 Mechanism:
① Each server registers its information with the Zookeeper cluster upon startup. ② Clients retrieve the list of active servers from the cluster and monitor changes. ③ When servers go online or offline, Zookeeper updates the list and notifies clients. ④ Clients receive notifications and fetch the updated list of active servers.
1.4 Use Cases and Election Process
1.4.1 Use Cases:
Unified naming, configuration management, cluster management, dynamic node registration, and load balancing.
1.4.2 Election Process:
1.4.2.1 Initial Startup:
Server 1 starts and votes for itself; with one vote there is no majority, so no leader is elected. Server 2 starts; Server 1 changes its vote to Server 2 because Server 2 has the larger myid, giving Server 2 two votes, a majority in a three-node cluster, so Server 2 becomes the leader. Server 3 then starts and joins as a follower, since a leader already exists.
1.4.2.2 Non-Initial Startup:
Candidates are compared in order: epoch first (the largest wins), then transaction ID (ZXID), then server ID (SID).
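The three-level comparison above can be sketched as a small shell function; this is an illustration of the ordering rule only, not Zookeeper's actual implementation:

```shell
# Compare two candidates A and B by (EPOCH, ZXID, SID), highest wins at the
# first level where they differ. SIDs are unique, so the final tie-break
# always resolves.
# usage: wins A_epoch A_zxid A_sid B_epoch B_zxid B_sid  -> prints A or B
wins() {
  if   [ "$1" -ne "$4" ]; then [ "$1" -gt "$4" ] && echo A || echo B
  elif [ "$2" -ne "$5" ]; then [ "$2" -gt "$5" ] && echo A || echo B
  else                         [ "$3" -gt "$6" ] && echo A || echo B
  fi
}

wins 2 100 1  1 200 3   # prints A: a higher epoch beats a higher ZXID
```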
2. Zookeeper Cluster Deployment
2.1 Experimental Environment:
z1: 192.168.170.111 (myid=1)
z2: 192.168.170.113 (myid=2)
z3: 192.168.170.114 (myid=3)
2.2 Installing Zookeeper
Run the following on all three machines:
systemctl stop firewalld.service
setenforce 0
# Disable firewall and SELinux
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
# Extract the package in /opt, then move the extracted directory to /usr/local and rename it
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# Backup and modify the configuration file
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.5.7/data
dataLogDir=/usr/local/zookeeper-3.5.7/logs
clientPort=2181
server.1=192.168.170.111:3188:3288
server.2=192.168.170.113:3188:3288
server.3=192.168.170.114:3188:3288
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs
echo 1 > /usr/local/zookeeper-3.5.7/data/myid    # on z1 only
echo 2 > /usr/local/zookeeper-3.5.7/data/myid    # on z2 only
echo 3 > /usr/local/zookeeper-3.5.7/data/myid    # on z3 only
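Each host's myid must match the N in its server.N line in zoo.cfg. The mapping can be sketched with a loop; this writes into a scratch directory purely for illustration (on the real hosts the file lives at /usr/local/zookeeper-3.5.7/data/myid, one file per machine):

```shell
# Illustrative only: mirror the z1/z2/z3 -> myid 1/2/3 mapping in a temp dir.
dir=$(mktemp -d)
id=1
for host in z1 z2 z3; do
  mkdir -p "$dir/$host/data"
  echo "$id" > "$dir/$host/data/myid"
  id=$((id + 1))
done
cat "$dir/z3/data/myid"    # prints 3
```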
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
echo "---------- zookeeper start ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper stop ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper restart ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper status ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
service zookeeper start
service zookeeper status
3. Kafka Message Queue
3.1 Why Use a Message Queue?
In high-concurrency environments, synchronous requests may not be handled quickly enough, leading to request blocking and potential system failures.
3.2 Benefits of Using a Message Queue
3.2.1 Decoupling
Allows independent expansion or modification of both ends, ensuring they follow the same interface constraints.
3.2.2 Recovery
A failure in one component does not affect the entire system. Message queues reduce coupling, allowing messages to be processed after recovery.
3.2.3 Caching
Helps control and optimize the rate of data flow, smoothing out inconsistencies between production and consumption speeds.
3.2.4 Flexibility and Peak Handling
Enables critical components to handle sudden traffic spikes without crashing.
3.2.5 Asynchronous Communication
Users can add messages to a queue without processing them immediately. They can add as many messages as needed and process them when required.
3.3 Two Message Queue Modes
① Point-to-Point (One-to-One)
Consumers actively pull data, and once consumed, the message is deleted.
② Publish-Subscribe (One-to-Many, Also Known as Observer Pattern)
Consumers do not delete messages after consumption. Producers publish messages to topics, and multiple consumers can subscribe.
3.4 Kafka Introduction
Kafka is a distributed, partitioned, multi-replica message queue based on the publish-subscribe model, primarily used for real-time log and big data processing.
3.5 Kafka Features
① High throughput, handling tens of thousands of messages per second with low latency. ② Persistence, reliability, and efficient storage mechanisms. ③ Distributed architecture, ensuring no data loss even if one node fails. ④ Fault tolerance: with a replication factor of n, up to n-1 broker failures can be tolerated. ⑤ High concurrency, supporting thousands of clients simultaneously.
3.6 Kafka System Architecture Terms
① Broker: A Kafka server; a cluster consists of multiple brokers. ② Producer: Writes messages to the broker. ③ Consumer: Reads messages from the broker. ④ Zookeeper: Manages cluster metadata and controller elections. ⑤ Topic: Logical classification for messages. ⑥ Partition: Data division within a topic. ⑦ Replica: Multiple copies for fault tolerance. ⑧ Offset: Records each consumer's read position (older Kafka versions stored it in Zookeeper; since 0.9 it lives in the internal __consumer_offsets topic).
3.7 Partition Data Routing Rules
① Specify a partition directly. ② If no partition is specified but a key is provided, hash the key and take the result modulo the number of partitions to pick one. ③ If neither is specified, use round-robin selection.
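Rule ② can be sketched in shell; note the real Kafka client uses murmur2 for the key hash, and cksum here is only an illustrative stand-in for "a deterministic hash":

```shell
# Map a message key onto one of N partitions: hash the key, take it modulo N.
# usage: partition_for KEY NUM_PARTITIONS
partition_for() {
  key_hash=$(printf '%s' "$1" | cksum | cut -d' ' -f1)   # cksum as stand-in hash
  echo $(( key_hash % $2 ))
}

partition_for "order-42" 3   # the same key always maps to the same partition
```

This is why keyed messages preserve per-key ordering: they always land in the same partition.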
3.8 Reasons for Partitions and Multiple Partitions per Topic
① Facilitates scalability; partitions can adjust to fit different machines. ② Increases concurrency by reading and writing at the partition level.
3.9 Kafka Architecture and Workflow
① Producers send data to the broker. ② The broker stores data in topics, which have multiple partitions. ③ Partitions store data, and the broker registers with Zookeeper. ④ Consumers receive online server lists from Zookeeper and pull data from the broker. ⑤ Consumers record their progress in Zookeeper.
3.10 Kafka Cluster Deployment
Based on Zookeeper:
cd /opt
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
# Edit the configuration file on each broker
vim /usr/local/kafka/config/server.properties
broker.id=0    # must be unique per broker: 0, 1, 2
listeners=PLAINTEXT://192.168.170.111:9092    # each broker uses its own IP
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect=192.168.170.111:2181,192.168.170.113:2181,192.168.170.114:2181
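Since broker.id and listeners are the only per-host values, they can be rendered from the host list with a small loop. This is a minimal sketch that writes fragments into a scratch directory; the host-to-id mapping matches the deployment above, but the directory and file names are illustrative assumptions:

```shell
# Render the per-host lines of server.properties for each broker.
dir=$(mktemp -d)
i=0
for host in 192.168.170.111 192.168.170.113 192.168.170.114; do
  cat > "$dir/server-$host.properties" <<EOF
broker.id=$i
listeners=PLAINTEXT://$host:9092
EOF
  i=$((i + 1))
done

grep '^broker.id=' "$dir/server-192.168.170.114.properties"   # prints broker.id=2
```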
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka start ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka stop ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka status ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
chmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start
service kafka status
# On any machine:
kafka-topics.sh --create --zookeeper 192.168.170.111:2181,192.168.170.113:2181,192.168.170.114:2181 --replication-factor 2 --partitions 3 --topic test
kafka-topics.sh --list --zookeeper 192.168.170.111:2181,192.168.170.113:2181,192.168.170.114:2181
kafka-console-producer.sh --broker-list 192.168.170.111:9092,192.168.170.113:9092,192.168.170.114:9092 --topic test
kafka-console-consumer.sh --bootstrap-server 192.168.170.111:9092,192.168.170.113:9092,192.168.170.114:9092 --topic test --from-beginning
3.11 Kafka Error Analysis
[2023-04-10 20:01:57,373] WARN [Producer clientId=console-producer] Bootstrap broker 192.168.30.18:2181 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2023-04-10 20:01:57,475] WARN [Producer clientId=console-producer] Bootstrap broker 192.168.30.19:2181 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2023-04-10 20:01:57,577] WARN [Producer clientId=console-producer] Bootstrap broker 192.168.30.20:2181 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2023-04-10 20:01:57,679] WARN [Producer clientId=console-producer] Bootstrap broker 192.168.30.18:2181 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2023-04-10 20:01:57,782] WARN [Producer clientId=console-producer] Bootstrap broker 192.168.30.19:2181 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
## Error Information
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1 (kafka.admin.TopicCommand$)
## Analysis: the WARN lines above show the producer was pointed at port 2181 (Zookeeper's client port) instead of Kafka's port 9092. The InvalidReplicationFactorException means only one broker registered with Zookeeper; check whether broker.id is duplicated across machines or whether the firewall on the other machines is blocking connections.