Fading Coder

An Old Coder’s Final Dance

Home > Tech > Content

Downloading and Decrypting AES-128 Encrypted HLS (M3U8) Streams with Python

Tech 2

This guide shows how to inspect an HLS master playlist, locate the actual media playlist, fetch the encryption key, and download and decrypt all segments using Python. Both a simple single-threaded downloader and a faster multi-threaded variant are provided.

Requirements

pip install m3u8 requests pycryptodome

Quick inspection of an M3U8 URL

Some HLS URLs point to a master playlist that references one or more media playlists. First, check whether an M3U8 is a master playlist and obtain the media playlist URL.

import m3u8

# Request headers carrying a desktop-Chrome User-Agent string.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
UA_HEADERS = {"User-Agent": _USER_AGENT}

master_url = 'https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8'
pl = m3u8.load(uri=master_url, headers=UA_HEADERS)

# Report whether the URL is a master (variant) playlist and, if so, list
# each variant with its absolute URI, bandwidth and resolution.
is_master = pl.is_variant
print('is_variant:', is_master)
if is_master:
    for variant in pl.playlists:
        info = variant.stream_info
        print('variant:', variant.absolute_uri, info.bandwidth, info.resolution)

If it’s a master playlist, pick a variant and load it to get segments.

Resolve a master playlist to a media playlist

import m3u8

# Request headers carrying a desktop-Chrome User-Agent string.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
UA_HEADERS = {"User-Agent": _USER_AGENT}


def resolve_media_playlist(url: str) -> str:
    """Return the URL of a media playlist for *url*.

    The playlist at *url* is loaded; when it is a master (variant) playlist
    the absolute URI of its first variant is returned, otherwise *url* is
    returned unchanged.
    """
    loaded = m3u8.load(uri=url, headers=UA_HEADERS)
    if not loaded.is_variant:
        return url
    # Take the first listed variant (swap in your own selection logic here).
    first_variant = loaded.playlists[0]
    return first_variant.absolute_uri

# Resolve the master URL down to a media playlist URL and show it.
media_url = resolve_media_playlist(url=master_url)
print(media_url)

Inspect key information

For AES-128-encrypted HLS, the media playlist provides the key URI and optional IV.

import m3u8

media = m3u8.load(uri=media_url, headers=UA_HEADERS)

# The last key entry of the playlist, or None when the playlist lists none.
key_obj = None
if media.keys:
    key_obj = media.keys[-1]

if not key_obj:
    print('no encryption key; stream likely unencrypted')
else:
    # Prefer the absolute key URI when the object exposes one.
    key_location = getattr(key_obj, 'absolute_uri', None) or key_obj.uri
    print('key_uri:', key_location)
    print('method:', key_obj.method)
    print('iv:', key_obj.iv)

Helpers for AES-128 decryption and IV handling

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from urllib.parse import urljoin

# Request headers carrying a desktop-Chrome User-Agent string.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
UA_HEADERS = {"User-Agent": _USER_AGENT}


def fetch_key_bytes(session: requests.Session, media_url: str, key_obj) -> bytes | None:
    """Download the raw decryption key referenced by a playlist key entry.

    Args:
        session: Session used for the HTTP GET.
        media_url: URL of the media playlist; base for a relative key URI.
        key_obj: Key entry of the playlist (exposing ``method``, ``uri`` and,
            optionally, ``absolute_uri``), or a falsy value.

    Returns:
        The response body of the key URL, or ``None`` when there is no key
        entry, the entry declares ``METHOD=NONE``, or it carries no URI.

    Raises:
        requests.HTTPError: If the key request returns an error status.
    """
    if not key_obj:
        return None

    method = (getattr(key_obj, 'method', None) or '').upper()
    absolute = getattr(key_obj, 'absolute_uri', None)
    relative = getattr(key_obj, 'uri', None)
    # METHOD=NONE entries have no key URI. Without this guard
    # urljoin(media_url, None) yields the playlist URL itself and the playlist
    # text would be returned as if it were the key.
    if method == 'NONE' or not (absolute or relative):
        return None

    key_url = absolute or urljoin(media_url, relative)
    resp = session.get(key_url, headers=UA_HEADERS, timeout=20)
    resp.raise_for_status()
    return resp.content


