diff options
| author | yum <yum.food.vr@gmail.com> | 2025-10-14 15:33:13 -0700 |
|---|---|---|
| committer | yum <yum.food.vr@gmail.com> | 2025-10-28 17:19:36 -0700 |
| commit | 8222717b7e12101cbc1945107b20f5258618d56e (patch) | |
| tree | 34ff9c83bef17dcab99b0c1b71c2b15a39476851 /load_test.py | |
| parent | d52b56d2d99548e996e978c4ae96305f73b24c13 (diff) | |
add load test script
Diffstat (limited to 'load_test.py')
| -rw-r--r-- | load_test.py | 403 |
1 files changed, 403 insertions, 0 deletions
diff --git a/load_test.py b/load_test.py new file mode 100644 index 0000000..0abc3c3 --- /dev/null +++ b/load_test.py @@ -0,0 +1,403 @@ +#!/usr/bin/env python3 +"""Spawn multiple ffmpeg readers against a single HLS playlist. + +Usage: + python load_test.py https://example/hls/abc/stream.m3u8 5 + +The positional arguments control the target playlist URL and how many +concurrent ffmpeg processes are started. Each client retries on failure with a +short backoff so it behaves like a persistent video player. A health summary is +printed periodically showing how many workers are streaming, buffering, or +backing off. When the script exits (normally or via Ctrl+C) all spawned ffmpeg +children are terminated. +""" + +from __future__ import annotations + +import argparse +import os +import random +import signal +import subprocess +import sys +import tempfile +import time +from collections import deque +from dataclasses import dataclass + + +STOP_REQUESTED = False +BUFFER_KEYWORDS = ( + "buffer", + "stall", + "timeout", + "timed out", + "drop", + "late", + "overrun", + "input/output error", + "connection refused", +) +SUMMARY_INTERVAL = float(os.environ.get("LOAD_TEST_SUMMARY_INTERVAL", "5")) +STABLE_AFTER = float(os.environ.get("LOAD_TEST_STABLE_AFTER", "5")) +LOG_CHECK_INTERVAL = float(os.environ.get("LOAD_TEST_LOG_CHECK_INTERVAL", "2")) +MIN_BACKOFF = float(os.environ.get("LOAD_TEST_MIN_BACKOFF", "0.5")) +MAX_BACKOFF = float(os.environ.get("LOAD_TEST_MAX_BACKOFF", "30")) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Spawn N concurrent ffmpeg readers against an HLS playlist." + ) + parser.add_argument( + "playlist_url", + help="HLS playlist URL to probe." + ) + parser.add_argument( + "count", + type=int, + help="Number of concurrent ffmpeg processes to launch." + ) + return parser.parse_args() + + +@dataclass +class Client: + index: int + command: list[str] + attempts: int = 0 + process: subprocess.Popen[str] | None = None + log_path: str | None = None + resume_at: float = 0.0 + last_pid: int | None = None + start_time: float = 0.0 + is_buffering: bool = False + last_warning: str | None = None + last_error: str | None = None + state: str = "idle" + last_log_check: float = 0.0 + consecutive_failures: int = 0 + + def spawn(self) -> None: + log_handle = tempfile.NamedTemporaryFile( + prefix=f"hls_client_{self.index}_", + suffix=".log", + delete=False, + mode="w", + encoding="utf-8", + ) + + try: + proc = subprocess.Popen( + self.command, + stdout=subprocess.DEVNULL, + stderr=log_handle, + text=False, + ) + except FileNotFoundError: # pragma: no cover - requires missing ffmpeg + sys.exit("ffmpeg binary not found; install ffmpeg before running this script.") + finally: + log_handle.close() + + self.attempts += 1 + self.process = proc + self.log_path = log_handle.name + self.resume_at = 0.0 + self.last_pid = proc.pid + self.start_time = time.time() + self.is_buffering = False + self.last_warning = None + self.state = "starting" + self.last_log_check = 0.0 + print( + f"Spawned client {self.index} attempt {self.attempts} (pid={proc.pid})" + ) + + def poll(self) -> int | None: + if self.process is None: + return None + return self.process.poll() + + def terminate(self) -> None: + if self.process and self.process.poll() is None: + self.process.terminate() + + +def spawn_clients(target_url: str, count: int) -> list[Client]: + command = [ + "ffmpeg", + "-nostdin", + "-loglevel", + os.environ.get("FFMPEG_LOGLEVEL", "warning"), + "-i", + target_url, + "-f", + "null", + "-", + ] + + clients: list[Client] = [] + for index in range(1, count + 1): + client = Client(index=index, command=command.copy()) + client.spawn() + clients.append(client) + + return clients + + +def terminate_processes(clients: list[Client]) -> None: + for client in clients: + client.terminate() + + deadline = time.time() + 5 + for client in clients: + proc = client.process + if proc and proc.poll() is None: + remaining = max(0, deadline - time.time()) + try: + proc.wait(timeout=remaining) + except subprocess.TimeoutExpired: + proc.kill() + + +def install_signal_handlers(clients: list[Client]) -> None: + def handler(signum: int, _frame) -> None: + global STOP_REQUESTED + if STOP_REQUESTED: + return + STOP_REQUESTED = True + print(f"Received signal {signum}, shutting down clients…") + terminate_processes(clients) + cleanup_logs(clients) + sys.exit(0) + + signal.signal(signal.SIGINT, handler) + signal.signal(signal.SIGTERM, handler) + + +def explain_exit_code(return_code: int | None) -> str: + if return_code is None: + return "process still running" + if return_code == 0: + return "exited cleanly" + + signal_num: int | None = None + if return_code < 0: + signal_num = -return_code + elif return_code >= 128: + signal_num = return_code - 128 + + if signal_num: + try: + sig_name = signal.Signals(signal_num).name + except ValueError: + sig_name = f"signal {signal_num}" + return f"terminated by {sig_name} (exit code {return_code})" + + return f"exited with status {return_code}" + + +def read_log_tail(path: str, max_lines: int = 5) -> str: + try: + with open(path, "r", encoding="utf-8", errors="replace") as handle: + tail_lines = ''.join(deque(handle, maxlen=max_lines)).strip() + return tail_lines or "<no log output>" + except OSError as exc: + return f"<unable to read log: {exc}>" + + +def report_exit(client: Client, return_code: int | None) -> None: + log_path = client.log_path + description = explain_exit_code(return_code) + pid_info = f"pid {client.last_pid}" if client.last_pid is not None else "pid unknown" + print(f"Client {client.index} ({pid_info}) {description}.") + if log_path: + log_tail = read_log_tail(log_path) + if log_tail: + print("Recent log:") + print(log_tail) + client.last_error = log_tail if (return_code and log_tail) else None + try: + os.remove(log_path) + except OSError: + pass + client.log_path = None + client.is_buffering = False + client.last_warning = None + + +def cleanup_logs(clients: list[Client]) -> None: + for client in clients: + if client.log_path: + report_exit(client, client.process.returncode if client.process else None) + + +def compute_backoff(failures: int) -> float: + if failures <= 0: + return MIN_BACKOFF + + min_backoff = max(0.0, MIN_BACKOFF) + max_backoff = max(min_backoff, MAX_BACKOFF) + + lower = min(max_backoff, min_backoff * (2 ** (failures - 1))) + upper = min(max_backoff, min_backoff * (2 ** failures)) + + if upper < lower: + lower, upper = upper, lower + + if lower == upper: + return lower + + return random.uniform(lower, upper) + + +def truncate(text: str, limit: int = 120) -> str: + return text if len(text) <= limit else text[: limit - 3] + "..." + + +def detect_buffering(client: Client, now: float) -> None: + if not client.log_path: + client.is_buffering = False + return + + if now - client.last_log_check < LOG_CHECK_INTERVAL: + return + + client.last_log_check = now + tail = read_log_tail(client.log_path) + if tail and tail != "<no log output>": + lower_tail = tail.lower() + if any(keyword in lower_tail for keyword in BUFFER_KEYWORDS): + client.is_buffering = True + client.last_warning = tail + return + + client.is_buffering = False + + +def print_summary(clients: list[Client], now: float) -> None: + counts = { + "streaming": 0, + "buffering": 0, + "starting": 0, + "backoff": 0, + "idle": 0, + } + + buffering_notes = [] + recent_errors = [] + + for client in clients: + proc = client.process + if proc and proc.poll() is None: + uptime = now - client.start_time + detect_buffering(client, now) + if client.is_buffering: + client.state = "buffering" + elif uptime >= STABLE_AFTER: + client.state = "streaming" + if client.consecutive_failures: + client.consecutive_failures = 0 + client.last_error = None + else: + client.state = "starting" + else: + if STOP_REQUESTED: + client.state = "idle" + elif client.resume_at > now: + client.state = "backoff" + else: + client.state = "idle" + + counts.setdefault(client.state, 0) + counts[client.state] += 1 + + if client.is_buffering and client.last_warning: + buffering_notes.append( + f"#{client.index} {truncate(client.last_warning)}" + ) + if client.last_error and client.state != "streaming": + recent_errors.append( + f"#{client.index} {truncate(client.last_error)}" + ) + + restarts = sum(max(0, client.attempts - 1) for client in clients) + summary_parts = [ + f"Streaming:{counts['streaming']}", + f"Buffering:{counts['buffering']}", + f"Starting:{counts['starting']}", + f"Backoff:{counts['backoff']}", + f"Idle:{counts['idle']}", + f"Restarts:{restarts}", + ] + + print("[Summary] " + " | ".join(summary_parts)) + + if buffering_notes: + print(" Buffering clients:") + for note in buffering_notes: + print(f" {note}") + + if recent_errors: + print(" Recent errors:") + for error in recent_errors[:5]: + print(f" {error}") + + +def monitor(clients: list[Client]) -> None: + global STOP_REQUESTED + try: + next_summary = time.time() + SUMMARY_INTERVAL + while True: + now = time.time() + for client in clients: + proc = client.process + if proc and proc.poll() is not None: + return_code = proc.returncode + report_exit(client, return_code) + client.process = None + if STOP_REQUESTED: + continue + + if return_code and return_code != 0: + client.consecutive_failures += 1 + backoff = compute_backoff(client.consecutive_failures) + else: + client.consecutive_failures = 0 + backoff = MIN_BACKOFF + client.resume_at = now + backoff + print( + f"Scheduling restart for client {client.index} in {backoff:.1f}s…" + ) + client.state = "backoff" + + if client.process is None and not STOP_REQUESTED and now >= client.resume_at: + client.spawn() + + if now >= next_summary: + print_summary(clients, now) + next_summary = now + SUMMARY_INTERVAL + + if STOP_REQUESTED: + break + + time.sleep(1) + finally: + STOP_REQUESTED = True + terminate_processes(clients) + cleanup_logs(clients) + + +def main() -> None: + args = parse_args() + if args.count < 1: + sys.exit("Count must be at least 1.") + + clients = spawn_clients(args.playlist_url, args.count) + install_signal_handlers(clients) + print("Press Ctrl+C to stop all clients.") + monitor(clients) + + +if __name__ == "__main__": + main() |
