Automated IP Blocking Pipeline with Dynamic Whitelist and Threshold Filtering
This solution implements a hybrid shell-Python architecture for automatically blocking malicious IP addresses based on connection frequency thresholds while respecting CIDR-based whitelists and preventing duplicate firewall rules.
Architecture Overview
The pipeline separates system-level operations (log parsing and firewall manipulation) from business logic (IP validation and whitelist processing). The shell component handles volatile temporary files and iptables interactions, while Python manages address parsing, network overlap detection, and set operations for efficient filtering.
Shell Component: Log Extraction and Firewall Orchestration
The bash wrapper manages environment initialization, extracts IP frequencies from system logs, captures current firewall state, and applies incremental blocks.
#!/bin/bash
# Abort on any failing command (-e), on use of an unset variable (-u), and
# when any stage of a pipeline fails (pipefail).
set -euo pipefail
# Directory that holds the scratch files and the whitelist used below.
readonly WORKDIR="/var/lib/ipguard"
# Per-address hit counts written by parse_auth_logs ("<count> <ip>" per line).
readonly AGGREGATED="${WORKDIR}/conn_freq.txt"
# Source addresses of current DROP rules, written by capture_current_blocks.
readonly SNAPSHOT="${WORKDIR}/existing_blocks.txt"
# Addresses selected for blocking; written by the Python filter engine and
# consumed by apply_blocks.
readonly BLOCKLIST="${WORKDIR}/to_block.txt"
# Whitelist of hosts/CIDR ranges handed to the Python filter engine.
readonly WHITELIST="${WORKDIR}/exceptions.conf"
# File that log_event appends every log line to.
readonly LOGFILE="/var/log/ipguard.log"
# Print a timestamped log line to stdout and append the same line to LOGFILE.
#   $1 - severity tag placed in square brackets (e.g. INFO, BLOCK)
#   $2 - free-form message text
log_event() {
    local level="$1" text="$2"
    printf '%s [%s] %s\n' \
        "$(date '+%Y-%m-%d %H:%M:%S')" "${level}" "${text}" \
        | tee -a "${LOGFILE}"
}
# Count how often each IPv4 address appears on log lines matching a pattern
# and write the result to AGGREGATED as "<count> <ip>", most frequent first.
#   $1 - log file to scan (default: /var/log/secure)
#   $2 - extended regex selecting the relevant lines (default: Invalid user)
# A log with no matching lines yields an empty AGGREGATED file; a genuine
# grep error (status >= 2, e.g. unreadable log) still aborts the script.
parse_auth_logs() {
    local target_log="${1:-/var/log/secure}"
    # No inner quotes here: inside "${2:-...}" they would become part of the
    # pattern itself and the default would never match anything.
    local regex_pattern="${2:-Invalid user}"

    # grep exits 1 when nothing matches. Under `set -e -o pipefail` that
    # would terminate the whole run on a quiet log, so status 1 is mapped to
    # success while any other non-zero status is left to propagate.
    {
        grep -E -- "${regex_pattern}" "${target_log}" 2>/dev/null || [[ $? -eq 1 ]]
    } | {
        grep -oE '\b([0-9]{1,3}\.){3}[0-9]{1,3}\b' || [[ $? -eq 1 ]]
    } | sort | uniq -c | sort -rn > "${AGGREGATED}"

    log_event "INFO" "Parsed $(wc -l < "${AGGREGATED}") unique addresses"
}
# Write the source address of every DROP rule in the INPUT chain to SNAPSHOT,
# one per line. An INPUT chain without DROP rules yields an empty file.
capture_current_blocks() {
    # In `iptables -L -n` output the columns are: target prot opt source dest.
    # Filtering inside awk (target must be exactly DROP, source must start
    # with a digit) replaces the former trailing grep, whose exit status 1 on
    # "no match" aborted the script under `set -e -o pipefail`.
    iptables -L INPUT -n \
        | awk '$1 == "DROP" && $4 ~ /^[0-9]/ {print $4}' > "${SNAPSHOT}"
}
# Insert a DROP rule at the top of the INPUT chain for every address listed
# in BLOCKLIST (one per line, blank lines ignored), skipping addresses that
# already have a matching rule, and log how many rules were added.
apply_blocks() {
    local blocked_count=0
    local addr

    while IFS= read -r addr; do
        [[ -z "${addr}" ]] && continue

        # `iptables -C` only matches a rule with the identical specification.
        # Rules inserted here carry the "auto-blocked" comment match, so the
        # check must include it; the plain form is also checked to respect
        # DROP rules that were added by hand without a comment.
        if iptables -C INPUT -s "${addr}" -j DROP 2>/dev/null \
            || iptables -C INPUT -s "${addr}" -j DROP \
                -m comment --comment "auto-blocked" 2>/dev/null; then
            continue
        fi

        iptables -I INPUT -s "${addr}" -j DROP -m comment --comment "auto-blocked"
        log_event "BLOCK" "${addr}"
        # Not ((blocked_count++)): that expression evaluates to 0 on the
        # first increment, returns status 1, and trips `set -e`.
        blocked_count=$((blocked_count + 1))
    done < "${BLOCKLIST}"

    log_event "INFO" "Inserted ${blocked_count} new rules"
}
# Run one complete filtering cycle: aggregate offenders from the log,
# snapshot the current DROP rules, let the Python engine pick new addresses,
# apply them, and remove the scratch files.
#   $1 - path of the authentication log to scan
run_pipeline() {
    # Positional arguments for the filter engine:
    # threshold, whitelist, frequency log, current blocks, output file.
    local -a engine_args=(
        10
        "${WHITELIST}"
        "${AGGREGATED}"
        "${SNAPSHOT}"
        "${BLOCKLIST}"
    )
    local scratch

    mkdir -p "${WORKDIR}"
    log_event "INFO" "Starting IP filtering cycle"

    parse_auth_logs "$1" "authentication failure"
    capture_current_blocks
    python3 /usr/local/bin/filter_engine.py "${engine_args[@]}"

    # An empty (or missing) block list means there is nothing to insert.
    if [[ ! -s "${BLOCKLIST}" ]]; then
        log_event "INFO" "No action required"
    else
        apply_blocks
    fi

    for scratch in "${AGGREGATED}" "${SNAPSHOT}" "${BLOCKLIST}"; do
        rm -f "${scratch}"
    done
}
# Command dispatch: "start [log_path]" runs one cycle (the log path defaults
# to /var/log/auth.log); anything else prints usage and exits with status 1.
if [[ "${1:-}" == "start" ]]; then
    run_pipeline "${2:-/var/log/auth.log}"