def derive_iv(key_obj, seq_number: int) -> bytes:
    """Return the 16-byte IV for a segment.

    Args:
        key_obj: Playlist key entry exposing an ``iv`` attribute (a hex
            string, optionally prefixed with ``0x``/``0X``), or a falsy value.
        seq_number: Media sequence number of the segment; used only when the
            key entry specifies no IV.

    Returns:
        The explicit IV, left-padded with zero bytes or truncated to its last
        16 bytes, or else *seq_number* encoded as a 16-byte big-endian integer.

    Raises:
        ValueError: If the explicit IV contains non-hexadecimal characters.
    """
    if key_obj and key_obj.iv:
        hex_str = key_obj.iv.strip().lower().removeprefix('0x')
        # bytes.fromhex needs whole bytes; a short IV such as "0x1" has an
        # odd number of digits, so pad one leading zero digit.
        if len(hex_str) % 2:
            hex_str = '0' + hex_str
        iv = bytes.fromhex(hex_str)
        # Normalise to exactly 16 bytes: zero-pad on the left, keep the tail.
        return iv.rjust(16, b"\x00")[-16:]
    return int(seq_number).to_bytes(16, byteorder='big')


def aes128_cbc_decrypt(ct: bytes, key_bytes: bytes, iv_bytes: bytes) -> bytes:
    """Decrypt *ct* with AES in CBC mode using *key_bytes* and *iv_bytes*.

    PKCS#7 padding is stripped when it is valid; if unpadding raises
    ``ValueError`` the decrypted bytes are returned as they are.
    """
    decryptor = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
    plaintext = decryptor.decrypt(ct)
    try:
        stripped = unpad(plaintext, AES.block_size)
    except ValueError:
        # Best effort: padding is absent or malformed, keep the raw output.
        return plaintext
    return stripped

Single-threaded end-to-end download

import os
import time
import m3u8
import requests
from urllib.parse import urljoin

# Request headers carrying a desktop-Chrome User-Agent string.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
UA_HEADERS = {"User-Agent": _USER_AGENT}


def download_hls_serial(master_or_media_url: str, output_file: str) -> None:
    """Download all segments of an HLS stream one by one into *output_file*.

    The URL is resolved to a media playlist first. Each segment is decrypted
    with the key attached to that segment, so playlists that rotate keys or
    mix encrypted and clear segments are handled. Progress is printed on a
    single console line.

    Args:
        master_or_media_url: URL of a master or media playlist.
        output_file: Path of the file to create; overwritten if it exists.

    Raises:
        requests.HTTPError: If a key or segment request returns an error status.
    """
    # Resolve to media playlist if necessary
    media_url = resolve_media_playlist(master_or_media_url)
    playlist = m3u8.load(uri=media_url, headers=UA_HEADERS)

    base_seq = playlist.media_sequence or 0
    total = len(playlist.segments)

    # Key bytes cached per key URI: each distinct key is fetched only once.
    key_cache: dict[str, bytes | None] = {}

    start = time.time()
    total_written = 0
    # One session (closed on exit) and one output handle for the whole run;
    # opening with 'wb' also truncates any previous file.
    with requests.Session() as session, open(output_file, 'wb') as fp:
        for idx, seg in enumerate(playlist.segments):
            # Key in effect for this particular segment (None when clear).
            key_obj = seg.key
            key_bytes = None
            if key_obj is not None and key_obj.uri and key_obj.method != 'NONE':
                if key_obj.uri not in key_cache:
                    key_cache[key_obj.uri] = fetch_key_bytes(session, media_url, key_obj)
                key_bytes = key_cache[key_obj.uri]

            resp = session.get(seg.absolute_uri, headers=UA_HEADERS, timeout=30)
            resp.raise_for_status()
            chunk = resp.content

            if key_bytes:
                iv = derive_iv(key_obj, base_seq + idx)
                chunk = aes128_cbc_decrypt(chunk, key_bytes, iv)

            fp.write(chunk)
            total_written += len(chunk)

            elapsed = time.time() - start
            print(f"\r[{idx + 1}/{total}] {total_written/1024/1024:.2f} MB in {elapsed:.1f}s", end="")

    print()


# Example
# download_hls_serial('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8', 'video_serial.mp4')

Multi-threaded download with merge

Download each segment to a temporary folder, then concatenate in order.

import os
import glob
import shutil
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import m3u8
import requests

# Request headers carrying a desktop-Chrome User-Agent string.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
UA_HEADERS = {"User-Agent": _USER_AGENT}

# Relative directory that holds the per-segment files before merging.
TMP_DIR = Path('chunks_tmp')


