Fading Coder

One Final Commit for the Last Sprint


Syncing MySQL Incremental Data to Elasticsearch Using Canal

Tech May 10

Canal is an open-source project developed by Alibaba that parses MySQL database incremental logs to provide incremental data subscription and consumption.

This guide demonstrates how to sync MySQL incremental data to Elasticsearch using Canal.

Cluster Architecture

In this architecture, each Canal server runs as a single JVM process.

A server contains one or more instances, where each instance can be thought of as a configured task.

Each instance comprises the following modules:

  • eventParser

    Connects to the data source, simulates the slave protocol to interact with the master, and parses the protocol

  • eventSink

    Links Parser and Store components, handling data filtering, processing, and distribution

  • eventStore

    Buffers parsed data; the default implementation keeps it in memory using a ring-buffer design

  • metaManager

    Manages incremental subscription and consumption metadata

In production scenarios, Canal high availability depends on ZooKeeper. The client mode can be simply categorized into two types: TCP Mode and MQ Mode.

The MQ mode offers the advantage of decoupling. Data changes are sent to message queues such as Kafka or RocketMQ, and consumers process the messages by executing relevant logic sequentially.

Sequential Consumption:

For a specified Topic, all messages are partitioned by Sharding Key into blocks. Messages within the same partition follow a strict First-In-First-Out (FIFO) order for publishing and consumption. Message order is guaranteed within a partition, but no order guarantees exist across different partitions.
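The effect of a sharding key can be illustrated with a small sketch. Canal's `canal.mq.partitionHash` option hashes the configured key (typically the primary key) modulo the partition count; the function below is a simplified illustration of that idea, not Canal's exact hash implementation.

```python
# Sketch: how a sharding key keeps all changes to one row in a single partition.
# This is an illustrative hash, not Canal's actual implementation.

def pick_partition(sharding_key: str, num_partitions: int) -> int:
    """Map a sharding key to a partition deterministically."""
    # A stable hash ensures the same key always lands on the same partition.
    h = 0
    for ch in sharding_key:
        h = (h * 31 + ord(ch)) & 0x7FFFFFFF
    return h % num_partitions

# All changes to product id 42 route to one partition, so their order is preserved.
assert pick_partition("42", 3) == pick_partition("42", 3)
```

Because ordering is only guaranteed within a partition, routing by primary key ensures that successive updates to the same row are consumed in the order they occurred, while changes to different rows can still be spread across partitions for throughput.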

MySQL Configuration

  1. For self-hosted MySQL, enable the Binlog write feature first and set binlog-format to ROW mode. Add the following to my.cnf:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

Note: For Alibaba Cloud RDS for MySQL, binlog is enabled by default and the account has binlog dump permissions. You can skip this configuration step.

  2. Grant the Canal account the privileges required to act as a MySQL slave:
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  3. Create the products table in the database:
CREATE TABLE `products` (
	`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
	`product_name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
	`amount` DECIMAL(10,2) NOT NULL,
	`state` TINYINT(4) NOT NULL,
	`created_at` datetime NOT NULL,
	`modified_at` datetime NOT NULL,
   PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
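Before starting Canal, it is worth confirming on the MySQL side that binlog is actually enabled in ROW mode:

```sql
SHOW VARIABLES LIKE 'log_bin';        -- Value should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- Value should be ROW
```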

Elasticsearch Configuration

Create the products index using Kibana:

PUT /products
{
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "id": {
                "type": "keyword"
            },
            "productName": {
                "type": "text"
            },
            "amount": {
                "type": "double"
            },
            "state": {
                "type": "integer"
            },
            "createdAt": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            },
            "modifiedAt": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

After executing the request, the index is created and the Kibana console returns an acknowledgement.
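To double-check the result, you can query the mapping back from Kibana:

```
GET /products/_mapping
```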

RocketMQ Configuration

Create a topic named data-sync-topic. Canal will send Binlog change data to this topic.
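If you manage topics from the command line rather than the RocketMQ console, the bundled mqadmin tool can create the topic. This assumes a local NameServer on the default port; adjust the address and cluster name for your environment:

```
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t data-sync-topic
```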

Canal Configuration

We use Canal version 1.1.6. Navigate to the conf directory.

1. Configure canal.properties

# Cluster mode ZooKeeper addresses
canal.zkServers=localhost:2181
# Server mode: tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode=rocketMQ
# Instance list
canal.destinations=data-sync
# Configuration root directory
canal.conf.dir=../conf
# Global spring configuration component file for production cluster deployment
canal.instance.global.spring.xml=classpath:spring/default-instance.xml

###### Below are default values, shown for reference
# Canal batch size, default 50. Because of Kafka's maximum message size, keep each message under 1 MB (about 900 KB to be safe)
canal.mq.canalBatchSize=50
# Canal data fetch timeout in milliseconds, empty means no timeout
canal.mq.canalGetTimeout=100
# Use flat JSON format
canal.mq.flatMessage=true
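With flatMessage enabled, each binlog change arrives on the topic as a flat JSON document. An abbreviated example of what an UPDATE on the products table might look like (values are illustrative, and some fields such as mysqlType/sqlType are omitted):

```json
{
  "database": "mytest",
  "table": "products",
  "type": "UPDATE",
  "isDdl": false,
  "pkNames": ["id"],
  "data": [
    { "id": "1", "product_name": "Widget", "amount": "19.99", "state": "1" }
  ],
  "old": [
    { "amount": "17.99" }
  ],
  "ts": 1683687600000
}
```

The data node holds the latest row values, while old holds only the columns that changed.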

2. Instance Configuration File

Create an instance directory named data-sync under the conf directory. Create the configuration file instance.properties inside data-sync:

# Update with your actual database connection details
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username and password for database access
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...

# Table regex pattern
canal.instance.filter.regex=mytest.products

# MQ configuration
canal.mq.topic=data-sync-topic
# Dynamic topic routing based on database or table names
#canal.mq.dynamicTopic=mytest,.*,mytest.users,mytest\..*,.*\..*
canal.mq.partition=0
# Hash partition configuration
#canal.mq.partitionsNum=3
# Schema.table: primary key, multiple tables separated by commas
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

3. Service Startup

Start two Canal server instances. Monitor their status through the ZooKeeper GUI.
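Canal's deployer package ships a startup script. On each node, from the Canal home directory:

```
sh bin/startup.sh
# watch the server log
tail -f logs/canal/canal.log
```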

Modify a record in the products table and observe new messages appearing in the RocketMQ console.

Consumer Implementation

1. Index Operation Service
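The article does not reproduce the original service code, but the shape of an index operation service can be sketched as follows. The `client` is any object exposing `index`/`delete` methods; in production this would be an Elasticsearch client, and the class and method names here are illustrative assumptions, not Canal's API.

```python
# Sketch of an index-operation service for the products index.
# INSERT and UPDATE both become an upsert keyed by the row's primary key;
# DELETE removes the document with that id.

class ProductIndexService:
    INDEX = "products"

    def __init__(self, client):
        # `client` is injected so the logic can be exercised without a live cluster.
        self.client = client

    def upsert(self, row: dict) -> None:
        self.client.index(index=self.INDEX, id=row["id"], document=row)

    def delete(self, row_id: str) -> None:
        self.client.delete(index=self.INDEX, id=row_id)
```

Injecting the client keeps the service easy to test with a stub and easy to swap for the real Elasticsearch client in production.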

2. Message Listener

The consumer logic has two key aspects:

  • Sequential consumption in the listener
  • Converting the message body to a JSON object and extracting the latest table rows from the data node (a batch operation may contain multiple records), then invoking the matching index operation service method based on the operation type: INSERT, UPDATE, or DELETE
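The dispatch logic described above can be sketched in a few lines. This assumes a flat message body and a service object exposing `upsert`/`delete` (illustrative names, not a specific library API):

```python
import json

# Sketch: parse a Canal flat message and dispatch each row by operation type.

def handle_message(body: str, service) -> None:
    msg = json.loads(body)
    rows = msg.get("data") or []   # a batch operation may carry multiple rows
    op = msg.get("type")           # "INSERT", "UPDATE", or "DELETE"
    for row in rows:
        if op in ("INSERT", "UPDATE"):
            # Both resolve to an upsert keyed by the primary key.
            service.upsert(row)
        elif op == "DELETE":
            service.delete(row["id"])
```

As long as the listener consumes messages sequentially per partition, replaying this dispatch keeps the index consistent with the table.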

Summary

Canal is a remarkably interesting open-source project. Many companies use it to build Data Transmission Services (DTS).

Exploring this project teaches valuable lessons in network programming, multi-threading models, and high-performance queues like Disruptor.

Tags: canal
