Fading Coder

One Final Commit for the Last Sprint

Home > Notes > Content

Automated IP Blocking Pipeline with Dynamic Whitelist and Threshold Filtering

Notes 1

This solution implements a hybrid shell-Python architecture for automatically blocking malicious IP addresses based on connection frequency thresholds while respecting CIDR-based whitelists and preventing duplicate firewall rules.

Architecture Oevrview

The pipeline separates system-level operations (log parrsing and firewall manipulation) from business logic (IP validation and whitelist processing). The shell component handles volatile temporary files and iptables interactions, while Python manages address parsing, network overlap detection, and set operations for efficient flitering.

Shell Component: Log Extraction and Firewall Orchestration

The bash wrapper manages environment initialization, extracts IP frequencies from system logs, captures current firewall state, and applies incremental blocks.

#!/bin/bash
set -euo pipefail

readonly WORKDIR="/var/lib/ipguard"
readonly AGGREGATED="${WORKDIR}/conn_freq.txt"
readonly SNAPSHOT="${WORKDIR}/existing_blocks.txt"
readonly BLOCKLIST="${WORKDIR}/to_block.txt"
readonly WHITELIST="${WORKDIR}/exceptions.conf"
readonly LOGFILE="/var/log/ipguard.log"

log_event() {
    local severity="$1"
    local message="$2"
    printf '%s [%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "${severity}" "${message}" | tee -a "${LOGFILE}"
}

parse_auth_logs() {
    local target_log="${1:-/var/log/secure}"
    local regex_pattern="${2:-'Invalid user'}"
    
    grep -E "${regex_pattern}" "${target_log}" 2>/dev/null | \
        grep -oE '\b([0-9]{1,3}\.){3}[0-9]{1,3}\b' | \
        sort | uniq -c | sort -rn > "${AGGREGATED}"
    
    log_event "INFO" "Parsed $(wc -l < "${AGGREGATED}") unique addresses"
}

capture_current_blocks() {
    iptables -L INPUT -n | awk '/DROP/ {print $4}' | grep -E '^[0-9]' > "${SNAPSHOT}"
}

apply_blocks() {
    local blocked_count=0
    while IFS= read -r addr; do
        [[ -z "${addr}" ]] && continue
        if ! iptables -C INPUT -s "${addr}" -j DROP 2>/dev/null; then
            iptables -I INPUT -s "${addr}" -j DROP -m comment --comment "auto-blocked"
            log_event "BLOCK" "${addr}"
            ((blocked_count++))
        fi
    done < "${BLOCKLIST}"
    log_event "INFO" "Inserted ${blocked_count} new rules"
}

run_pipeline() {
    mkdir -p "${WORKDIR}"
    log_event "INFO" "Starting IP filtering cycle"
    
    parse_auth_logs "$1" "authentication failure"
    capture_current_blocks
    
    python3 /usr/local/bin/filter_engine.py \
        10 \
        "${WHITELIST}" \
        "${AGGREGATED}" \
        "${SNAPSHOT}" \
        "${BLOCKLIST}"
    
    if [[ -s "${BLOCKLIST}" ]]; then
        apply_blocks
    else
        log_event "INFO" "No action required"
    fi
    
    rm -f "${AGGREGATED}" "${SNAPSHOT}" "${BLOCKLIST}"
}

case "${1:-}" in
    start) run_pipeline "${2:-/var/log/auth.log}" ;;
    *) echo "Usage: $0 start [log_path]" ; exit 1 ;;
esac

Python Component: Intelligent Filtering Engine

The Python script utilizes the ipaddress module for proper CIDR handling, performs set difference operations for O(1) lookup complexity, and validates address formats.

import argparse
import ipaddress
import sys
from pathlib import Path
from typing import Set, Tuple, Dict