else
    echo "Usage: $0 start [log_path]"
    exit 1
fi
Python Component: Intelligent Filtering Engine
The Python script uses the ipaddress module for proper CIDR handling, relies on set-based membership tests (O(1) average lookup) to skip whitelisted and already-blocked addresses, and validates address formats.
import argparse
import ipaddress
import sys
from pathlib import Path
from typing import Set, Tuple, Dict
class IPFilterEngine:
    """Select the IP addresses that should receive a new firewall block.

    An address is selected when its count in the frequency log reaches the
    threshold, it is not already listed among the existing blocks, and it is
    not covered by the whitelist (individual hosts or CIDR networks).
    """

    def __init__(self, threshold: int):
        """Create an engine that flags addresses seen at least *threshold* times."""
        self.threshold = threshold
        # Whitelisted single hosts, stored as canonical address strings.
        self.excluded_hosts: Set[str] = set()
        # Whitelisted networks parsed from CIDR entries.
        self.excluded_networks: Set[ipaddress.IPv4Network] = set()

    def load_exceptions(self, whitelist_path: str) -> None:
        """Load whitelist entries (single IPs or CIDR blocks) from a file.

        Blank lines and ``#`` comments (whole-line or trailing) are ignored.
        Entries that are not a valid address or network are skipped. A
        missing file leaves the whitelist unchanged.
        """
        path = Path(whitelist_path)
        if not path.exists():
            return
        with open(path, 'r') as f:
            for line in f:
                # Drop anything after '#', so "10.0.0.50  # office" still works.
                entry = line.split('#', 1)[0].strip()
                if not entry:
                    continue
                try:
                    if '/' in entry:
                        self.excluded_networks.add(
                            ipaddress.ip_network(entry, strict=False)
                        )
                    else:
                        # Validate host entries too; previously any string
                        # was stored and the ValueError handler never applied.
                        self.excluded_hosts.add(
                            str(ipaddress.ip_address(entry))
                        )
                except ValueError:
                    continue

    def parse_frequency_log(self, log_path: str) -> Dict[str, int]:
        """Return ``{ip: count}`` for log records whose count meets the threshold.

        Each record is expected to be ``<count> <ip>`` separated by
        whitespace (the output format of ``uniq -c``). Records with fewer
        than two fields or a non-integer count are skipped.
        """
        violators: Dict[str, int] = {}
        with open(log_path, 'r') as f:
            for record in f:
                parts = record.split()
                if len(parts) < 2:
                    continue
                try:
                    count = int(parts[0])
                except ValueError:
                    continue
                if count >= self.threshold:
                    violators[parts[1]] = count
        return violators

    def is_whitelisted(self, ip_str: str) -> bool:
        """Return True if *ip_str* must not be blocked.

        That is the case when the address is a whitelisted host, lies inside
        a whitelisted network, or is not a valid IP address at all: strings
        that cannot be parsed are reported as whitelisted so they never
        reach the block list.
        """
        if ip_str in self.excluded_hosts:
            return True
        try:
            addr = ipaddress.ip_address(ip_str)
        except ValueError:
            return True
        if str(addr) in self.excluded_hosts:
            return True
        return any(addr in network for network in self.excluded_networks)

    def filter_addresses(self, candidates: Dict[str, int],
                         existing_blocks: Set[str]) -> Set[str]:
        """Return the candidate IPs that are neither blocked nor whitelisted.

        *existing_blocks* is compared by exact string match.
        """
        return {
            ip for ip in candidates
            if ip not in existing_blocks and not self.is_whitelisted(ip)
        }

    @staticmethod
    def load_existing_blocks(blocklist_path: str) -> Set[str]:
        """Return the non-empty, stripped lines of the snapshot file as a set.

        A missing file yields an empty set.
        """
        path = Path(blocklist_path)
        if not path.exists():
            return set()
        with open(path, 'r') as f:
            return {line.strip() for line in f if line.strip()}

    def process(self, whitelist: str, frequency_log: str,
                current_blocks: str, output: str) -> int:
        """Run the full pipeline and write the new block list to *output*.

        The output file receives one address per line in sorted order and is
        always (re)written, even when empty. Returns the number of addresses
        written.
        """
        self.load_exceptions(whitelist)
        candidates = self.parse_frequency_log(frequency_log)
        blocked = self.load_existing_blocks(current_blocks)
        new_blocks = self.filter_addresses(candidates, blocked)
        with open(output, 'w') as f:
            f.writelines(f"{ip}\n" for ip in sorted(new_blocks))
        return len(new_blocks)
