Downloading and Decrypting AES-128 Encrypted HLS (M3U8) Streams with Python
This guide shows how to inspect an HLS master playlist, locate the actual media playlist, fetch the encryption key, and downloda/decrypt all segment using Python. Both a simple single-threaded downloadre and a faster multi-threaded variant are provided.
Requirements
pip install m3u8 requests pycryptodome
Quick inspection of an M3U8 URL
Some HLS URLs point to a master playlist that references one or more media playlists. First, check whether an M3U8 is a master playlist and obtain the media playlist URL.
import m3u8
UA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
master_url = 'https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8'
pl = m3u8.load(uri=master_url, headers=UA_HEADERS)
print('is_variant:', pl.is_variant)
if pl.is_variant:
for p in pl.playlists:
print('variant:', p.absolute_uri, p.stream_info.bandwidth, p.stream_info.resolution)
If it’s a master playlist, pick a variant and load it to get segments.
Resolve a master playlist to a media playlist
import m3u8
UA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
def resolve_media_playlist(url: str) -> str:
playlist = m3u8.load(uri=url, headers=UA_HEADERS)
if playlist.is_variant:
# Choose the first variant (or implement your own selection logic)
return playlist.playlists[0].absolute_uri
return url
media_url = resolve_media_playlist(master_url)
print(media_url)
Inspect key information
For AES-128-encrypted HLS, the media playlist provides the key URI and optional IV.
import m3u8
media = m3u8.load(uri=media_url, headers=UA_HEADERS)
key_obj = media.keys[-1] if media.keys else None
if key_obj:
print('key_uri:', getattr(key_obj, 'absolute_uri', None) or key_obj.uri)
print('method:', key_obj.method)
print('iv:', key_obj.iv)
else:
print('no encryption key; stream likely unencrypted')
Helpers for AES-128 decryption and IV handling
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from urllib.parse import urljoin
UA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
def fetch_key_bytes(session: requests.Session, media_url: str, key_obj) -> bytes | None:
if not key_obj:
return None
key_url = getattr(key_obj, 'absolute_uri', None) or urljoin(media_url, key_obj.uri)
resp = session.get(key_url, headers=UA_HEADERS, timeout=20)
resp.raise_for_status()
return resp.content
def derive_iv(key_obj, seq_number: int) -> bytes:
"""Return a 16-byte IV. If the playlist specifies an IV, parse it; otherwise use the big-endian segment sequence."""
if key_obj and key_obj.iv:
hex_str = key_obj.iv.lower().replace('0x', '')
iv = bytes.fromhex(hex_str)
if len(iv) < 16:
iv = (b"\x00" * (16 - len(iv))) + iv
elif len(iv) > 16:
iv = iv[-16:]
return iv
return int(seq_number).to_bytes(16, byteorder='big')
def aes128_cbc_decrypt(ct: bytes, key_bytes: bytes, iv_bytes: bytes) -> bytes:
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
pt = cipher.decrypt(ct)
# Try to remove PKCS#7 padding if present; keep raw if unpadding fails.
try:
return unpad(pt, AES.block_size)
except ValueError:
return pt
Single-threaded end-to-end download
import os
import time
import m3u8
import requests
from urllib.parse import urljoin
UA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
def download_hls_serial(master_or_media_url: str, output_file: str) -> None:
session = requests.Session()
# Resolve to media playlist if necessary
media_url = resolve_media_playlist(master_or_media_url)
# Load playlist and key
playlist = m3u8.load(uri=media_url, headers=UA_HEADERS)
key_obj = playlist.keys[-1] if playlist.keys else None
key_bytes = fetch_key_bytes(session, media_url, key_obj) if key_obj else None
base_seq = playlist.media_sequence or 0
total = len(playlist.segments)
# Start clean
with open(output_file, 'wb'):
pass
start = time.time()
total_written = 0
for idx, seg in enumerate(playlist.segments):
seg_url = seg.absolute_uri
resp = session.get(seg_url, headers=UA_HEADERS, timeout=30)
resp.raise_for_status()
chunk = resp.content
if key_bytes:
iv = derive_iv(key_obj, base_seq + idx)
chunk = aes128_cbc_decrypt(chunk, key_bytes, iv)
with open(output_file, 'ab') as fp:
fp.write(chunk)
total_written += len(chunk)
elapsed = time.time() - start
print(f"\r[{idx + 1}/{total}] {total_written/1024/1024:.2f} MB in {elapsed:.1f}s", end="")
print()
# Example
# download_hls_serial('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8', 'video_serial.mp4')
Multi-threaded download with merge
Download each segment to a temporary folder, then concatenate in order.
import os
import glob
import shutil
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import m3u8
import requests
UA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
TMP_DIR = Path('chunks_tmp')
def _download_one(session: requests.Session, seg_url: str, out_path: Path, key_bytes: bytes | None, iv_bytes: bytes | None):
r = session.get(seg_url, headers=UA_HEADERS, timeout=30)
r.raise_for_status()
data = r.content
if key_bytes and iv_bytes:
data = aes128_cbc_decrypt(data, key_bytes, iv_bytes)
out_path.write_bytes(data)
def download_hls_parallel(master_or_media_url: str, output_file: str, workers: int = 10) -> None:
media_url = resolve_media_playlist(master_or_media_url)
playlist = m3u8.load(uri=media_url, headers=UA_HEADERS)
key_obj = playlist.keys[-1] if playlist.keys else None
session = requests.Session()
key_bytes = fetch_key_bytes(session, media_url, key_obj) if key_obj else None
base_seq = playlist.media_sequence or 0
TMP_DIR.mkdir(exist_ok=True)
tasks = []
with ThreadPoolExecutor(max_workers=workers) as pool:
for idx, seg in enumerate(playlist.segments):
seg_url = seg.absolute_uri
iv = derive_iv(key_obj, base_seq + idx) if key_bytes else None
part_path = TMP_DIR / f"{idx:05d}.ts"
fut = pool.submit(_download_one, session, seg_url, part_path, key_bytes, iv)
tasks.append(fut)
# Optional: simple progress
for i, fut in enumerate(as_completed(tasks), 1):
fut.result() # propagate exceptions
print(f"\rdownloaded {i}/{len(tasks)}", end="")
print()
# Merge in numeric order
with open(output_file, 'wb') as out_fp:
for p in sorted(TMP_DIR.glob('*.ts')):
with open(p, 'rb') as in_fp:
shutil.copyfileobj(in_fp, out_fp, length=1024 * 1024)
print(f"\rmerged {p.name}", end="")
print()
# Cleanup
shutil.rmtree(TMP_DIR, ignore_errors=True)
# Example
# download_hls_parallel('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8', 'video_parallel.mp4', workers=10)
Minimal TS-fragment test
import m3u8
import requests
from pathlib import Path
UA_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
}
media_url = resolve_media_playlist('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8')
pl = m3u8.load(uri=media_url, headers=UA_HEADERS)
key_obj = pl.keys[-1] if pl.keys else None
with requests.Session() as s:
key_bytes = fetch_key_bytes(s, media_url, key_obj) if key_obj else None
seq0 = (pl.media_sequence or 0)
seg0 = pl.segments[0]
iv0 = derive_iv(key_obj, seq0) if key_bytes else None
r = s.get(seg0.absolute_uri, headers=UA_HEADERS, timeout=20)
r.raise_for_status()
content = r.content
if key_bytes:
content = aes128_cbc_decrypt(content, key_bytes, iv0)
Path('sample_00001.ts').write_bytes(content)
print('sample_00001.ts saved')