Fading Coder

One Final Commit for the Last Sprint


Real-Time MongoDB to MySQL Synchronization Using Change Streams


Synchronizing document-oriented data from MongoDB to a relational MySQL instance requires bridging two fundamentally different storage models. A production-ready pipeline typically relies on MongoDB Change Streams for event-driven capture, a middleware process for transformation, and optimized MySQL write operations.

Event Capture with Change Streams

Polling introduces latency and unnecessary load. Change Streams provide a persistent cursor over the oplog, delivering insert, update, delete, and replace events as they occur. The stream supports resume tokens, so the sync process can recover from interruptions without data loss, provided the token still falls within the oplog retention window.

import pymongo
from pymongo.errors import PyMongoError
import time

class MongoChangeCapture:
    def __init__(self, uri, db_name, coll_name):
        self.client = pymongo.MongoClient(uri)
        self.collection = self.client[db_name][coll_name]
        self.resume_token = None

    def start_stream(self):
        # Capture only the mutation types we can translate into DML
        pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "delete", "replace"]}}}]
        # updateLookup returns the full post-image document for update events
        options = {"full_document": "updateLookup"}

        while True:
            # Refresh the resume point on every (re)connect, not just the first,
            # so a reconnection after an interruption picks up where we left off
            if self.resume_token:
                options["resume_after"] = self.resume_token
            try:
                with self.collection.watch(pipeline, **options) as cursor:
                    for event in cursor:
                        self.resume_token = cursor.resume_token
                        yield event
            except PyMongoError as e:
                print(f"Stream interrupted: {e}. Reconnecting in 5s...")
                time.sleep(5)

Relational Mapping and MySQL Application

MySQL triggers cannot directly react to external MongoDB events. Instead, the middleware must translate document mutations into relational DML statements. Flattening nested structures and handling schema drift are critical steps. Using parameterized INSERT ... ON DUPLICATE KEY UPDATE statements ensures idempotent writes.

import mysql.connector
from mysql.connector import Error

class MySQLSyncEngine:
    def __init__(self, config):
        self.conn = mysql.connector.connect(**config)
        self.cursor = self.conn.cursor()

    def apply_change(self, doc_id, payload, op_type):
        try:
            if op_type in ("insert", "replace", "update"):
                # Flatten the nested document into relational columns,
                # supplying defaults for fields that may be missing
                flat_data = {
                    "record_id": str(doc_id),
                    "status": payload.get("status", "unknown"),
                    "metric_value": payload.get("metrics", {}).get("score", 0.0),
                    "updated_at": payload.get("timestamp")
                }
                sql = """
                    INSERT INTO sync_target (record_id, status, metric_value, updated_at)
                    VALUES (%(record_id)s, %(status)s, %(metric_value)s, %(updated_at)s)
                    ON DUPLICATE KEY UPDATE
                        status = VALUES(status),
                        metric_value = VALUES(metric_value),
                        updated_at = VALUES(updated_at)
                """
                self.cursor.execute(sql, flat_data)
                self.conn.commit()
            elif op_type == "delete":
                self.cursor.execute("DELETE FROM sync_target WHERE record_id = %s", (str(doc_id),))
                self.conn.commit()
        except Error as db_err:
            self.conn.rollback()
            raise RuntimeError(f"MySQL sync failed: {db_err}")

Orchestrating the Pipeline

The synchronization worker bridges the capture and application layers. It processes events sequentially to maintain order, which is essential for referential integrity in the target database. Batch processing can be introduced for high-throughput scenarios, but sequential application guarantees strict event ordering.

def run_sync_pipeline(mongo_uri, mysql_cfg):
    capture = MongoChangeCapture(mongo_uri, "app_data", "events")
    engine = MySQLSyncEngine(mysql_cfg)

    print("Initializing real-time sync worker...")
    for change_event in capture.start_stream():
        op = change_event["operationType"]
        doc_key = change_event["documentKey"]["_id"]
        full_doc = change_event.get("fullDocument", {})

        try:
            engine.apply_change(doc_key, full_doc or {}, op)
        except Exception as exec_err:
            print(f"Skipping event {doc_key} due to error: {exec_err}")

Schema Alignment and Type Conversion

MongoDB's flexible schema contrasts with MySQL's rigid column definitions. The middleware must enforce type casting before execution. Common transformations include converting ObjectId to VARCHAR(24), mapping ISODate objects to MySQL DATETIME, serializing unstructured arrays into JSON columns, and handling null versus missing fields explicitly to avoid constraint violations.
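The conversions above can be centralized in a single normalization helper applied to each field before it reaches the parameterized statement. A minimal sketch, assuming BSON-specific types (ObjectId, Decimal128) fall back to their string form; the function name is illustrative.

```python
import json
from datetime import datetime

def to_mysql_value(value):
    """Cast a MongoDB field into a MySQL-compatible bind parameter."""
    if value is None:
        # Explicit None maps to SQL NULL; callers decide defaults for
        # missing keys, keeping null vs. absent distinct
        return None
    if isinstance(value, datetime):
        # MySQL DATETIME carries no timezone; store a naive UTC string
        return value.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(value, (list, dict)):
        # Serialize unstructured arrays/subdocuments into a JSON column
        return json.dumps(value, default=str)
    if isinstance(value, (bool, int, float, str)):
        return value
    # ObjectId, Decimal128, and other BSON types fall back to str(),
    # e.g. ObjectId -> VARCHAR(24)
    return str(value)
```

Running every value through one choke point like this keeps type handling consistent as the source schema drifts.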

Resilience and Offset Management

Storing the resume_token in a persistent store allows the worker to resume exactly where it left off after crashes or deployments. Without token persistence, the stream defaults to the current oplog position, potentially missing events during downtime. Implementing exponential backoff for connection failures and dead-letter queues for unparseable documents prevents pipeline blockage.
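Token persistence can be as simple as an atomically replaced file on disk (a shared store such as Redis or a MySQL table works the same way). A minimal file-based sketch; the path is hypothetical, and resume tokens are JSON-serializable documents of the form {"_data": "..."}.

```python
import json
import os

TOKEN_PATH = "resume_token.json"  # hypothetical checkpoint location

def save_token(token):
    """Persist the resume token atomically after each applied event."""
    tmp = TOKEN_PATH + ".tmp"
    with open(tmp, "w") as f:
        json.dump(token, f)
    # os.replace is atomic, so a crash mid-write cannot corrupt the checkpoint
    os.replace(tmp, TOKEN_PATH)

def load_token():
    """Load the last checkpoint, or None to start at the current oplog position."""
    if not os.path.exists(TOKEN_PATH):
        return None
    with open(TOKEN_PATH) as f:
        return json.load(f)
```

Saving the token only after the MySQL commit succeeds gives at-least-once delivery; combined with the idempotent upserts above, replayed events are harmless.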
