Automating SQL Workflows in Python with pyodbc and Pandas
SQL is excellent for querying and transforming data, but repetitive data-loading and housekeeping tasks are far easier to orchestrate from Python. This guide demonstrates a compact Python helper that connects to Microsoft SQL Server, executes ad‑hoc SQL, loads Pandas DataFrames into tables, unions multiple tables, and cleans up temporary artifacts.
Connection skeleton
import pyodbc
import pandas as pd
from datetime import date
class SqlClient:
    """Thin wrapper around a trusted (Windows-auth) pyodbc connection to SQL Server."""

    def __init__(self, database, server="XXVIR00012,55000", driver="{ODBC Driver 17 for SQL Server}"):
        self.server = server
        self.database = database
        # Assemble the ODBC connection string from its key=value parts.
        parts = [
            f"Driver={driver}",
            f"Server={server}",
            f"Database={database}",
            "Trusted_Connection=yes",
        ]
        self.cnxn = pyodbc.connect(";".join(parts) + ";")
        # Running transcript of every statement issued through this client.
        self.sql_log = f"-- {date.today():%Y-%m-%d}\n-- generated by Python"
Instantiating a client is a one‑liner:
db = SqlClient("database123")
Core routines
Execute SQL, optionally returning a DataFrame
def run(self, sql, return_frame=False):
    """Execute *sql* against the open connection.

    Parameters
    ----------
    sql : str
        The statement to execute.
    return_frame : bool
        When True, run *sql* as a query and return the result as a
        Pandas DataFrame; otherwise execute, commit, and return "OK".

    Notes
    -----
    Non-query failures are best-effort by design: a ProgrammingError is
    printed as a warning and the method still returns "OK".
    """
    if return_frame:
        # pandas manages its own cursor for read queries.
        return pd.read_sql_query(sql, self.cnxn)
    cur = self.cnxn.cursor()
    try:
        cur.execute(sql)
    except pyodbc.ProgrammingError as err:
        # Keep the original best-effort contract: warn, don't raise.
        print(f"Warning:\n{err}")
    finally:
        cur.close()  # fix: cursor was previously never closed (handle leak)
    self.cnxn.commit()
    self.sql_log += f"\n\n-- statement\n{sql}"
    return "OK"
- Set `return_frame=True` to fetch results into a Pandas DataFrame.
Load a DataFrame into a table (create + insert)
def push_frame(self, frame, table_name="raw_data", batch=500):
    """Create table *table_name* (all columns varchar(255)) and bulk-insert *frame*.

    Parameters
    ----------
    frame : pandas.DataFrame
        Data to load; every column is stored as varchar(255).
    table_name : str
        Name of the table to create (assumed not to exist yet).
    batch : int
        Number of rows per executemany() round-trip.
    """
    cur = self.cnxn.cursor()
    try:
        cur.fast_executemany = True  # pyodbc bulk-insert fast path
        cols = [str(c) for c in frame.columns]
        col_defs = ",\n ".join(f"[{c}] varchar(255)" for c in cols)
        ddl = f"CREATE TABLE [{table_name}] (\n {col_defs}\n);"
        cur.execute(ddl)
        self.cnxn.commit()
        self.sql_log += f"\n\n-- create table\n{ddl}"
        placeholders = ", ".join("?" for _ in cols)
        collist = ", ".join(f"[{c}]" for c in cols)
        dml = f"INSERT INTO [{table_name}] ({collist}) VALUES ({placeholders})"
        # fix: cast to object dtype BEFORE .where(); float columns cannot hold
        # None, so the original .where() left NaN in numeric columns and pyodbc
        # would receive NaN instead of SQL NULL.
        frame_to_load = frame.astype(object).where(pd.notnull(frame), None)
        for start in range(0, len(frame_to_load), batch):
            chunk = frame_to_load.iloc[start:start + batch].values.tolist()
            cur.executemany(dml, chunk)
        self.cnxn.commit()
    finally:
        cur.close()  # fix: cursor was previously never closed (handle leak)
- Defines all columns as `varchar(255)` for simplicity.
- Uses `fast_executemany` to accelerate bulk inserts.
Union multiple tables into one
def unite(self, tables, into="unioned", operator="UNION ALL"):
    """Combine every table in *tables* into a new table *into* via SELECT INTO."""
    selects = [f"SELECT [{name}].* FROM [{name}]" for name in tables]
    glue = f"\n{operator}\n"
    statement = f"SELECT * INTO [{into}] FROM (\n{glue.join(selects)}\n) AS u;"
    self.run(statement)
`operator` can be `UNION` or `UNION ALL`.
Drop one or many tables
def drop(self, tables):
    """Drop each table in *tables* (a single name or an iterable of names), if it exists."""
    names = [tables] if isinstance(tables, str) else tables
    for name in names:
        # OBJECT_ID(..., N'U') guards against dropping a non-existent table.
        self.run(f"IF OBJECT_ID(N'[{name}]', N'U') IS NOT NULL DROP TABLE [{name}];")
End‑to‑end pipeline example
Task: import a folder of CSVs, merge them into a single table, remove the staging tables, then split the merged table into multiple tables based on a category column.
from pathlib import Path
import pandas as pd

# Initialize SQL client
db = SqlClient("database123")

# 1) Import all CSVs from a directory
# fix: the original mixed a raw string with doubled backslashes
# (r"C:\\User\\...") which embeds literal double backslashes.
src_dir = Path(r"C:\User\medium\data")
files = sorted(src_dir.glob("*.csv"))
for fp in files:
    df = pd.read_csv(fp)
    db.push_frame(df, table_name=fp.stem)  # create one staging table per file and load it

# 2) Union the per-file tables into one
staging_tables = [fp.stem for fp in files]
db.unite(staging_tables, into="generic_jan", operator="UNION ALL")

# 3) Remove the staging tables
db.drop(staging_tables)

# 4) Split into tables by category in colX
cats = db.run(
    "SELECT DISTINCT colX AS category FROM generic_jan",
    return_frame=True,
)["category"].dropna().tolist()
for cat in cats:
    # fix: escape embedded quotes and bracket-quote the target table so a
    # category value containing spaces or apostrophes cannot break (or
    # inject into) the generated SQL. NOTE(review): values are still
    # interpolated into SQL text — parameterize if categories are untrusted.
    safe = str(cat).replace("'", "''")
    db.run(f"SELECT * INTO [generic_jan_{cat}] FROM generic_jan WHERE colX = '{safe}'")
This setup lets Python handle file I/O and iteration, while SQL Server remains responsible for set operations and persistence.