def _download_one(session: requests.Session, seg_url: str, out_path: Path, key_bytes: bytes | None, iv_bytes: bytes | None):
    """Fetch one segment and write it to *out_path*.

    The body is decrypted with ``aes128_cbc_decrypt`` only when both
    *key_bytes* and *iv_bytes* are truthy; otherwise it is stored as received.
    An HTTP error status raises via ``raise_for_status``.
    """
    response = session.get(seg_url, headers=UA_HEADERS, timeout=30)
    response.raise_for_status()
    payload = response.content
    needs_decrypt = bool(key_bytes) and bool(iv_bytes)
    if needs_decrypt:
        payload = aes128_cbc_decrypt(payload, key_bytes, iv_bytes)
    out_path.write_bytes(payload)


def download_hls_parallel(master_or_media_url: str, output_file: str, workers: int = 10) -> None:
    """Download all segments of an HLS stream concurrently, then merge them.

    Each segment is written to its own file under ``TMP_DIR`` by a thread
    pool and decrypted with the key attached to that segment. The parts are
    then concatenated into *output_file* in playlist order. ``TMP_DIR`` is
    removed afterwards, including when a download or the merge fails.

    Args:
        master_or_media_url: URL of a master or media playlist.
        output_file: Path of the merged file; overwritten if it exists.
        workers: Maximum number of download threads.

    Raises:
        requests.HTTPError: If a key or segment request returns an error status.
    """
    media_url = resolve_media_playlist(master_or_media_url)
    playlist = m3u8.load(uri=media_url, headers=UA_HEADERS)
    base_seq = playlist.media_sequence or 0

    # Key bytes cached per key URI: each distinct key is fetched only once.
    key_cache: dict[str, bytes | None] = {}
    # Part files in playlist order. Merging from this list (rather than a
    # directory glob) ignores stale files left by an earlier run and does not
    # depend on file-name sort order.
    part_paths: list[Path] = []

    TMP_DIR.mkdir(exist_ok=True)
    try:
        with requests.Session() as session:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                tasks = []
                for idx, seg in enumerate(playlist.segments):
                    # Key in effect for this particular segment (None when clear).
                    key_obj = seg.key
                    key_bytes = None
                    if key_obj is not None and key_obj.uri and key_obj.method != 'NONE':
                        if key_obj.uri not in key_cache:
                            key_cache[key_obj.uri] = fetch_key_bytes(session, media_url, key_obj)
                        key_bytes = key_cache[key_obj.uri]

                    iv = derive_iv(key_obj, base_seq + idx) if key_bytes else None
                    part_path = TMP_DIR / f"{idx:05d}.ts"
                    part_paths.append(part_path)
                    tasks.append(
                        pool.submit(_download_one, session, seg.absolute_uri, part_path, key_bytes, iv)
                    )

                # Optional: simple progress
                for i, fut in enumerate(as_completed(tasks), 1):
                    fut.result()  # propagate exceptions
                    print(f"\rdownloaded {i}/{len(tasks)}", end="")
        print()

        # Merge in playlist order
        with open(output_file, 'wb') as out_fp:
            for part_path in part_paths:
                with open(part_path, 'rb') as in_fp:
                    shutil.copyfileobj(in_fp, out_fp, length=1024 * 1024)
                print(f"\rmerged {part_path.name}", end="")
        print()
    finally:
        # Cleanup, also on failure, so a later run starts from a clean folder.
        shutil.rmtree(TMP_DIR, ignore_errors=True)


# Example
# download_hls_parallel('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8', 'video_parallel.mp4', workers=10)

Minimal TS-fragment test

import m3u8
import requests
from pathlib import Path

# Request headers carrying a desktop-Chrome User-Agent string.
UA_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )
}

media_url = resolve_media_playlist('https://vod8.wenshibaowenbei.com/20210628/g4yNLlI7/index.m3u8')
pl = m3u8.load(uri=media_url, headers=UA_HEADERS)

seg0 = pl.segments[0]
# Use the key attached to the first segment itself. The playlist's last key
# entry may belong to later segments when keys rotate.
key_obj = seg0.key
if key_obj is not None and (key_obj.method == 'NONE' or not key_obj.uri):
    # METHOD=NONE (or no key URI) means this segment is not encrypted.
    key_obj = None

with requests.Session() as s:
    key_bytes = fetch_key_bytes(s, media_url, key_obj) if key_obj else None
    seq0 = (pl.media_sequence or 0)
    iv0 = derive_iv(key_obj, seq0) if key_bytes else None

    r = s.get(seg0.absolute_uri, headers=UA_HEADERS, timeout=20)
    r.raise_for_status()
    content = r.content
    if key_bytes:
        content = aes128_cbc_decrypt(content, key_bytes, iv0)

    Path('sample_00001.ts').write_bytes(content)
    print('sample_00001.ts saved')
Tags: Python, m3u8

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong references are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improperly handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particularly useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.