def main():
    """Parse the command line, run the filter engine, and report the result.

    Expects five positional arguments: the integer threshold followed by the
    paths of the whitelist, the frequency log, the current-blocks snapshot,
    and the output file. Prints how many addresses were queued and returns 0.
    """
    cli = argparse.ArgumentParser(
        description='Filter IP addresses for firewall blocking'
    )
    cli.add_argument('threshold', type=int,
                     help='Minimum connection count to trigger block')
    # The remaining positionals are plain string paths, declared in order.
    path_arguments = (
        ('whitelist', 'Path to whitelist file'),
        ('frequency_log', 'Path to aggregated IP counts'),
        ('current_blocks', 'Path to existing iptables list'),
        ('output', 'Path to write new block list'),
    )
    for arg_name, help_text in path_arguments:
        cli.add_argument(arg_name, help=help_text)
    opts = cli.parse_args()

    queued = IPFilterEngine(opts.threshold).process(
        opts.whitelist,
        opts.frequency_log,
        opts.current_blocks,
        opts.output,
    )
    print(f"Processed: {queued} addresses queued for blocking")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Configuration and Whitelist Format
Create /var/lib/ipguard/exceptions.conf with the following format:
# Individual IPs
192.168.1.100
10.0.0.50
# CIDR Networks
172.16.0.0/12
203.0.113.0/24
Integration
Schedule via cron for automated execution every 5 minutes:
*/5 * * * * /usr/local/bin/block_ips.sh start /var/log/secure
The system efficiently handles incremental updates by comparing against active iptables rules, ensuring only new violators are processed while maintaining persistent whitelist exclusions across execution cycles.