Practical Cryptography and Network Security Techniques in Python
import hashlib
# Secure password hashing with salt and iteration
secret = b'base_secret'  # NOTE(review): not referenced by the code below; confirm it is still needed
# NOTE: a fixed salt keeps this example reproducible. Real systems should
# generate a fresh random salt per password (e.g. os.urandom(16)) and store
# it next to the hash.
salt = b'random_salt_2024'
iterations = 100_000
hashed = hashlib.pbkdf2_hmac('sha256', b'user_password', salt, iterations)
print(f'PBKDF2 hash (hex): {hashed.hex()[:32]}...')


# Verification logic using constant-time comparison
def verify_password(provided: bytes, stored_hash: bytes, salt: bytes) -> bool:
    """Return True when *provided* hashes to *stored_hash* under *salt*.

    The candidate digest is derived with PBKDF2-HMAC-SHA256 using the
    module-level ``iterations`` count, so it must match the count that was
    used to produce *stored_hash*.

    Args:
        provided: Password bytes supplied by the caller.
        stored_hash: Previously derived digest to compare against.
        salt: Salt that was used when *stored_hash* was derived.

    Returns:
        True if the derived digest equals *stored_hash*, otherwise False.
    """
    # Function-scope import: hmac is needed for compare_digest, and the
    # snippet's import list only brings in hashlib.
    import hmac

    candidate = hashlib.pbkdf2_hmac('sha256', provided, salt, iterations)
    # compare_digest avoids leaking the position of the first mismatching byte.
    return hmac.compare_digest(candidate, stored_hash)


stored = hashed
is_valid = verify_password(b'user_password', stored, salt)
print(f'Authentication result: {is_valid}')
Network Interaction and Reconnaissance
1. Crafting Custom Packets with Scapy
Scapy enables precise control over packet construction for analysis or simulation.
from scapy.all import IP, TCP, send, RandShort
victim = '192.168.1.100'
probe_port = 443
# Send a single SYN probe toward the target.
# send() only transmits: it does not wait for or return replies, so no
# timeout is passed to it. To actually observe reachability, use
# sr1(packet, timeout=...) and inspect the answer.
syn_packet = IP(dst=victim) / TCP(dport=probe_port, flags='S', sport=RandShort())
send(syn_packet, verbose=0)
2. Concurrent Port Enumeration
Improve scanning efficiency by leveraging threading instead of sequential iteration.
import socket
import threading
from concurrent.futures import ThreadPoolExecutor
def check_port(host: str, port: int, timeout: float = 0.8) -> tuple[int, bool]:
    """Report whether a TCP connection to ``host:port`` can be established.

    Args:
        host: Hostname or IPv4 address to probe.
        port: TCP port number to probe.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        ``(port, True)`` when the connect succeeds, ``(port, False)`` when it
        is refused, times out, or the address/port cannot be used.
    """
    try:
        # The context manager closes the socket even if connect_ex raises
        # (e.g. name resolution failure), so no descriptor is leaked.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an errno value otherwise.
            return port, sock.connect_ex((host, port)) == 0
    except (OSError, OverflowError, ValueError):
        # OSError: resolution/socket errors; OverflowError: port out of
        # range; ValueError: host name that cannot be encoded.
        return port, False
target = '192.168.1.1'
# Fan the probes out over a bounded worker pool, then gather the results in
# submission order once every probe has been queued.
with ThreadPoolExecutor(max_workers=50) as executor:
    pending = [
        executor.submit(check_port, target, candidate)
        for candidate in range(22, 1025)
    ]
    outcomes = [job.result() for job in pending]
# Keep only the port numbers whose probe reported a successful connect.
open_ports = [number for number, reachable in outcomes if reachable]
print(f'Open ports on {target}: {sorted(open_ports)}')
Secure Communication and Cryptographic Operations
1. Asymmetric Signing with Modern Primitives
Use the cryptography library (a more actively maintained alternative to PyCryptodome) for signing with RSA-PSS.
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
# verify() signals a bad signature with InvalidSignature; import it here so
# only that case is reported as a validation failure.
from cryptography.exceptions import InvalidSignature

# Generate key pair
private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072)
public_key = private_key.public_key()

# Sign message
message = b"Confidential payload"
# Signer and verifier must use the same PSS parameters, so build them once.
pss_padding = padding.PSS(
    mgf=padding.MGF1(hashes.SHA256()),
    salt_length=padding.PSS.MAX_LENGTH,
)
signature = private_key.sign(message, pss_padding, hashes.SHA256())

# Verify signature
try:
    public_key.verify(signature, message, pss_padding, hashes.SHA256())
except InvalidSignature:
    # Any other exception is a programming or environment error and is
    # allowed to propagate instead of being reported as a bad signature.
    print("Signature validation failed.")
else:
    print("Signature validation passed.")
