Incremental MySQL to Elasticsearch Data Synchronization Using Logstash
This approach provides continuous synchronization from MySQL to Elasticsearch and handles both single-table and multi-table (joined) datasets. Unlike a one-time import, it ensures that Elasticsearch reflects subsequent changes in the MySQL source. Note that each jdbc input block executes a single SQL statement; syncing multiple tables or complex joins requires defining a separate jdbc input block per query.
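As a rough sketch of the multi-table case, the pipeline can define one jdbc input per query and route events to separate indices via a type field (the orders table, SQL file path, and index name here are hypothetical; the connection and tracking settings match the single-table example below):

```
input {
  jdbc {
    # connection, driver, and tracking settings as in the single-table example
    statement_filepath => "/etc/logstash/sql_scripts/load_customers.sql"
    type => "customer"
  }
  jdbc {
    # second query: its own SQL file, tracking column, and metadata path
    statement_filepath => "/etc/logstash/sql_scripts/load_orders.sql"
    type => "order"
  }
}
output {
  if [type] == "customer" {
    elasticsearch {
      hosts => ["http://es-node1:9200"]
      index => "customers_idx"
      document_id => "%{customer_id}"
    }
  }
  if [type] == "order" {
    elasticsearch {
      hosts => ["http://es-node1:9200"]
      index => "orders_idx"
      document_id => "%{order_id}"
    }
  }
}
```

Each input must use its own last_run_metadata_path so the tracking state of one query does not overwrite another's.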
First, place the appropriate MySQL JDBC driver .jar file into the Logstash bin directory (or any path you then reference via jdbc_driver_library). The core of the synchronization process is the Logstash pipeline configuration.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.50:3306/app_db"
    jdbc_user => "db_admin"
    jdbc_password => "securePass123"
    jdbc_driver_library => "/opt/logstash/jdbc_drivers/mysql-connector-java-8.0.28.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
    # Preserve the saved tracking state between runs; setting this to true
    # would reset :sql_last_value and reprocess the entire table
    clean_run => false
    # Track a specific column instead of the default run timestamp
    use_column_value => true
    tracking_column => "modified_at"
    tracking_column_type => "timestamp"
    # Persist the last tracked value to a file for subsequent runs
    record_last_run => true
    last_run_metadata_path => "/var/log/logstash/sync_state/customer_tracker"
    # Preserve original casing for Elasticsearch field names
    lowercase_column_names => false
    # External SQL file path
    statement_filepath => "/etc/logstash/sql_scripts/load_customers.sql"
    # Execute the query every 5 minutes
    schedule => "*/5 * * * *"
    type => "customer"
  }
}
output {
  stdout {
    codec => json_lines
  }
  if [type] == "customer" {
    elasticsearch {
      hosts => ["http://es-node1:9200"]
      index => "customers_idx"
      document_id => "%{customer_id}"
    }
  }
}
Critical configuration considerations:
By default, if tracking_column is omitted, Logstash tracks the timestamp of the previous pipeline execution. To track a specific database column such as modified_at, explicitly set use_column_value => true together with tracking_column.
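The difference between the two tracking modes can be sketched as a pair of minimal jdbc fragments (not complete input blocks):

```
# Default: :sql_last_value resolves to the timestamp of the previous run
use_column_value => false

# Column tracking: :sql_last_value resolves to the highest modified_at
# value returned by the previous query
use_column_value => true
tracking_column => "modified_at"
tracking_column_type => "timestamp"
```

Column tracking is generally safer for incremental loads, since it depends on the data itself rather than on when the pipeline happened to run.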
Enabling record_last_run => true writes the most recent value of the tracked column to the file specified by last_run_metadata_path.
During incremental synchronization, the referenced SQL file must include the :sql_last_value placeholder in its WHERE clause, combined with an ORDER BY clause on the tracked column in ascending order. For instance:
SELECT * FROM app_users WHERE modified_at > :sql_last_value ORDER BY modified_at ASC
This logic fetches only newly added or updated rows and correctly advances the tracking state stored in the metadata file.
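For the multi-table (joined) case, the same pattern applies; one option is to track the greater of the joined tables' modification timestamps (the table and column names below are hypothetical, and GREATEST is a MySQL function returning the largest of its arguments):

```sql
SELECT c.customer_id, c.name, o.order_id, o.total,
       GREATEST(c.modified_at, o.modified_at) AS modified_at
FROM customers c
JOIN orders o ON o.customer_id = c.customer_id
WHERE GREATEST(c.modified_at, o.modified_at) > :sql_last_value
ORDER BY modified_at ASC
```

Ascending order matters because Logstash records the tracked value of the last row it processes; sorting ascending guarantees that value is the maximum seen in the batch.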