class IPFilterEngine:
    def __init__(self, threshold: int):
        self.threshold = threshold
        self.excluded_hosts: Set[str] = set()
        self.excluded_networks: Set[ipaddress.IPv4Network] = set()
    
    def load_exceptions(self, whitelist_path: str) -> None:
        """Parse whitelist containing individual IPs or CIDR notation."""
        path = Path(whitelist_path)
        if not path.exists():
            return
            
        with open(path, 'r') as f:
            for line in f:
                entry = line.strip()
                if not entry or entry.startswith('#'):
                    continue
                    
                try:
                    if '/' in entry:
                        self.excluded_networks.add(
                            ipaddress.ip_network(entry, strict=False)
                        )
                    else:
                        self.excluded_hosts.add(entry)
                except ValueError:
                    continue
    
    def parse_frequency_log(self, log_path: str) -> Dict[str, int]:
        """Extract IPs exceeding threshold from aggregated log."""
        violators = {}
        with open(log_path, 'r') as f:
            for record in f:
                parts = record.strip().split()
                if len(parts) < 2:
                    continue
                try:
                    count, ip = int(parts[0]), parts[1]
                    if count >= self.threshold:
                        violators[ip] = count
                except ValueError:
                    continue
        return violators
    
    def is_whitelisted(self, ip_str: str) -> bool:
        """Check if IP matches whitelist or belongs to exempted subnet."""
        if ip_str in self.excluded_hosts:
            return True
            
        try:
            addr = ipaddress.ip_address(ip_str)
            return any(addr in network for network in self.excluded_networks)
        except ValueError:
            return True
    
    def filter_addresses(self, candidates: Dict[str, int], 
                        existing_blocks: Set[str]) -> Set[str]:
        """Return IPs to block after applying all filters."""
        to_block = set()
        
        for ip, count in candidates.items():
            if ip in existing_blocks:
                continue
            if self.is_whitelisted(ip):
                continue
            to_block.add(ip)
            
        return to_block
    
    @staticmethod
    def load_existing_blocks(blocklist_path: str) -> Set[str]:
        """Load currently blocked IPs from iptables snapshot."""
        blocks = set()
        path = Path(blocklist_path)
        if path.exists():
            with open(path, 'r') as f:
                blocks = {line.strip() for line in f if line.strip()}
        return blocks
    
    def process(self, whitelist: str, frequency_log: str, 
                current_blocks: str, output: str) -> int:
        """Execute full filtering pipeline."""
        self.load_exceptions(whitelist)
        candidates = self.parse_frequency_log(frequency_log)
        blocked = self.load_existing_blocks(current_blocks)
        
        new_blocks = self.filter_addresses(candidates, blocked)
        
        with open(output, 'w') as f:
            for ip in sorted(new_blocks):
                f.write(f"{ip}\n")
                
        return len(new_blocks)

def main():
    parser = argparse.ArgumentParser(
        description='Filter IP addresses for firewall blocking'
    )
    parser.add_argument('threshold', type=int, 
                       help='Minimum connection count to trigger block')
    parser.add_argument('whitelist', help='Path to whitelist file')
    parser.add_argument('frequency_log', help='Path to aggregated IP counts')
    parser.add_argument('current_blocks', help='Path to existing iptables list')
    parser.add_argument('output', help='Path to write new block list')
    
    args = parser.parse_args()
    
    engine = IPFilterEngine(args.threshold)
    count = engine.process(
        args.whitelist,
        args.frequency_log,
        args.current_blocks,
        args.output
    )
    
    print(f"Processed: {count} addresses queued for blocking")
    return 0

if __name__ == "__main__":
    sys.exit(main())

Configuration and Whitelist Format

Create /var/lib/ipguard/exceptions.conf with the following format:

# Individual IPs
192.168.1.100
10.0.0.50

# CIDR Networks
172.16.0.0/12
203.0.113.0/24

Integration

Schedule via cron for automated execution every 5 minutes:

*/5 * * * * /usr/local/bin/block_ips.sh start /var/log/secure

The system efficiently handles incremental updates by comparing against active iptables rules, ensuring only new violators are processed while maintaining persistent whitelist exclusions across execution cycles.

Related Articles

Designing Alertmanager Templates for Prometheus Notifications

How to craft Alertmanager templates to format alert messages, improving clarity and presentation. Alertmanager uses Go’s text/template engine with additional helper functions. Alerting rules referenc...

Deploying a Maven Web Application to Tomcat 9 Using the Tomcat Manager

Tomcat 9 does not provide a dedicated Maven plugin. The Tomcat Manager interface, however, is backward-compatible, so the Tomcat 7 Maven Plugin can be used to deploy to Tomcat 9. This guide shows two...

Skipping Errors in MySQL Asynchronous Replication

When a replica halts because the SQL thread encounters an error, you can resume replication by skipping the problematic event(s). Two common approaches are available. Methods to Skip Errors 1) Skip a...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.