2. TLS Client with Certificate Validation
Establish encrypted connections while enforcing strict certificate checks.
import ssl
import socket
context = ssl.create_default_context()  # Enables cert verification & secure defaults
# The timeout keeps an unreachable server from blocking the script forever.
with socket.create_connection(('httpbin.org', 443), timeout=10) as sock:
    with context.wrap_socket(sock, server_hostname='httpbin.org') as ssock:
        # sendall retries until the whole request is written; send may stop early.
        ssock.sendall(b"GET /headers HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n")
        # Keep reading until the blank line that ends the header section
        # arrives, or the peer closes the connection.
        received = bytearray()
        while b'\r\n\r\n' not in received:
            chunk = ssock.recv(2048)
            if not chunk:
                break
            received += chunk
        response = bytes(received)
# HTTP separates headers from the body with CRLF CRLF, not '\n\n'.
header_block = response.split(b'\r\n\r\n', 1)[0]
print(header_block.decode('utf-8', errors='replace'))
Defensive Programming and Input Safety
1. Parameterized Queries to Prevent SQL Injection
Leverage SQLAlchemy’s parameter binding to eliminate injection vectors.
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///app.db')
# Safe query using bound parameters: the values travel separately from the
# SQL text, so they are never interpolated into the statement string.
stmt = text("SELECT * FROM users WHERE email = :email AND active = :active")
bound_values = {"email": "admin@example.com", "active": True}
with engine.connect() as conn:
    result = conn.execute(stmt, bound_values)
    for row in result:
        print(row)
2. Input Sanitization for Web Contexts
Apply context-aware escaping rather than generic filtering.
from html import escape
from urllib.parse import quote
user_input = '<script>alert("xss")</script> hello@world.com'
# HTML context: escape() converts &, <, > and (by default) quote characters
# into entities so the text cannot open or close markup.
safe_html = escape(user_input)
# URL context: quote() leaves '/' untouched by default because it targets
# whole paths. User input placed into a single component (a path segment or
# query value) must have '/' encoded too, hence safe=''.
safe_url = quote(user_input, safe='')
print(f'Escaped for HTML: {safe_html}')
print(f'Encoded for URL: {safe_url}')
Traffic Inspection and Behavioral Monitoring
1. Programmatic Packet Capture with TShark
Integrate tshark via subprocess for lightweight capture and filtering.
import subprocess
import json
# Capture 10 packets and extract basic metadata
result = subprocess.run(
    [
        'tshark', '-c', '10', '-T', 'json',
        '-e', 'ip.src', '-e', 'ip.dst', '-e', 'tcp.port',
        '-Y', 'tcp || udp',
    ],
    capture_output=True,
    text=True,
)
if result.returncode == 0:
    # Treat empty output as "no packets" rather than a JSON parse error.
    packets = json.loads(result.stdout.strip() or '[]')
    for pkt in packets[:3]:
        layers = pkt.get('_source', {}).get('layers', {})
        # With -T json plus -e, each requested field appears directly under
        # "layers", keyed by its field name, with a list of values.
        # Packets without an IPv4 header have no ip.src/ip.dst entry.
        src = layers.get('ip.src', ['N/A'])[0]
        dst = layers.get('ip.dst', ['N/A'])[0]
        print(f'Flow: {src} → {dst}')
else:
    # Report the failure instead of silently printing nothing.
    print(f'tshark exited with status {result.returncode}: {result.stderr.strip()}')
2. Log Anomaly Detection Using Statistical Thresholds
Detect unusual authentication patterns from structured log data.
import pandas as pd
from datetime import timedelta
# Simulated auth log DataFrame
logs = pd.DataFrame([
    {'timestamp': '2024-04-10 08:22:10', 'user': 'alice', 'status': 'success'},
    {'timestamp': '2024-04-10 08:22:12', 'user': 'alice', 'status': 'fail'},
    {'timestamp': '2024-04-10 08:22:15', 'user': 'alice', 'status': 'fail'},
    {'timestamp': '2024-04-10 09:15:03', 'user': 'bob', 'status': 'success'},
])
logs['timestamp'] = pd.to_datetime(logs['timestamp'])
# Time-based rolling windows need the timestamps in ascending order.
logs = logs.sort_values('timestamp')
# Start of the 60-second look-back window for each event (informational).
logs['window'] = logs['timestamp'] - timedelta(seconds=60)

# Count failures per user within 60-second windows.
# Each failed login contributes 1; the rolling sum at a given event is the
# number of that user's failures in the 60 seconds ending at that event.
failed = (
    logs[logs['status'] == 'fail']
    .assign(failures=1)
    .set_index('timestamp')
)
rolling_failures = failed.groupby('user')['failures'].rolling('60s').sum()
# Keep each user's busiest window so one alert is raised per user.
failure_counts = rolling_failures.groupby(level='user').max().astype(int)

for user, count in failure_counts.items():
    if count >= 3:
        print(f'ALERT: User {user} had {count} failed logins in short interval.')