Building a Python Data Pipeline for Daily Epidemic Statistics
Fetching and Storing Web Data
This example demonstrates a data pipeline for processing daily epidemic information. The core process involves retrieving JSON data from a web API, parsing the relevant details, and storing them in a MySQL database using batch operations.
When handling a dataset containing multiple records, inserting them individually can be inefficient, since each insert costs a separate round trip to the database. A more effective approach is to use the batch insertion method provided by the database cursor.
# The executemany method allows efficient insertion of multiple records.
# The first argument is the SQL query with placeholders; the second is a list of tuples containing the data.
cursor.executemany(insert_query, records_list)
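To see the batch insert in isolation, the sketch below uses the standard-library sqlite3 module, which implements the same DB-API 2.0 interface as pymysql (sqlite3 uses "?" placeholders where pymysql uses "%s"). The table and sample rows are invented for the demonstration.

```python
import sqlite3

# In-memory database so the example is self-contained.
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("CREATE TABLE stats (city TEXT, cases INTEGER)")

# One executemany() call inserts every row, instead of one execute() per record.
records_list = [("Springfield", 12), ("Shelbyville", 7), ("Ogdenville", 3)]
cursor.executemany("INSERT INTO stats (city, cases) VALUES (?, ?)", records_list)
connection.commit()

print(cursor.execute("SELECT COUNT(*) FROM stats").fetchone()[0])  # 3
connection.close()
```

The same pattern carries over to pymysql unchanged apart from the placeholder style.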
Implementation Code
The following script retrieves regional case data from a public health API and persists it to a database.
import requests
import json
import pymysql


def fetch_epidemic_data():
    """
    Fetches and parses epidemic data from a public API endpoint.
    Returns a list of tuples containing parsed regional statistics.
    """
    source_url = "https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5"
    request_headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    response = requests.get(source_url, headers=request_headers)
    # The payload nests JSON inside JSON, so it is decoded twice.
    primary_data = json.loads(response.text)
    structured_data = json.loads(primary_data["data"])
    last_update = structured_data["lastUpdateTime"]
    country_list = structured_data["areaTree"]
    province_list = country_list[0]["children"]
    collected_records = []
    for province_info in province_list:
        province_name = province_info["name"]
        for city_info in province_info["children"]:
            city_name = city_info["name"]
            total_confirmed = city_info["total"]["confirm"]
            new_confirmed = city_info["today"]["confirm"]
            recovered = city_info["total"]["heal"]
            fatalities = city_info["total"]["dead"]
            record = (last_update, province_name, city_name,
                      total_confirmed, new_confirmed, recovered, fatalities)
            collected_records.append(record)
    return collected_records


def establish_db_connection():
    """Creates and returns a database connection and cursor object."""
    connection = pymysql.connect(
        host='localhost',
        user='app_user',
        password='user_password',
        database='health_stats_db'
    )
    db_cursor = connection.cursor()
    return connection, db_cursor


def close_db_resources(connection, cursor):
    """Safely closes the database cursor and connection."""
    if cursor:
        cursor.close()
    if connection:
        connection.close()


def refresh_daily_records():
    """Main function to update the database with the latest fetched data."""
    db_conn, db_cur = establish_db_connection()
    latest_data = fetch_epidemic_data()
    insertion_query = """
        INSERT INTO daily_stats
        (report_time, region, city, total_cases, new_cases, recovered, deceased)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    try:
        db_cur.executemany(insertion_query, latest_data)
        db_conn.commit()
        print("Data update successful.")
    except Exception as error:
        db_conn.rollback()
        print(f"An error occurred during insertion: {error}")
    finally:
        close_db_resources(db_conn, db_cur)


# Execute the update process
if __name__ == "__main__":
    refresh_daily_records()
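One detail worth isolating is the double call to json.loads() in fetch_epidemic_data(): the API's outer response is JSON, but its "data" field is itself a JSON-encoded string, so it must be decoded a second time. The sketch below simulates this with a small invented response body rather than a real API call.

```python
import json

# Simulated response body: the outer object's "data" field holds a
# JSON document serialized as a string (note the escaped quotes).
raw_body = '{"ret": 0, "data": "{\\"lastUpdateTime\\": \\"2020-03-01 09:00:00\\", \\"areaTree\\": []}"}'

primary = json.loads(raw_body)       # first pass: the outer envelope
inner = json.loads(primary["data"])  # second pass: the embedded JSON string

print(inner["lastUpdateTime"])  # 2020-03-01 09:00:00
```

Skipping the second decode would leave `primary["data"]` as a plain string, and key lookups like `["lastUpdateTime"]` would fail.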
Data Visualization
The stored data can be visualized using charting libraries. For instance, an aggregating SQL query (such as summing city totals by province) can reshape the rows into (name, value) pairs, which a library like ECharts can render as interactive maps or graphs showing regional case distribution.
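The read side of such a query can be sketched as follows. This uses sqlite3 so the example runs standalone; the daily_stats schema matches the insert above, but the sample rows are invented for illustration.

```python
import sqlite3

# Build a small stand-in for the daily_stats table.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE daily_stats (region TEXT, city TEXT, total_cases INTEGER)")
cur.executemany(
    "INSERT INTO daily_stats VALUES (?, ?, ?)",
    [("Hubei", "Wuhan", 49315), ("Hubei", "Xiaogan", 3518), ("Guangdong", "Shenzhen", 418)],
)

# Aggregate city rows into per-region totals: the (name, value) shape
# a map or bar chart typically consumes.
cur.execute(
    "SELECT region, SUM(total_cases) FROM daily_stats "
    "GROUP BY region ORDER BY SUM(total_cases) DESC"
)
for region, total in cur.fetchall():
    print(region, total)
conn.close()
```

The resulting pairs can be passed directly to a charting library as series data.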