summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--load_test.py403
-rw-r--r--opt/obsproxy/requirements.txt3
-rwxr-xr-xopt/obsproxy/server.py6
3 files changed, 409 insertions, 3 deletions
diff --git a/load_test.py b/load_test.py
new file mode 100644
index 0000000..0abc3c3
--- /dev/null
+++ b/load_test.py
@@ -0,0 +1,403 @@
+#!/usr/bin/env python3
+"""Spawn multiple ffmpeg readers against a single HLS playlist.
+
+Usage:
+ python load_test.py https://example/hls/abc/stream.m3u8 5
+
+The positional arguments control the target playlist URL and how many
+concurrent ffmpeg processes are started. Each client retries on failure with a
+short backoff so it behaves like a persistent video player. A health summary is
+printed periodically showing how many workers are streaming, buffering, or
+backing off. When the script exits (normally or via Ctrl+C) all spawned ffmpeg
+children are terminated.
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import random
+import signal
+import subprocess
+import sys
+import tempfile
+import time
+from collections import deque
+from dataclasses import dataclass
+
+
+STOP_REQUESTED = False
+BUFFER_KEYWORDS = (
+ "buffer",
+ "stall",
+ "timeout",
+ "timed out",
+ "drop",
+ "late",
+ "overrun",
+ "input/output error",
+ "connection refused",
+)
+SUMMARY_INTERVAL = float(os.environ.get("LOAD_TEST_SUMMARY_INTERVAL", "5"))
+STABLE_AFTER = float(os.environ.get("LOAD_TEST_STABLE_AFTER", "5"))
+LOG_CHECK_INTERVAL = float(os.environ.get("LOAD_TEST_LOG_CHECK_INTERVAL", "2"))
+MIN_BACKOFF = float(os.environ.get("LOAD_TEST_MIN_BACKOFF", "0.5"))
+MAX_BACKOFF = float(os.environ.get("LOAD_TEST_MAX_BACKOFF", "30"))
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ description="Spawn N concurrent ffmpeg readers against an HLS playlist."
+ )
+ parser.add_argument(
+ "playlist_url",
+ help="HLS playlist URL to probe."
+ )
+ parser.add_argument(
+ "count",
+ type=int,
+ help="Number of concurrent ffmpeg processes to launch."
+ )
+ return parser.parse_args()
+
+
+@dataclass
+class Client:
+ index: int
+ command: list[str]
+ attempts: int = 0
+ process: subprocess.Popen[str] | None = None
+ log_path: str | None = None
+ resume_at: float = 0.0
+ last_pid: int | None = None
+ start_time: float = 0.0
+ is_buffering: bool = False
+ last_warning: str | None = None
+ last_error: str | None = None
+ state: str = "idle"
+ last_log_check: float = 0.0
+ consecutive_failures: int = 0
+
+ def spawn(self) -> None:
+ log_handle = tempfile.NamedTemporaryFile(
+ prefix=f"hls_client_{self.index}_",
+ suffix=".log",
+ delete=False,
+ mode="w",
+ encoding="utf-8",
+ )
+
+ try:
+ proc = subprocess.Popen(
+ self.command,
+ stdout=subprocess.DEVNULL,
+ stderr=log_handle,
+ text=False,
+ )
+ except FileNotFoundError: # pragma: no cover - requires missing ffmpeg
+ sys.exit("ffmpeg binary not found; install ffmpeg before running this script.")
+ finally:
+ log_handle.close()
+
+ self.attempts += 1
+ self.process = proc
+ self.log_path = log_handle.name
+ self.resume_at = 0.0
+ self.last_pid = proc.pid
+ self.start_time = time.time()
+ self.is_buffering = False
+ self.last_warning = None
+ self.state = "starting"
+ self.last_log_check = 0.0
+ print(
+ f"Spawned client {self.index} attempt {self.attempts} (pid={proc.pid})"
+ )
+
+ def poll(self) -> int | None:
+ if self.process is None:
+ return None
+ return self.process.poll()
+
+ def terminate(self) -> None:
+ if self.process and self.process.poll() is None:
+ self.process.terminate()
+
+
+def spawn_clients(target_url: str, count: int) -> list[Client]:
+ command = [
+ "ffmpeg",
+ "-nostdin",
+ "-loglevel",
+ os.environ.get("FFMPEG_LOGLEVEL", "warning"),
+ "-i",
+ target_url,
+ "-f",
+ "null",
+ "-",
+ ]
+
+ clients: list[Client] = []
+ for index in range(1, count + 1):
+ client = Client(index=index, command=command.copy())
+ client.spawn()
+ clients.append(client)
+
+ return clients
+
+
+def terminate_processes(clients: list[Client]) -> None:
+ for client in clients:
+ client.terminate()
+
+ deadline = time.time() + 5
+ for client in clients:
+ proc = client.process
+ if proc and proc.poll() is None:
+ remaining = max(0, deadline - time.time())
+ try:
+ proc.wait(timeout=remaining)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+
+
+def install_signal_handlers(clients: list[Client]) -> None:
+ def handler(signum: int, _frame) -> None:
+ global STOP_REQUESTED
+ if STOP_REQUESTED:
+ return
+ STOP_REQUESTED = True
+ print(f"Received signal {signum}, shutting down clients…")
+ terminate_processes(clients)
+ cleanup_logs(clients)
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, handler)
+ signal.signal(signal.SIGTERM, handler)
+
+
+def explain_exit_code(return_code: int | None) -> str:
+ if return_code is None:
+ return "process still running"
+ if return_code == 0:
+ return "exited cleanly"
+
+ signal_num: int | None = None
+ if return_code < 0:
+ signal_num = -return_code
+ elif return_code >= 128:
+ signal_num = return_code - 128
+
+ if signal_num:
+ try:
+ sig_name = signal.Signals(signal_num).name
+ except ValueError:
+ sig_name = f"signal {signal_num}"
+ return f"terminated by {sig_name} (exit code {return_code})"
+
+ return f"exited with status {return_code}"
+
+
+def read_log_tail(path: str, max_lines: int = 5) -> str:
+ try:
+ with open(path, "r", encoding="utf-8", errors="replace") as handle:
+ tail_lines = ''.join(deque(handle, maxlen=max_lines)).strip()
+ return tail_lines or "<no log output>"
+ except OSError as exc:
+ return f"<unable to read log: {exc}>"
+
+
+def report_exit(client: Client, return_code: int | None) -> None:
+ log_path = client.log_path
+ description = explain_exit_code(return_code)
+ pid_info = f"pid {client.last_pid}" if client.last_pid is not None else "pid unknown"
+ print(f"Client {client.index} ({pid_info}) {description}.")
+ if log_path:
+ log_tail = read_log_tail(log_path)
+ if log_tail:
+ print("Recent log:")
+ print(log_tail)
+ client.last_error = log_tail if (return_code and log_tail) else None
+ try:
+ os.remove(log_path)
+ except OSError:
+ pass
+ client.log_path = None
+ client.is_buffering = False
+ client.last_warning = None
+
+
+def cleanup_logs(clients: list[Client]) -> None:
+ for client in clients:
+ if client.log_path:
+ report_exit(client, client.process.returncode if client.process else None)
+
+
+def compute_backoff(failures: int) -> float:
+ if failures <= 0:
+ return MIN_BACKOFF
+
+ min_backoff = max(0.0, MIN_BACKOFF)
+ max_backoff = max(min_backoff, MAX_BACKOFF)
+
+ lower = min(max_backoff, min_backoff * (2 ** (failures - 1)))
+ upper = min(max_backoff, min_backoff * (2 ** failures))
+
+ if upper < lower:
+ lower, upper = upper, lower
+
+ if lower == upper:
+ return lower
+
+ return random.uniform(lower, upper)
+
+
+def truncate(text: str, limit: int = 120) -> str:
+ return text if len(text) <= limit else text[: limit - 3] + "..."
+
+
+def detect_buffering(client: Client, now: float) -> None:
+ if not client.log_path:
+ client.is_buffering = False
+ return
+
+ if now - client.last_log_check < LOG_CHECK_INTERVAL:
+ return
+
+ client.last_log_check = now
+ tail = read_log_tail(client.log_path)
+ if tail and tail != "<no log output>":
+ lower_tail = tail.lower()
+ if any(keyword in lower_tail for keyword in BUFFER_KEYWORDS):
+ client.is_buffering = True
+ client.last_warning = tail
+ return
+
+ client.is_buffering = False
+
+
+def print_summary(clients: list[Client], now: float) -> None:
+ counts = {
+ "streaming": 0,
+ "buffering": 0,
+ "starting": 0,
+ "backoff": 0,
+ "idle": 0,
+ }
+
+ buffering_notes = []
+ recent_errors = []
+
+ for client in clients:
+ proc = client.process
+ if proc and proc.poll() is None:
+ uptime = now - client.start_time
+ detect_buffering(client, now)
+ if client.is_buffering:
+ client.state = "buffering"
+ elif uptime >= STABLE_AFTER:
+ client.state = "streaming"
+ if client.consecutive_failures:
+ client.consecutive_failures = 0
+ client.last_error = None
+ else:
+ client.state = "starting"
+ else:
+ if STOP_REQUESTED:
+ client.state = "idle"
+ elif client.resume_at > now:
+ client.state = "backoff"
+ else:
+ client.state = "idle"
+
+ counts.setdefault(client.state, 0)
+ counts[client.state] += 1
+
+ if client.is_buffering and client.last_warning:
+ buffering_notes.append(
+ f"#{client.index} {truncate(client.last_warning)}"
+ )
+ if client.last_error and client.state != "streaming":
+ recent_errors.append(
+ f"#{client.index} {truncate(client.last_error)}"
+ )
+
+ restarts = sum(max(0, client.attempts - 1) for client in clients)
+ summary_parts = [
+ f"Streaming:{counts['streaming']}",
+ f"Buffering:{counts['buffering']}",
+ f"Starting:{counts['starting']}",
+ f"Backoff:{counts['backoff']}",
+ f"Idle:{counts['idle']}",
+ f"Restarts:{restarts}",
+ ]
+
+ print("[Summary] " + " | ".join(summary_parts))
+
+ if buffering_notes:
+ print(" Buffering clients:")
+ for note in buffering_notes:
+ print(f" {note}")
+
+ if recent_errors:
+ print(" Recent errors:")
+ for error in recent_errors[:5]:
+ print(f" {error}")
+
+
+def monitor(clients: list[Client]) -> None:
+ global STOP_REQUESTED
+ try:
+ next_summary = time.time() + SUMMARY_INTERVAL
+ while True:
+ now = time.time()
+ for client in clients:
+ proc = client.process
+ if proc and proc.poll() is not None:
+ return_code = proc.returncode
+ report_exit(client, return_code)
+ client.process = None
+ if STOP_REQUESTED:
+ continue
+
+ if return_code and return_code != 0:
+ client.consecutive_failures += 1
+ backoff = compute_backoff(client.consecutive_failures)
+ else:
+ client.consecutive_failures = 0
+ backoff = MIN_BACKOFF
+ client.resume_at = now + backoff
+ print(
+ f"Scheduling restart for client {client.index} in {backoff:.1f}s…"
+ )
+ client.state = "backoff"
+
+ if client.process is None and not STOP_REQUESTED and now >= client.resume_at:
+ client.spawn()
+
+ if now >= next_summary:
+ print_summary(clients, now)
+ next_summary = now + SUMMARY_INTERVAL
+
+ if STOP_REQUESTED:
+ break
+
+ time.sleep(1)
+ finally:
+ STOP_REQUESTED = True
+ terminate_processes(clients)
+ cleanup_logs(clients)
+
+
+def main() -> None:
+ args = parse_args()
+ if args.count < 1:
+ sys.exit("Count must be at least 1.")
+
+ clients = spawn_clients(args.playlist_url, args.count)
+ install_signal_handlers(clients)
+ print("Press Ctrl+C to stop all clients.")
+ monitor(clients)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/opt/obsproxy/requirements.txt b/opt/obsproxy/requirements.txt
new file mode 100644
index 0000000..c1a9042
--- /dev/null
+++ b/opt/obsproxy/requirements.txt
@@ -0,0 +1,3 @@
+Flask>=2.3,<3.0
+python-dotenv>=1.0,<2.0
+waitress>=2.1,<3.0
diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py
index 16d10cf..422b483 100755
--- a/opt/obsproxy/server.py
+++ b/opt/obsproxy/server.py
@@ -19,8 +19,8 @@ load_dotenv()
INGEST_PSK = os.environ.get('OBS_STREAM_KEY') or os.environ.get('STREAM_PSK')
INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096'))
HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2'))
-HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '6'))
-HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '2'))
+HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20'))
+HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20'))
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev')
STREAM_HEX = secrets.token_hex(16)
@@ -78,7 +78,7 @@ def start_ffmpeg_process():
'ffmpeg',
'-nostdin',
'-hide_banner',
- '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'info'),
+ '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'warning'),
'-fflags', '+genpts',
'-thread_queue_size', str(INGEST_THREAD_QUEUE_SIZE),
'-i', f'rtmp://localhost/live/{INGEST_PSK}',