Designing a Partitioned Database Index for Offline Geospatial Disk Archiving
This article details the design and implementation of a database system for indexing and managing large volumes of geospatial data stored across multiple offline hard drives. The system uses PostgreSQL with automated table partitioning to efficiently handle billions of file records.
System Architecture and Technology Stack
The solution is built using Python for data processing and PostgreSQL for storage. Key libraries include psycopg2 for database interaction and concurrent.futures for parallel file scanning.
Database Schema Design
The core of the system is a partitioned table designed to scale with the number of hard drives.
Master Hard Drive Registry
A master table stores metadata for each physical hard drive.
-- Registry of physical drives: one row per offline disk in the archive.
-- device_id is the partition key / FK target for geospatial_file_index.
CREATE TABLE storage_devices (
device_id SERIAL PRIMARY KEY,
-- Human-readable volume label; must be unique across the archive.
volume_label VARCHAR(255) NOT NULL UNIQUE,
serial_number VARCHAR(100) UNIQUE,
-- Capacities in gigabytes; FLOAT4 is single precision (~7 significant digits).
total_capacity_gb FLOAT4,
used_capacity_gb FLOAT4,
-- Physical shelf/box location of the offline drive.
physical_location VARCHAR(255),
device_status VARCHAR(50) DEFAULT 'ACTIVE',
-- SECURITY NOTE(review): storing an encryption key in plaintext next to the
-- data it protects defeats its purpose — keep keys in an external secret
-- store and reference them here instead.
encryption_key VARCHAR(255),
notes TEXT,
registered_at TIMESTAMPTZ DEFAULT NOW()
);
COMMENT ON TABLE storage_devices IS 'Registry for physical storage devices';
Partitioned File Index Table
The main file index is partitioned by device_id using PostgreSQL's list partitioning. This design allows each hard drive's data to reside in a separate physical table, improving query performance and maintenance.
-- Master file index, LIST-partitioned by device_id so each drive's rows live
-- in their own physical table (one partition per registered device).
CREATE TABLE geospatial_file_index (
-- No default here; a global sequence default is attached separately below.
record_id BIGINT NOT NULL,
-- Partition key; identifies the storage device holding the file.
device_id INTEGER NOT NULL,
absolute_path TEXT NOT NULL,
directory_path TEXT NOT NULL,
filename TEXT NOT NULL,
file_extension VARCHAR(255) NOT NULL,
-- NOTE(review): FLOAT(8) means "float with >= 8 bits of mantissa" (i.e. real),
-- the same type storage_devices spells as FLOAT4 — worth unifying the spelling.
size_mb FLOAT(8),
content_hash VARCHAR(255),
-- GEOMETRY requires the PostGIS extension; SRID 4326 = WGS 84 lon/lat.
spatial_extent GEOMETRY(GEOMETRY, 4326),
original_crs VARCHAR(100),
-- Format-specific attributes extracted during indexing (JSONB).
technical_metadata JSONB,
acquisition_time TIMESTAMPTZ,
last_modified TIMESTAMPTZ,
indexed_time TIMESTAMPTZ DEFAULT NOW(),
user_notes TEXT,
-- Constraints on a partitioned table must include the partition key
-- (device_id) — both below do.
PRIMARY KEY (device_id, record_id),
CONSTRAINT unique_file_per_device UNIQUE (device_id, absolute_path)
) PARTITION BY LIST (device_id);
COMMENT ON TABLE geospatial_file_index IS 'Master index for geospatial files, partitioned by storage device';
Index Strategy
Appropriate indexes are created on the partitioned table to support common query patterns.
-- Indexes created on the partitioned parent cascade to every partition,
-- existing and future (PostgreSQL 11+ partitioned indexes).
-- GiST spatial index for bounding-box / intersection queries on spatial_extent.
CREATE INDEX idx_spatial_extent ON geospatial_file_index USING GIST (spatial_extent);
-- B-tree index for equality filtering by file type.
CREATE INDEX idx_file_extension ON geospatial_file_index (file_extension);
-- GIN index for JSONB containment/existence operators (@>, ?, ?|) on metadata.
CREATE INDEX idx_tech_metadata ON geospatial_file_index USING GIN (technical_metadata);
-- text_pattern_ops lets the planner use these B-trees for left-anchored
-- LIKE 'prefix%' searches regardless of collation.
CREATE INDEX idx_directory_path ON geospatial_file_index (directory_path text_pattern_ops);
CREATE INDEX idx_filename ON geospatial_file_index (filename text_pattern_ops);
A global sequence is created to generate unique IDs across all partitions.
-- One global BIGINT sequence so record_id values are unique across all
-- partitions (sequences are not partition-scoped).
CREATE SEQUENCE geospatial_file_index_id_seq AS BIGINT;
-- Attach the sequence as the column default on the partitioned parent;
-- partitions inherit it.
ALTER TABLE geospatial_file_index ALTER COLUMN record_id SET DEFAULT nextval('geospatial_file_index_id_seq');
-- OWNED BY ties the sequence's lifetime to the column: dropping the column
-- (or table) drops the sequence too.
ALTER SEQUENCE geospatial_file_index_id_seq OWNED BY geospatial_file_index.record_id;
Automated Partition Management
A trigger automatically creates a new partition when a file from a previously unindexed hard drive is inserted.
-- NOTE(review): this trigger cannot do what the article intends. Row-level
-- triggers on a partitioned table (supported from PostgreSQL 13) are cloned
-- onto the leaf partitions, and tuple routing runs BEFORE any of them fire.
-- An INSERT for a device with no existing partition therefore fails with
-- "no partition of relation ... found for row" and this function never runs.
-- Create the partition up front instead — e.g. from application code when the
-- device is registered in storage_devices.
CREATE OR REPLACE FUNCTION create_partition_on_insert()
RETURNS TRIGGER AS $$
DECLARE
-- Per-device partition name, e.g. geospatial_file_index_device_42.
partition_table_name TEXT;
BEGIN
partition_table_name := 'geospatial_file_index_device_' || NEW.device_id;
-- Fast-path existence check; the EXCEPTION block below still guards against
-- a concurrent session creating the same partition between check and CREATE.
IF NOT EXISTS (SELECT 1 FROM pg_class WHERE relname = partition_table_name) THEN
BEGIN
EXECUTE format(
'CREATE TABLE %I PARTITION OF geospatial_file_index FOR VALUES IN (%L)',
partition_table_name,
NEW.device_id
);
RAISE NOTICE 'New partition % created.', partition_table_name;
EXCEPTION
WHEN duplicate_table THEN
RAISE NOTICE 'Partition % already exists.', partition_table_name;
END;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_auto_create_partition
BEFORE INSERT ON geospatial_file_index
FOR EACH ROW
EXECUTE FUNCTION create_partition_on_insert();
Python Implementation for Disk Indexing
Database Connection Handler
A robust database connection class manages connections and includes retry logic.
from typing import Any, Dict, Optional, Union

