Real-Time MongoDB to MySQL Synchronization Using Change Streams
Synchronizing document-oriented data from MongoDB to a relational MySQL instance requires bridging two fundamentally different storage models. A production-ready pipeline typically relies on MongoDB Change Streams for event-driven capture, a middleware process for transformation, and optimized MySQL write operations.
Event Capture with Change Streams
Polling introduces latency and unnecessary load. Change Streams provide a persistent cursor over the oplog, delivering insert, update, delete, and replace events as they occur. Each event carries a resume token, so the sync process can recover from interruptions without data loss, provided the corresponding oplog entries have not yet rolled off.
import pymongo
from pymongo.errors import PyMongoError
import time

class MongoChangeCapture:
    def __init__(self, uri, db_name, coll_name):
        self.client = pymongo.MongoClient(uri)
        self.collection = self.client[db_name][coll_name]
        self.resume_token = None

    def start_stream(self):
        # Only forward the mutation types the sync engine understands.
        pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "delete", "replace"]}}}]
        while True:
            # Rebuild the options on every (re)connect so the latest resume token is used.
            options = {"full_document": "updateLookup"}
            if self.resume_token:
                options["resume_after"] = self.resume_token
            try:
                with self.collection.watch(pipeline, **options) as cursor:
                    for event in cursor:
                        self.resume_token = cursor.resume_token
                        yield event
            except PyMongoError as e:
                print(f"Stream interrupted: {e}. Reconnecting in 5s...")
                time.sleep(5)
Relational Mapping and MySQL Application
MySQL triggers cannot react directly to external MongoDB events. Instead, the middleware must translate document mutations into relational DML statements. Flattening nested structures and handling schema drift are critical steps. Using parameterized INSERT ... ON DUPLICATE KEY UPDATE statements ensures idempotent writes.
import mysql.connector
from mysql.connector import Error

class MySQLSyncEngine:
    def __init__(self, config):
        self.conn = mysql.connector.connect(**config)
        self.cursor = self.conn.cursor()

    def apply_change(self, doc_id, payload, op_type):
        try:
            if op_type in ("insert", "replace", "update"):
                flat_data = {
                    "record_id": str(doc_id),
                    "status": payload.get("status", "unknown"),
                    "metric_value": payload.get("metrics", {}).get("score", 0.0),
                    "updated_at": payload.get("timestamp")
                }
                sql = """
                    INSERT INTO sync_target (record_id, status, metric_value, updated_at)
                    VALUES (%(record_id)s, %(status)s, %(metric_value)s, %(updated_at)s)
                    ON DUPLICATE KEY UPDATE
                        status = VALUES(status),
                        metric_value = VALUES(metric_value),
                        updated_at = VALUES(updated_at)
                """
                self.cursor.execute(sql, flat_data)
                self.conn.commit()
            elif op_type == "delete":
                self.cursor.execute("DELETE FROM sync_target WHERE record_id = %s", (str(doc_id),))
                self.conn.commit()
        except Error as db_err:
            self.conn.rollback()
            raise RuntimeError(f"MySQL sync failed: {db_err}")
Orchestrating the Pipeline
The synchronization worker bridges the capture and application layers. It processes events sequentially to maintain order, which is essential for referential integrity in the target database. Batch processing can be introduced for high-throughput scenarios, but sequential application preserves the strict per-document ordering that the change stream delivers.
def run_sync_pipeline(mongo_uri, mysql_cfg):
    capture = MongoChangeCapture(mongo_uri, "app_data", "events")
    engine = MySQLSyncEngine(mysql_cfg)
    print("Initializing real-time sync worker...")
    for change_event in capture.start_stream():
        op = change_event["operationType"]
        doc_key = change_event["documentKey"]["_id"]
        full_doc = change_event.get("fullDocument", {})
        try:
            engine.apply_change(doc_key, full_doc or {}, op)
        except Exception as exec_err:
            print(f"Skipping event {doc_key} due to error: {exec_err}")
Schema Alignment and Type Conversion
MongoDB's flexible schema contrasts with MySQL's rigid column definitions. The middleware must enforce type casting before execution. Common transformations include converting ObjectId to VARCHAR(24), mapping ISODate objects to MySQL DATETIME, serializing unstructured arrays into JSON columns, and handling null versus missing fields explicitly to avoid constraint violations.
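A small casting helper along these lines keeps those conversions in one place; the function name and target column types are illustrative, not part of the classes above.

import json
from datetime import datetime
from bson import ObjectId

def to_mysql_value(value):
    # Cast common BSON types into MySQL-friendly representations.
    if isinstance(value, ObjectId):
        return str(value)                           # ObjectId -> VARCHAR(24)
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")  # ISODate -> DATETIME
    if isinstance(value, (list, dict)):
        return json.dumps(value, default=str)       # arrays/subdocuments -> JSON column
    return value                                    # None maps to SQL NULL; missing keys never reach here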
Resilience and Offset Management
Storing the resume_token in a persistent store allows the worker to resume exactly where it left off after crashes or deployments. Without token persistence, the stream defaults to the current oplog position, potentially missing events during downtime. Implementing exponential backoff for connection failures and dead-letter queues for unparseable documents prevents pipeline blockage.
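A minimal token store, sketched here against a hypothetical sync_offsets table (stream_name VARCHAR primary key, token TEXT) in the target MySQL database, is enough to make restarts safe: the worker loads the token at startup and saves it after each applied event.

import json
import mysql.connector

class ResumeTokenStore:
    # Persists the change stream resume token in a hypothetical sync_offsets table.
    def __init__(self, config, stream_name="events_stream"):
        self.conn = mysql.connector.connect(**config)
        self.cursor = self.conn.cursor()
        self.stream_name = stream_name

    def save(self, token):
        # Resume tokens are small BSON documents; store them as JSON text.
        self.cursor.execute(
            "REPLACE INTO sync_offsets (stream_name, token) VALUES (%s, %s)",
            (self.stream_name, json.dumps(token)),
        )
        self.conn.commit()

    def load(self):
        self.cursor.execute(
            "SELECT token FROM sync_offsets WHERE stream_name = %s",
            (self.stream_name,),
        )
        row = self.cursor.fetchone()
        return json.loads(row[0]) if row else None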