Fading Coder

One Final Commit for the Last Sprint


Incremental MySQL to Elasticsearch Data Synchronization Using Logstash


This article covers continuous data synchronization from MySQL to Elasticsearch for both single-table and multi-table (joined) datasets. Unlike a one-time import, this approach ensures that Elasticsearch reflects subsequent changes in the MySQL source. Note that each jdbc input block executes a single SQL statement; to handle multiple queries or complex joins, define a separate jdbc input block for each.
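As a minimal sketch of the multi-table case, the input below defines one jdbc block per table (the orders table, its SQL file, and the "order" type are hypothetical names added for illustration). Each block must get its own last_run_metadata_path so the tracking states do not overwrite each other:

```conf
input {
  jdbc {
    # ... connection, driver, and tracking settings as shown below ...
    statement_filepath => "/etc/logstash/sql_scripts/load_customers.sql"
    last_run_metadata_path => "/var/log/logstash/sync_state/customer_tracker"
    type => "customer"
  }
  jdbc {
    # ... connection, driver, and tracking settings as shown below ...
    statement_filepath => "/etc/logstash/sql_scripts/load_orders.sql"
    last_run_metadata_path => "/var/log/logstash/sync_state/order_tracker"
    type => "order"
  }
}
```

The type field set on each input can then be matched in the output section to route documents to different indices.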

First, place the appropriate MySQL JDBC driver .jar file into the Logstash bin directory; the jdbc_driver_library setting in the pipeline must point to this file's full path.

The core of the synchronization process relies on the Logstash pipeline configuration.

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.50:3306/app_db"
    jdbc_user => "db_admin"
    jdbc_password => "securePass123"
    jdbc_driver_library => "/opt/logstash/jdbc_drivers/mysql-connector-java-8.0.28.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 100000

    # Preserve the stored tracking value between runs; setting this to true
    # resets :sql_last_value and reprocesses the entire table
    clean_run => false

    # Use a specific column for tracking instead of the default run timestamp
    use_column_value => true
    tracking_column => "modified_at"
    tracking_column_type => "timestamp"

    # Persist the last tracked value to a file for subsequent runs
    record_last_run => true
    last_run_metadata_path => "/var/log/logstash/sync_state/customer_tracker"

    # Preserve original casing for Elasticsearch field names
    lowercase_column_names => false

    # External SQL file path
    statement_filepath => "/etc/logstash/sql_scripts/load_customers.sql"

    # Execute the query every 5 minutes
    schedule => "*/5 * * * *"
    type => "customer"
  }
}

output {
  stdout {
    codec => json_lines
  }

  if [type] == "customer" {
    elasticsearch {
      hosts => ["http://es-node1:9200"]
      index => "customers_idx"
      document_id => "%{customer_id}"
    }
  }
}

Critical configuration considerations:

By default, if tracking_column is omitted, Logstash tracks the timestamp of the last pipeline run. To track a specific database column such as modified_at instead, explicitly set use_column_value => true.

Enabling record_last_run => true writes the most recent value of the tracked column to the file specified by last_run_metadata_path.
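The metadata file holds a single YAML-serialized value. For a timestamp tracking column, its contents look roughly like the following (the exact timestamp shown is an illustrative placeholder):

```yaml
--- 2024-05-01 09:15:00.000000000 Z
```

Deleting this file forces the next run to start from the beginning, as if no previous synchronization had occurred.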

During incremental synchronization, the referenced SQL file must include the :sql_last_value placeholder in its WHERE clause, combined with an ORDER BY on the tracking column in ascending order. For instance:

SELECT * FROM app_users WHERE modified_at > :sql_last_value ORDER BY modified_at ASC

This logic fetches only newly added or updated rows and correctly advances the tracking state stored in the metadata file.
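The same pattern extends to joined (multi-table) datasets: filter and order on the tracking column of the driving table. A sketch, assuming hypothetical user_orders and app_users tables joined on user_id:

```sql
SELECT o.order_id, o.amount, o.modified_at, u.user_name
FROM user_orders o
JOIN app_users u ON u.user_id = o.user_id
WHERE o.modified_at > :sql_last_value
ORDER BY o.modified_at ASC
```

Because the whole result set is flattened into one document per row, choose the document_id accordingly (here, order_id would be the natural choice) so that updates overwrite the correct Elasticsearch document.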