import psycopg2
from psycopg2.extras import RealDictCursor
class DatabaseConnector:
    """Manage a psycopg2 connection with automatic reconnect and retry.

    Open the connection via ``establish_connection()`` or use the class as a
    context manager. ``run_query()`` commits after every successful statement
    (commit-per-call semantics).
    """

    def __init__(self, connection_params: Dict[str, str]):
        # Connection is opened lazily by establish_connection()/__enter__.
        self.connection_params = connection_params
        self.connection = None
        self.cursor = None

    def establish_connection(self) -> None:
        """Open a new connection and a RealDictCursor.

        Raises:
            ConnectionError: if psycopg2 cannot connect.
        """
        try:
            self.connection = psycopg2.connect(**self.connection_params)
            self.cursor = self.connection.cursor(cursor_factory=RealDictCursor)
        except Exception as conn_error:
            raise ConnectionError(f"Failed to connect: {conn_error}") from conn_error

    def verify_connection(self) -> bool:
        """Return True if the connection is usable, reconnecting if needed."""
        if self.connection is None or self.connection.closed:
            try:
                self.establish_connection()
                return True
            except Exception:
                return False
        try:
            # Cheap liveness probe; detects half-dead TCP connections.
            self.cursor.execute("SELECT 1")
            return True
        except Exception:
            try:
                self.establish_connection()
                return True
            except Exception:
                return False

    def _rollback_quietly(self) -> None:
        """Roll back the open transaction, swallowing rollback failures.

        FIX: the original called self.connection.rollback() unconditionally,
        which raised AttributeError when the connection was never established
        (connection is None) and masked the real error.
        """
        try:
            if self.connection is not None and not self.connection.closed:
                self.connection.rollback()
        except Exception:
            pass

    def run_query(self, sql_statement: str, parameters: Optional[Union[tuple, list]] = None) -> None:
        """Execute a statement and commit, retrying transient failures.

        Args:
            sql_statement: SQL text with %s placeholders.
            parameters: a single parameter tuple, or a list of parameter
                tuples. FIX: a list is now dispatched to executemany() — the
                original passed batches straight to execute(), which cannot
                consume the batched inserts built by DriveIndexer.

        Raises:
            psycopg2.OperationalError: after max_retries failed attempts.
            RuntimeError: for any other execution failure (the transaction is
                rolled back first).
        """
        retry_attempts = 0
        max_retries = 3
        while retry_attempts < max_retries:
            try:
                if not self.verify_connection():
                    raise RuntimeError("Database connection is not available")
                if isinstance(parameters, list):
                    # A list is treated as a batch of parameter tuples.
                    self.cursor.executemany(sql_statement, parameters)
                else:
                    self.cursor.execute(sql_statement, parameters)
                self.connection.commit()
                return
            except psycopg2.OperationalError:
                # Transient (network drop, server restart): reconnect, retry.
                retry_attempts += 1
                if retry_attempts >= max_retries:
                    self._rollback_quietly()
                    raise
                try:
                    self.establish_connection()
                except Exception:
                    pass  # verify_connection() retries on the next loop pass
            except Exception as query_error:
                self._rollback_quietly()
                raise RuntimeError(f"Query execution failed: {query_error}") from query_error

    def __enter__(self):
        self.establish_connection()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the cursor before the connection; both calls are idempotent.
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
File Indexing Engine
The indexing engine scans a hard drive, extracts file metadata, and writes it to the partitioned database table.
import os
import hashlib
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
@dataclass
class FileMetadata:
    """Metadata extracted for one file during a drive scan.

    Produced by DriveIndexer.analyze_single_file and consumed by
    DriveIndexer.execute_indexing, which maps the fields onto the
    geospatial_file_index table columns.
    """
    # Storage device id (partition key of geospatial_file_index).
    device_identifier: int
    # Full path of the file on the scanned drive.
    full_path: str
    # Directory containing the file.
    parent_folder: str
    # Base filename, extension included.
    name: str
    # Lower-cased extension without the leading dot.
    extension: str
    # NOTE(review): despite the name, analyze_single_file stores MEGABYTES
    # here (the value feeds the size_mb column) — consider renaming.
    size_bytes: float
    # SHA-256 content hash, or a "hash_error_..." marker on failure.
    hash_value: Optional[str]
    # Filesystem mtime.
    modification_time: datetime
    # Derived from st_ctime — inode change time on Unix, not true creation.
    creation_time: Optional[datetime] = None
    # Free-form operator notes; unused by the indexer itself.
    comments: Optional[str] = None
