diff options
| author | yum <yum.food.vr@gmail.com> | 2025-10-14 21:44:30 -0700 |
|---|---|---|
| committer | yum <yum.food.vr@gmail.com> | 2025-10-28 17:19:37 -0700 |
| commit | a8bf42f13453626a5ff1b247360279cbf06e3bc4 (patch) | |
| tree | 07a6403a4a115c43d990aef529ae0b06291a1273 | |
| parent | fb7997c4773dc81414baadbfbfa5b171cd6f7eb2 (diff) | |
should pick up connection after upgrade without manual obs restart
| -rwxr-xr-x | opt/obsproxy/server.py | 289 |
1 files changed, 196 insertions, 93 deletions
diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py index d7d3eac..6a52653 100755 --- a/opt/obsproxy/server.py +++ b/opt/obsproxy/server.py @@ -27,6 +27,8 @@ INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096' HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2')) HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20')) HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20')) +HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '45')) +HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.5')) BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams')) SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev') STREAM_HEX = secrets.token_hex(16) @@ -36,6 +38,9 @@ HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}" AUDIO_BITRATE = '256k' AUDIO_CHANNELS = 2 AUDIO_SAMPLE_RATE = 48000 +FFMPEG_RETRY_DELAY = float(os.environ.get('FFMPEG_RETRY_DELAY', '1.0')) +FFMPEG_RETRY_MAX_DELAY = float(os.environ.get('FFMPEG_RETRY_MAX_DELAY', '5.0')) +FFMPEG_RETRY_BACKOFF = float(os.environ.get('FFMPEG_RETRY_BACKOFF', '1.5')) # Setup logging logging.basicConfig( @@ -47,6 +52,8 @@ logger = logging.getLogger('obs_proxy') # Global state stream_active = False ffmpeg_process = None +ffmpeg_worker_thread = None +ffmpeg_stop_event = threading.Event() # Validate configuration if not INGEST_PSK: @@ -63,10 +70,12 @@ def reset_stream_path(): STREAM_PATH.mkdir(parents=True, exist_ok=True) -def _log_playlist_availability(playlist_path: Path, timeout: float = 30.0, poll_interval: float = 0.5) -> None: - """Log when the HLS playlist becomes available (or if it never does).""" +def _wait_for_playlist(process: subprocess.Popen[str], attempt: int): + """Block until the playlist appears or we hit the configured timeout.""" + playlist_path = STREAM_PATH / 'stream.m3u8' start = time.monotonic() - while time.monotonic() - start <= timeout: + + while True: if playlist_path.exists(): elapsed = time.monotonic() - start segments = sorted(seg.name for seg in playlist_path.parent.glob('segment-*.ts')) @@ -76,34 +85,58 @@ def _log_playlist_availability(playlist_path: Path, timeout: float = 30.0, poll_ playlist_path, segments, ) - return - time.sleep(poll_interval) + return True, 'ready' - logger.warning( - "HLS playlist still missing after %.2fs at %s", - timeout, - playlist_path, - ) + if ffmpeg_stop_event.is_set(): + return False, 'stop_requested' + if process.poll() is not None: + return False, 'ffmpeg_exited' -def start_ffmpeg_process(): - """Start FFmpeg to convert RTMP ingest into HLS.""" - global ffmpeg_process, stream_active + elapsed = time.monotonic() - start + if elapsed >= HLS_PLAYLIST_TIMEOUT: + logger.warning( + "HLS playlist still missing after %.2fs on attempt %s; recycling FFmpeg", + elapsed, + attempt, + ) + return False, 'timeout' - reset_stream_path() - logger.info(f"Stream directory ready at {STREAM_PATH}") + if ffmpeg_stop_event.wait(HLS_PLAYLIST_POLL_INTERVAL): + return False, 'stop_requested' - # Sanity check that we can write to the path before spawning ffmpeg + +def _start_pipe_logger(pipe, level): + """Drain an ffmpeg pipe on a background thread to avoid deadlocks.""" + + def pipe_logger(): + with pipe: + for line in iter(pipe.readline, ''): + line = line.strip() + if line: + logger.log(level, "ffmpeg: %s", line) + + threading.Thread(target=pipe_logger, daemon=True).start() + + +def _terminate_ffmpeg_process(process: subprocess.Popen[str], *, timeout: float = 5.0, log_errors: bool = True) -> None: + """Terminate an ffmpeg process, falling back to kill if needed.""" try: - test_file = STREAM_PATH / "write_test.txt" - with open(test_file, "w", encoding="utf-8") as probe: - probe.write("ok") - if test_file.exists(): - test_file.unlink() + process.terminate() + process.wait(timeout=timeout) except Exception as exc: # pragma: no cover - best effort logging - logger.error(f"Could not write to stream directory: {exc}") + if log_errors: + logger.error(f"Error stopping FFmpeg: {exc}") + try: + process.kill() + except Exception: # pragma: no cover + pass + + +def _build_ffmpeg_command() -> list[str]: + """Construct the ffmpeg command line we execute for each attempt.""" - command = [ + return [ 'ffmpeg', '-nostdin', '-hide_banner', @@ -124,97 +157,174 @@ def start_ffmpeg_process(): '-hls_flags', 'delete_segments+independent_segments', '-hls_delete_threshold', str(HLS_DELETE_THRESHOLD), '-hls_segment_filename', str(STREAM_PATH / 'segment-%05d.ts'), - str(STREAM_PATH / 'stream.m3u8') + str(STREAM_PATH / 'stream.m3u8'), ] - logger.info("Starting FFmpeg for live stream") + +def _run_ffmpeg_once(attempt: int) -> bool: + """Start ffmpeg, wait for it to exit, and report whether it ran cleanly.""" + global ffmpeg_process, stream_active + + logger.info("Starting FFmpeg for live stream (attempt %s)", attempt) try: process = subprocess.Popen( - command, + _build_ffmpeg_command(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, - bufsize=1 + bufsize=1, ) + except Exception as exc: # pragma: no cover - best effort logging + logger.error("Failed to start FFmpeg: %s", exc) + return False + + ffmpeg_process = process + stream_active = True + start_time = time.monotonic() + + logger.info("FFmpeg process started with PID %s", process.pid) + logger.info('Stream active; waiting for playlist to appear') + + _start_pipe_logger(process.stderr, logging.WARNING) + _start_pipe_logger(process.stdout, logging.DEBUG) - ffmpeg_process = process - stream_active = True - - def pipe_logger(pipe, level): - """Continuously drain an ffmpeg pipe and log each line.""" - with pipe: - for line in iter(pipe.readline, ''): - line = line.strip() - if line: - logger.log(level, "ffmpeg: %s", line) - - threading.Thread( - target=pipe_logger, - args=(process.stderr, logging.WARNING), - daemon=True - ).start() - - threading.Thread( - target=pipe_logger, - args=(process.stdout, logging.DEBUG), - daemon=True - ).start() - - def monitor_process(): - """Collect ffmpeg result and reset state when it exits.""" - global ffmpeg_process, stream_active - - logger.info("FFmpeg process started with PID %s", process.pid) - - exit_code = process.wait() - if exit_code != 0: - logger.error("FFmpeg exited with code %s", exit_code) - else: - logger.info("FFmpeg process completed successfully") - - ffmpeg_process = None - stream_active = False - - threading.Thread(target=monitor_process, daemon=True).start() + playlist_ready, playlist_reason = _wait_for_playlist(process, attempt) + + if not playlist_ready and not ffmpeg_stop_event.is_set(): + if playlist_reason == 'timeout' and process.poll() is None: + logger.info("Terminating FFmpeg attempt %s after playlist timeout", attempt) + _terminate_ffmpeg_process(process, log_errors=False) + elif playlist_reason == 'ffmpeg_exited': + logger.debug("FFmpeg exited before playlist became available on attempt %s", attempt) + + exit_code = process.wait() + elapsed = time.monotonic() - start_time + + ffmpeg_process = None + stream_active = False + + if ffmpeg_stop_event.is_set(): + logger.info( + "FFmpeg stop requested; process exited with code %s after %.2fs", + exit_code, + elapsed, + ) return True - except Exception as exc: - logger.error(f"Failed to start FFmpeg: {exc}") - ffmpeg_process = None - stream_active = False + if not playlist_ready: + logger.error( + "FFmpeg attempt %s ended (exit %s) before playlist became available (reason=%s)", + attempt, + exit_code, + playlist_reason, + ) + return False + + if exit_code != 0: + logger.error("FFmpeg exited with code %s after %.2fs", exit_code, elapsed) + return False + + logger.info("FFmpeg process completed successfully after %.2fs", elapsed) + return True + + +def start_ffmpeg_process(): + """Start FFmpeg to convert RTMP ingest into HLS.""" + global ffmpeg_worker_thread + + if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive(): + logger.warning("FFmpeg worker already running; skipping duplicate start") + return True + + reset_stream_path() + logger.info(f"Stream directory ready at {STREAM_PATH}") + + # Sanity check that we can write to the path before spawning ffmpeg + try: + test_file = STREAM_PATH / "write_test.txt" + with open(test_file, "w", encoding="utf-8") as probe: + probe.write("ok") + if test_file.exists(): + test_file.unlink() + except Exception as exc: # pragma: no cover - best effort logging + logger.error(f"Could not write to stream directory: {exc}") + + ffmpeg_stop_event.clear() + + def worker(): + attempt = 0 + delay = max(FFMPEG_RETRY_DELAY, 0.1) + + while not ffmpeg_stop_event.is_set(): + attempt += 1 + ran_cleanly = _run_ffmpeg_once(attempt) + if ran_cleanly: + break + + if ffmpeg_stop_event.is_set(): + break + + wait_time = min(delay, FFMPEG_RETRY_MAX_DELAY) + logger.info( + "FFmpeg will retry in %.2fs (next attempt %s)", + wait_time, + attempt + 1, + ) + + if ffmpeg_stop_event.wait(wait_time): + break + + try: + reset_stream_path() + except Exception as exc: # pragma: no cover - best effort logging + logger.error(f"Error resetting stream directory between attempts: {exc}") + + delay = min(delay * FFMPEG_RETRY_BACKOFF, FFMPEG_RETRY_MAX_DELAY) + + logger.debug("FFmpeg worker exiting") + + try: + ffmpeg_worker_thread = threading.Thread(target=worker, daemon=True, name="ffmpeg-worker") + ffmpeg_worker_thread.start() + return True + except Exception as exc: # pragma: no cover - defensive logging + logger.error(f"Failed to start FFmpeg worker thread: {exc}") + ffmpeg_worker_thread = None return False def cleanup_stream(): """Stop FFmpeg and purge any cached HLS segments.""" - global ffmpeg_process, stream_active + global ffmpeg_process, stream_active, ffmpeg_worker_thread + + ffmpeg_stop_event.set() if ffmpeg_process: - try: - ffmpeg_process.terminate() - ffmpeg_process.wait(timeout=5) - except Exception as exc: # pragma: no cover - best effort logging - logger.error(f"Error stopping FFmpeg: {exc}") - try: - ffmpeg_process.kill() - except Exception: # pragma: no cover - pass - finally: - ffmpeg_process = None + _terminate_ffmpeg_process(ffmpeg_process) + ffmpeg_process = None + + if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive(): + ffmpeg_worker_thread.join(timeout=5) + if ffmpeg_worker_thread.is_alive(): # pragma: no cover - diagnostic + logger.warning("FFmpeg worker thread did not exit cleanly") + ffmpeg_worker_thread = None stream_active = False + try: reset_stream_path() except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Error resetting stream directory: {exc}") + ffmpeg_stop_event.clear() + # Routes @app.route('/rtmp_callbacks/on_publish', methods=['POST']) def on_publish(): """Callback when a stream starts""" - global stream_active + global stream_active, ffmpeg_worker_thread stream_key = request.form.get('name') logger.info("on_publish received for key=%s", stream_key) @@ -225,7 +335,7 @@ def on_publish(): logger.warning("Unauthorized stream key attempted to publish: %s", stream_key) return "Unauthorized", 403 - if stream_active: + if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()): logger.info("Stream already active, recycling existing session") cleanup_stream() @@ -235,13 +345,6 @@ def on_publish(): playlist_path = STREAM_PATH / 'stream.m3u8' if playlist_path.exists(): logger.info("Existing playlist found at %s", playlist_path) - else: - logger.info('Stream active; waiting for playlist to appear') - threading.Thread( - target=_log_playlist_availability, - args=(playlist_path,), - daemon=True, - ).start() return "OK" @@ -249,7 +352,7 @@ def on_publish(): @app.route('/rtmp_callbacks/on_publish_done', methods=['POST']) def on_publish_done(): """Callback when a stream ends""" - global stream_active + global stream_active, ffmpeg_worker_thread stream_key = request.form.get('name') logger.info("on_publish_done received for key=%s", stream_key) @@ -258,7 +361,7 @@ def on_publish_done(): logger.warning("on_publish_done received for unknown key: %s", stream_key) return "Bad request", 400 - if stream_active: + if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()): cleanup_stream() logger.info("Stream publishing ended") |
