From 45653c831d286ceb1450bf9f3850c6b5b1a954c1 Mon Sep 17 00:00:00 2001 From: yum Date: Wed, 15 Oct 2025 14:02:59 -0700 Subject: cleanup --- opt/obsproxy/server.py | 140 +++++++++++++++++++++++-------------------------- 1 file changed, 66 insertions(+), 74 deletions(-) diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py index 6a52653..3f9001a 100755 --- a/opt/obsproxy/server.py +++ b/opt/obsproxy/server.py @@ -25,10 +25,10 @@ except ValueError as exc: raise ValueError('INGEST_RTMP_PORT must be an integer') from exc INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096')) HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2')) -HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20')) +HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '4')) HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20')) -HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '45')) -HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.5')) +HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '5')) +HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.2')) BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams')) SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev') STREAM_HEX = secrets.token_hex(16) @@ -38,9 +38,6 @@ HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}" AUDIO_BITRATE = '256k' AUDIO_CHANNELS = 2 AUDIO_SAMPLE_RATE = 48000 -FFMPEG_RETRY_DELAY = float(os.environ.get('FFMPEG_RETRY_DELAY', '1.0')) -FFMPEG_RETRY_MAX_DELAY = float(os.environ.get('FFMPEG_RETRY_MAX_DELAY', '5.0')) -FFMPEG_RETRY_BACKOFF = float(os.environ.get('FFMPEG_RETRY_BACKOFF', '1.5')) # Setup logging logging.basicConfig( @@ -50,11 +47,15 @@ logging.basicConfig( logger = logging.getLogger('obs_proxy') # Global state -stream_active = False ffmpeg_process = None ffmpeg_worker_thread = None ffmpeg_stop_event = threading.Event() + +def _worker_running() -> bool: + """Return True if the ffmpeg worker thread is currently active.""" + return ffmpeg_worker_thread is not None and ffmpeg_worker_thread.is_alive() + # Validate configuration if not INGEST_PSK: logger.error("OBS_STREAM_KEY/STREAM_PSK is not set") @@ -70,6 +71,29 @@ def reset_stream_path(): STREAM_PATH.mkdir(parents=True, exist_ok=True) +def _safe_reset_stream_path(context: str) -> bool: + """Reset the stream directory and log any failure.""" + try: + reset_stream_path() + return True + except Exception as exc: # pragma: no cover - best effort logging + logger.error("Error resetting stream directory %s: %s", context, exc) + return False + + +def _verify_stream_path_writable() -> None: + """Ensure the stream directory is writable before launching FFmpeg.""" + test_file = STREAM_PATH / "write_test.txt" + try: + with open(test_file, "w", encoding="utf-8") as probe: + probe.write("ok") + except Exception as exc: # pragma: no cover - best effort logging + logger.error("Could not write to stream directory: %s", exc) + finally: + if test_file.exists(): + test_file.unlink() + + def _wait_for_playlist(process: subprocess.Popen[str], attempt: int): """Block until the playlist appears or we hit the configured timeout.""" playlist_path = STREAM_PATH / 'stream.m3u8' @@ -163,7 +187,7 @@ def _build_ffmpeg_command() -> list[str]: def _run_ffmpeg_once(attempt: int) -> bool: """Start ffmpeg, wait for it to exit, and report whether it ran cleanly.""" - global ffmpeg_process, stream_active + global ffmpeg_process logger.info("Starting FFmpeg for live stream (attempt %s)", attempt) @@ -180,7 +204,6 @@ def _run_ffmpeg_once(attempt: int) -> bool: return False ffmpeg_process = process - stream_active = True start_time = time.monotonic() logger.info("FFmpeg process started with PID %s", process.pid) @@ -202,7 +225,6 @@ def _run_ffmpeg_once(attempt: int) -> bool: elapsed = time.monotonic() - start_time ffmpeg_process = None - stream_active = False if ffmpeg_stop_event.is_set(): logger.info( @@ -229,63 +251,43 @@ def _run_ffmpeg_once(attempt: int) -> bool: return True -def start_ffmpeg_process(): - """Start FFmpeg to convert RTMP ingest into HLS.""" - global ffmpeg_worker_thread - - if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive(): - logger.warning("FFmpeg worker already running; skipping duplicate start") - return True +def _ffmpeg_worker_loop() -> None: + """Keep spawning FFmpeg until it runs cleanly or a stop is requested.""" + attempt = 0 - reset_stream_path() - logger.info(f"Stream directory ready at {STREAM_PATH}") + while not ffmpeg_stop_event.is_set(): + attempt += 1 + if _run_ffmpeg_once(attempt): + break - # Sanity check that we can write to the path before spawning ffmpeg - try: - test_file = STREAM_PATH / "write_test.txt" - with open(test_file, "w", encoding="utf-8") as probe: - probe.write("ok") - if test_file.exists(): - test_file.unlink() - except Exception as exc: # pragma: no cover - best effort logging - logger.error(f"Could not write to stream directory: {exc}") - - ffmpeg_stop_event.clear() - - def worker(): - attempt = 0 - delay = max(FFMPEG_RETRY_DELAY, 0.1) + if ffmpeg_stop_event.is_set(): + break - while not ffmpeg_stop_event.is_set(): - attempt += 1 - ran_cleanly = _run_ffmpeg_once(attempt) - if ran_cleanly: - break + _safe_reset_stream_path("between FFmpeg attempts") - if ffmpeg_stop_event.is_set(): - break + logger.debug("FFmpeg worker exiting") - wait_time = min(delay, FFMPEG_RETRY_MAX_DELAY) - logger.info( - "FFmpeg will retry in %.2fs (next attempt %s)", - wait_time, - attempt + 1, - ) - if ffmpeg_stop_event.wait(wait_time): - break +def start_ffmpeg_process(): + """Start FFmpeg to convert RTMP ingest into HLS.""" + global ffmpeg_worker_thread - try: - reset_stream_path() - except Exception as exc: # pragma: no cover - best effort logging - logger.error(f"Error resetting stream directory between attempts: {exc}") + if _worker_running(): + logger.warning("FFmpeg worker already running; skipping duplicate start") + return True - delay = min(delay * FFMPEG_RETRY_BACKOFF, FFMPEG_RETRY_MAX_DELAY) + if _safe_reset_stream_path("before FFmpeg start"): + logger.info(f"Stream directory ready at {STREAM_PATH}") + _verify_stream_path_writable() - logger.debug("FFmpeg worker exiting") + ffmpeg_stop_event.clear() try: - ffmpeg_worker_thread = threading.Thread(target=worker, daemon=True, name="ffmpeg-worker") + ffmpeg_worker_thread = threading.Thread( + target=_ffmpeg_worker_loop, + daemon=True, + name="ffmpeg-worker", + ) ffmpeg_worker_thread.start() return True except Exception as exc: # pragma: no cover - defensive logging @@ -296,7 +298,7 @@ def start_ffmpeg_process(): def cleanup_stream(): """Stop FFmpeg and purge any cached HLS segments.""" - global ffmpeg_process, stream_active, ffmpeg_worker_thread + global ffmpeg_process, ffmpeg_worker_thread ffmpeg_stop_event.set() @@ -304,18 +306,14 @@ def cleanup_stream(): _terminate_ffmpeg_process(ffmpeg_process) ffmpeg_process = None - if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive(): - ffmpeg_worker_thread.join(timeout=5) - if ffmpeg_worker_thread.is_alive(): # pragma: no cover - diagnostic + worker = ffmpeg_worker_thread + if worker and worker.is_alive(): + worker.join(timeout=5) + if worker.is_alive(): # pragma: no cover - diagnostic logger.warning("FFmpeg worker thread did not exit cleanly") ffmpeg_worker_thread = None - stream_active = False - - try: - reset_stream_path() - except Exception as exc: # pragma: no cover - best effort logging - logger.error(f"Error resetting stream directory: {exc}") + _safe_reset_stream_path("during cleanup") ffmpeg_stop_event.clear() @@ -324,18 +322,14 @@ def cleanup_stream(): @app.route('/rtmp_callbacks/on_publish', methods=['POST']) def on_publish(): """Callback when a stream starts""" - global stream_active, ffmpeg_worker_thread - stream_key = request.form.get('name') logger.info("on_publish received for key=%s", stream_key) - old_playlist = STREAM_PATH / 'stream.m3u8' - if not stream_key or stream_key != INGEST_PSK: logger.warning("Unauthorized stream key attempted to publish: %s", stream_key) return "Unauthorized", 403 - if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()): + if ffmpeg_process or _worker_running(): logger.info("Stream already active, recycling existing session") cleanup_stream() @@ -352,8 +346,6 @@ def on_publish(): @app.route('/rtmp_callbacks/on_publish_done', methods=['POST']) def on_publish_done(): """Callback when a stream ends""" - global stream_active, ffmpeg_worker_thread - stream_key = request.form.get('name') logger.info("on_publish_done received for key=%s", stream_key) @@ -361,7 +353,7 @@ def on_publish_done(): logger.warning("on_publish_done received for unknown key: %s", stream_key) return "Bad request", 400 - if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()): + if ffmpeg_process or _worker_running(): cleanup_stream() logger.info("Stream publishing ended") -- cgit v1.2.3