class DriveIndexer:
def __init__(self, db_client, device_id: int, scan_root: str, worker_threads: int = 4):
self.db = db_client
self.target_device_id = device_id
self.root_directory = scan_root
self.worker_count = worker_threads
self.files_processed = 0
self.total_file_count = 0
self.progress_lock = threading.Lock()
def compute_hash(self, file_location: str, block_size: int = 8192, limit_mb: int = 10) -> str:
try:
file_size = os.path.getsize(file_location)
hash_algorithm = hashlib.sha256()
max_bytes = limit_mb * 1024 * 1024
with open(file_location, "rb") as file_handle:
if file_size > max_bytes:
remaining = max_bytes
while remaining > 0:
read_block = min(block_size, remaining)
data_chunk = file_handle.read(read_block)
if not data_chunk:
break
hash_algorithm.update(data_chunk)
remaining -= len(data_chunk)
hash_algorithm.update(str(file_size).encode())
else:
for data_block in iter(lambda: file_handle.read(block_size), b""):
hash_algorithm.update(data_block)
return hash_algorithm.hexdigest()
except Exception as hash_error:
return f"hash_error_{hash(file_location)}"
def analyze_single_file(self, file_details: tuple) -> Optional[FileMetadata]:
folder_path, base_name = file_details
complete_path = os.path.join(folder_path, base_name)
try:
stat_info = os.stat(complete_path)
size_in_mb = stat_info.st_size / (1024 * 1024)
mod_time = datetime.fromtimestamp(stat_info.st_mtime)
create_time = datetime.fromtimestamp(stat_info.st_ctime)
file_hash = self.compute_hash(complete_path)
return FileMetadata(
device_identifier=self.target_device_id,
full_path=complete_path,
parent_folder=folder_path,
name=base_name,
extension=os.path.splitext(base_name)[1].lower().lstrip('.'),
size_bytes=size_in_mb,
hash_value=file_hash,
modification_time=mod_time,
creation_time=create_time
)
except Exception as proc_error:
print(f"Error processing {complete_path}: {proc_error}")
return None
def scan_and_process(self):
file_list = []
for current_dir, _, files_in_dir in os.walk(self.root_directory):
for fname in files_in_dir:
file_list.append((current_dir, fname))
self.total_file_count = len(file_list)
print(f"Found {self.total_file_count} files to index.")
with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
future_map = {executor.submit(self.analyze_single_file, f): f for f in file_list}
for future_task in as_completed(future_map):
try:
file_meta = future_task.result()
if file_meta:
with self.progress_lock:
self.files_processed += 1
if self.files_processed % 500 == 0:
percent = (self.files_processed / self.total_file_count) * 100
print(f"Progress: {percent:.1f}% ({self.files_processed}/{self.total_file_count})")
yield file_meta
except Exception as task_error:
print(f"Task error: {task_error}")
def execute_indexing(self) -> None:
insert_query = """
INSERT INTO geospatial_file_index (
device_id, absolute_path, directory_path, filename, file_extension,
size_mb, content_hash, last_modified, acquisition_time
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (device_id, absolute_path) DO NOTHING;
"""
start_timestamp = datetime.now()
records_inserted = 0
batch = []
batch_limit = 1000
for metadata_record in self.scan_and_process():
batch.append((
metadata_record.device_identifier,
metadata_record.full_path,
metadata_record.parent_folder,
metadata_record.name,
metadata_record.extension,
metadata_record.size_bytes,
metadata_record.hash_value,
metadata_record.modification_time,
metadata_record.creation_time
))
if len(batch) >= batch_limit:
self.db.run_query(insert_query, batch)
records_inserted += len(batch)
batch.clear()
print(f"Indexed {records_inserted} records...")
if batch:
self.db.run_query(insert_query, batch)
records_inserted += len(batch)
end_timestamp = datetime.now()
duration_seconds = (end_timestamp - start_timestamp).total_seconds()
print(f"Indexing complete. Total files: {records_inserted}")
print(f"Time elapsed: {duration_seconds:.2f} seconds")
if duration_seconds > 0:
print(f"Rate: {records_inserted/duration_seconds:.2f} files/second")
Data Classification and Metadata Extraction
The system is designed to identify and categorize common geospatial data formats, though the specific extraction logic for each format (e.g., GeoTIFF, Shapefile, KMZ) is implemented separately. The technical_metadata JSONB column in the index table stores format-specific attributes extracted during the indexing process.