#!/usr/bin/env python3 """Spawn multiple ffmpeg readers against a single HLS playlist. Usage: python load_test.py https://example/hls/abc/stream.m3u8 5 The positional arguments control the target playlist URL and how many concurrent ffmpeg processes are started. Each client retries on failure with a short backoff so it behaves like a persistent video player. A health summary is printed periodically showing how many workers are streaming, buffering, or backing off. When the script exits (normally or via Ctrl+C) all spawned ffmpeg children are terminated. """ from __future__ import annotations import argparse import os import random import signal import subprocess import sys import tempfile import time from collections import deque from dataclasses import dataclass STOP_REQUESTED = False BUFFER_KEYWORDS = ( "buffer", "stall", "timeout", "timed out", "drop", "late", "overrun", "input/output error", "connection refused", ) SUMMARY_INTERVAL = float(os.environ.get("LOAD_TEST_SUMMARY_INTERVAL", "5")) STABLE_AFTER = float(os.environ.get("LOAD_TEST_STABLE_AFTER", "5")) LOG_CHECK_INTERVAL = float(os.environ.get("LOAD_TEST_LOG_CHECK_INTERVAL", "2")) MIN_BACKOFF = float(os.environ.get("LOAD_TEST_MIN_BACKOFF", "0.5")) MAX_BACKOFF = float(os.environ.get("LOAD_TEST_MAX_BACKOFF", "30")) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Spawn N concurrent ffmpeg readers against an HLS playlist." ) parser.add_argument( "playlist_url", help="HLS playlist URL to probe." ) parser.add_argument( "count", type=int, help="Number of concurrent ffmpeg processes to launch." ) return parser.parse_args() @dataclass class Client: index: int command: list[str] attempts: int = 0 process: subprocess.Popen[str] | None = None log_path: str | None = None resume_at: float = 0.0 last_pid: int | None = None start_time: float = 0.0 is_buffering: bool = False last_warning: str | None = None last_error: str | None = None state: str = "idle" last_log_check: float = 0.0 consecutive_failures: int = 0 def spawn(self) -> None: log_handle = tempfile.NamedTemporaryFile( prefix=f"hls_client_{self.index}_", suffix=".log", delete=False, mode="w", encoding="utf-8", ) try: proc = subprocess.Popen( self.command, stdout=subprocess.DEVNULL, stderr=log_handle, text=False, ) except FileNotFoundError: # pragma: no cover - requires missing ffmpeg sys.exit("ffmpeg binary not found; install ffmpeg before running this script.") finally: log_handle.close() self.attempts += 1 self.process = proc self.log_path = log_handle.name self.resume_at = 0.0 self.last_pid = proc.pid self.start_time = time.time() self.is_buffering = False self.last_warning = None self.state = "starting" self.last_log_check = 0.0 print( f"Spawned client {self.index} attempt {self.attempts} (pid={proc.pid})" ) def poll(self) -> int | None: if self.process is None: return None return self.process.poll() def terminate(self) -> None: if self.process and self.process.poll() is None: self.process.terminate() def spawn_clients(target_url: str, count: int) -> list[Client]: command = [ "ffmpeg", "-nostdin", "-loglevel", os.environ.get("FFMPEG_LOGLEVEL", "warning"), "-i", target_url, "-f", "null", "-", ] clients: list[Client] = [] for index in range(1, count + 1): client = Client(index=index, command=command.copy()) client.spawn() clients.append(client) return clients def terminate_processes(clients: list[Client]) -> None: for client in clients: client.terminate() deadline = time.time() + 5 for client in clients: proc = client.process if proc and proc.poll() is None: remaining = max(0, deadline - time.time()) try: proc.wait(timeout=remaining) except subprocess.TimeoutExpired: proc.kill() def install_signal_handlers(clients: list[Client]) -> None: def handler(signum: int, _frame) -> None: global STOP_REQUESTED if STOP_REQUESTED: return STOP_REQUESTED = True print(f"Received signal {signum}, shutting down clients…") terminate_processes(clients) cleanup_logs(clients) sys.exit(0) signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGTERM, handler) def explain_exit_code(return_code: int | None) -> str: if return_code is None: return "process still running" if return_code == 0: return "exited cleanly" signal_num: int | None = None if return_code < 0: signal_num = -return_code elif return_code >= 128: signal_num = return_code - 128 if signal_num: try: sig_name = signal.Signals(signal_num).name except ValueError: sig_name = f"signal {signal_num}" return f"terminated by {sig_name} (exit code {return_code})" return f"exited with status {return_code}" def read_log_tail(path: str, max_lines: int = 5) -> str: try: with open(path, "r", encoding="utf-8", errors="replace") as handle: tail_lines = ''.join(deque(handle, maxlen=max_lines)).strip() return tail_lines or "" except OSError as exc: return f"" def report_exit(client: Client, return_code: int | None) -> None: log_path = client.log_path description = explain_exit_code(return_code) pid_info = f"pid {client.last_pid}" if client.last_pid is not None else "pid unknown" print(f"Client {client.index} ({pid_info}) {description}.") if log_path: log_tail = read_log_tail(log_path) if log_tail: print("Recent log:") print(log_tail) client.last_error = log_tail if (return_code and log_tail) else None try: os.remove(log_path) except OSError: pass client.log_path = None client.is_buffering = False client.last_warning = None def cleanup_logs(clients: list[Client]) -> None: for client in clients: if client.log_path: report_exit(client, client.process.returncode if client.process else None) def compute_backoff(failures: int) -> float: if failures <= 0: return MIN_BACKOFF min_backoff = max(0.0, MIN_BACKOFF) max_backoff = max(min_backoff, MAX_BACKOFF) lower = min(max_backoff, min_backoff * (2 ** (failures - 1))) upper = min(max_backoff, min_backoff * (2 ** failures)) if upper < lower: lower, upper = upper, lower if lower == upper: return lower return random.uniform(lower, upper) def truncate(text: str, limit: int = 120) -> str: return text if len(text) <= limit else text[: limit - 3] + "..." def detect_buffering(client: Client, now: float) -> None: if not client.log_path: client.is_buffering = False return if now - client.last_log_check < LOG_CHECK_INTERVAL: return client.last_log_check = now tail = read_log_tail(client.log_path) if tail and tail != "": lower_tail = tail.lower() if any(keyword in lower_tail for keyword in BUFFER_KEYWORDS): client.is_buffering = True client.last_warning = tail return client.is_buffering = False def print_summary(clients: list[Client], now: float) -> None: counts = { "streaming": 0, "buffering": 0, "starting": 0, "backoff": 0, "idle": 0, } buffering_notes = [] recent_errors = [] for client in clients: proc = client.process if proc and proc.poll() is None: uptime = now - client.start_time detect_buffering(client, now) if client.is_buffering: client.state = "buffering" elif uptime >= STABLE_AFTER: client.state = "streaming" if client.consecutive_failures: client.consecutive_failures = 0 client.last_error = None else: client.state = "starting" else: if STOP_REQUESTED: client.state = "idle" elif client.resume_at > now: client.state = "backoff" else: client.state = "idle" counts.setdefault(client.state, 0) counts[client.state] += 1 if client.is_buffering and client.last_warning: buffering_notes.append( f"#{client.index} {truncate(client.last_warning)}" ) if client.last_error and client.state != "streaming": recent_errors.append( f"#{client.index} {truncate(client.last_error)}" ) restarts = sum(max(0, client.attempts - 1) for client in clients) summary_parts = [ f"Streaming:{counts['streaming']}", f"Buffering:{counts['buffering']}", f"Starting:{counts['starting']}", f"Backoff:{counts['backoff']}", f"Idle:{counts['idle']}", f"Restarts:{restarts}", ] print("[Summary] " + " | ".join(summary_parts)) if buffering_notes: print(" Buffering clients:") for note in buffering_notes: print(f" {note}") if recent_errors: print(" Recent errors:") for error in recent_errors[:5]: print(f" {error}") def monitor(clients: list[Client]) -> None: global STOP_REQUESTED try: next_summary = time.time() + SUMMARY_INTERVAL while True: now = time.time() for client in clients: proc = client.process if proc and proc.poll() is not None: return_code = proc.returncode report_exit(client, return_code) client.process = None if STOP_REQUESTED: continue if return_code and return_code != 0: client.consecutive_failures += 1 backoff = compute_backoff(client.consecutive_failures) else: client.consecutive_failures = 0 backoff = MIN_BACKOFF client.resume_at = now + backoff print( f"Scheduling restart for client {client.index} in {backoff:.1f}s…" ) client.state = "backoff" if client.process is None and not STOP_REQUESTED and now >= client.resume_at: client.spawn() if now >= next_summary: print_summary(clients, now) next_summary = now + SUMMARY_INTERVAL if STOP_REQUESTED: break time.sleep(1) finally: STOP_REQUESTED = True terminate_processes(clients) cleanup_logs(clients) def main() -> None: args = parse_args() if args.count < 1: sys.exit("Count must be at least 1.") clients = spawn_clients(args.playlist_url, args.count) install_signal_handlers(clients) print("Press Ctrl+C to stop all clients.") monitor(clients) if __name__ == "__main__": main()