Syncing MySQL Incremental Data to Elasticsearch Using Canal
Canal is an open-source project developed by Alibaba that parses MySQL database incremental logs to provide incremental data subscription and consumption.
This guide demonstrates how to sync MySQL incremental data to Elasticsearch using Canal.
Cluster Architecture
In this architecture, each Canal server runs as a single JVM process. A server contains one or more instances, where each instance can be thought of as a configured sync task.
Each instance comprises the following modules:
- eventParser: connects to the data source, simulates the slave protocol to interact with the master, and parses the binlog protocol
- eventSink: links the Parser and Store components, handling data filtering, processing, and distribution
- eventStore: stores the parsed data (an in-memory ring buffer by default)
- metaManager: manages incremental subscription and consumption metadata
In production scenarios, Canal high availability depends on ZooKeeper. The client mode can be simply categorized into two types: TCP Mode and MQ Mode.
The MQ mode offers the advantage of decoupling. Data changes are sent to message queues such as Kafka or RocketMQ, and consumers process the messages by executing relevant logic sequentially.
Sequential Consumption:
For a given Topic, all messages are divided into partitions by a Sharding Key. Messages within the same partition follow a strict First-In-First-Out (FIFO) order for both publishing and consumption. Message order is guaranteed within a partition, but no order guarantees exist across different partitions.
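This ordering guarantee is why routing all changes for the same row to the same partition matters: updates to one primary key are then consumed in the order they occurred. A minimal sketch of such a routing rule, assuming a simple hash-modulo scheme (Canal's actual partition hashing differs in detail):

```python
def stable_hash(key: str) -> int:
    # Stable string hash (Python's built-in hash() is salted per process,
    # so it is not suitable for partition routing).
    h = 0
    for ch in key:
        h = (h * 31 + ord(ch)) & 0x7FFFFFFF
    return h

def route_partition(sharding_key: str, partitions: int) -> int:
    """Map a sharding key (e.g. the row's primary key) to a partition.

    All changes to the same key land in the same partition and are consumed
    in FIFO order; changes to different keys may interleave across partitions.
    """
    return stable_hash(sharding_key) % partitions

# Every change to row id=42 always routes to the same partition:
assert route_partition("42", 3) == route_partition("42", 3)
```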
MySQL Configuration
- For self-hosted MySQL, enable the Binlog write feature first and set binlog-format to ROW mode. Add the following to my.cnf:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
Note: For Alibaba Cloud RDS for MySQL, binlog is enabled by default and the account has binlog dump permissions. You can skip this configuration step.
- Grant the Canal account the privileges required to act as a MySQL slave:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
- Create the products table in the database:
CREATE TABLE `products` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`product_name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
`amount` DECIMAL(10,2) NOT NULL,
`state` TINYINT(4) NOT NULL,
`created_at` datetime NOT NULL,
`modified_at` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Elasticsearch Configuration
Create the products index using Kibana:
PUT /products
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"productName": {
"type": "text"
},
"amount": {
"type": "double"
},
"state": {
"type": "integer"
},
"createdAt": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"modifiedAt": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
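Note that the index mapping uses camelCase field names and a `yyyy-MM-dd HH:mm:ss` date format, while the MySQL table uses snake_case columns and DATETIME values, so the consumer must translate between the two. A sketch of that transform, assuming the field names from the mapping above (the helper names are my own):

```python
from datetime import datetime

# Column-to-field renames implied by the index mapping above.
FIELD_MAP = {
    "id": "id",
    "product_name": "productName",
    "amount": "amount",
    "state": "state",
    "created_at": "createdAt",
    "modified_at": "modifiedAt",
}

def row_to_doc(row: dict) -> dict:
    """Convert a products row into a document matching the index mapping."""
    doc = {}
    for col, field in FIELD_MAP.items():
        value = row[col]
        if isinstance(value, datetime):
            # Matches the "yyyy-MM-dd HH:mm:ss" format declared in the mapping.
            value = value.strftime("%Y-%m-%d %H:%M:%S")
        doc[field] = value
    return doc

row = {
    "id": 1,
    "product_name": "Widget",
    "amount": 19.99,
    "state": 1,
    "created_at": datetime(2023, 5, 1, 12, 0, 0),
    "modified_at": datetime(2023, 5, 1, 12, 0, 0),
}
doc = row_to_doc(row)  # doc["createdAt"] == "2023-05-01 12:00:00"
```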
After execution, the index is created as shown in the console.
RocketMQ Configuration
Create a topic named data-sync-topic. Canal will send Binlog change data to this topic.
Canal Configuration
We use Canal version 1.1.6. Navigate to the conf directory.
1. Configure canal.properties
# Cluster mode ZooKeeper addresses
canal.zkServers=localhost:2181
# Server mode: tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode=rocketMQ
# Instance list
canal.destinations=data-sync
# Configuration root directory
canal.conf.dir=../conf
# Global spring configuration component file for production cluster deployment
canal.instance.global.spring.xml=classpath:spring/default-instance.xml
###### Below are default values, shown for reference
# Canal batch size, default 50. Due to Kafka's max message size, do not exceed 1M (keep under 900K)
canal.mq.canalBatchSize=50
# Canal data fetch timeout in milliseconds, empty means no timeout
canal.mq.canalGetTimeout=100
# Use flat JSON format
canal.mq.flatMessage=true
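With `canal.mq.flatMessage=true`, Canal serializes each binlog change as a flat JSON object whose `data` array holds the rows after the change and whose `old` array holds the changed columns before it. A sketch of such a message and how a consumer reads it (the `database`, `table`, `type`, `data`, and `old` fields match Canal's FlatMessage; the sample values are invented):

```python
import json

# An illustrative flat message for an UPDATE on products (values invented).
raw = json.dumps({
    "database": "mytest",
    "table": "products",
    "type": "UPDATE",
    "ts": 1700000000000,
    "data": [{"id": "1", "product_name": "Widget", "amount": "19.99",
              "state": "1", "created_at": "2023-05-01 12:00:00",
              "modified_at": "2023-05-02 09:30:00"}],
    "old": [{"amount": "9.99"}],
})

msg = json.loads(raw)
# "data" holds the row values after the change; "old" holds only the
# columns that changed, with their values before the change.
assert msg["type"] == "UPDATE"
assert msg["data"][0]["amount"] == "19.99"
```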
2. Instance Configuration File
Create an instance directory named data-sync under the conf directory. Create the configuration file instance.properties inside data-sync:
# Update with your actual database connection details
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username and password for database access
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# Table regex pattern
canal.instance.filter.regex=mytest.products
# MQ configuration
canal.mq.topic=data-sync-topic
# Dynamic topic routing based on database or table names
#canal.mq.dynamicTopic=mytest,.*,mytest.users,mytest\..*,.*\..*
canal.mq.partition=0
# Hash partition configuration
#canal.mq.partitionsNum=3
# Schema.table: primary key, multiple tables separated by commas
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
3. Service Startup
Start two Canal server instances. Monitor their status through the ZooKeeper GUI.
Modify a record in the products table and observe new messages appearing in the RocketMQ console.
Consumer Implementation
1. Index Operation Service
2. Message Listener
The consumer logic has two key aspects:
- Sequential consumption in the listener
- Converting the message body to JSON, extracting the latest table data from the data nodes (batch operations may contain multiple records), and then executing the corresponding index operation service method based on the operation type: UPDATE, INSERT, or DELETE
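The dispatch described above can be sketched as follows. This is an illustration, not the actual listener: the `IndexService` methods stand in for the index operation service, and a real consumer would receive the message body from a RocketMQ listener callback rather than a direct call:

```python
import json

class IndexService:
    """Placeholder for the index operation service (method names are illustrative)."""
    def __init__(self):
        self.ops = []

    def upsert(self, row: dict) -> None:
        # INSERT and UPDATE both index the latest version of the row.
        self.ops.append(("upsert", row["id"]))

    def delete(self, row: dict) -> None:
        self.ops.append(("delete", row["id"]))

def handle_message(body: str, service: IndexService) -> None:
    """Parse a flat message and dispatch each row by operation type."""
    msg = json.loads(body)
    # Batch operations may contain multiple records in "data".
    for row in msg.get("data") or []:
        if msg["type"] in ("INSERT", "UPDATE"):
            service.upsert(row)
        elif msg["type"] == "DELETE":
            service.delete(row)

service = IndexService()
handle_message(json.dumps({"type": "UPDATE",
                           "data": [{"id": "1"}, {"id": "2"}]}), service)
handle_message(json.dumps({"type": "DELETE",
                           "data": [{"id": "3"}]}), service)
# service.ops is now [("upsert", "1"), ("upsert", "2"), ("delete", "3")]
```

Since messages within a partition arrive in FIFO order, processing them one at a time in this listener preserves the order of changes to each row.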
Summary
Canal is a remarkably interesting open-source project. Many companies use it to build Data Transmission Services (DTS).
Exploring this project teaches valuable lessons in network programming, multi-threading models, and high-performance queues like Disruptor.