summaryrefslogtreecommitdiffstats
path: root/opt/obsproxy
diff options
context:
space:
mode:
authoryum <yum.food.vr@gmail.com>2025-10-14 21:44:30 -0700
committeryum <yum.food.vr@gmail.com>2025-10-28 17:19:37 -0700
commita8bf42f13453626a5ff1b247360279cbf06e3bc4 (patch)
tree07a6403a4a115c43d990aef529ae0b06291a1273 /opt/obsproxy
parentfb7997c4773dc81414baadbfbfa5b171cd6f7eb2 (diff)
should pick up connection after upgrade without manual obs restart
Diffstat (limited to 'opt/obsproxy')
-rwxr-xr-xopt/obsproxy/server.py289
1 files changed, 196 insertions, 93 deletions
diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py
index d7d3eac..6a52653 100755
--- a/opt/obsproxy/server.py
+++ b/opt/obsproxy/server.py
@@ -27,6 +27,8 @@ INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096'
HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2'))
HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20'))
HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20'))
+HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '45'))
+HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.5'))
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev')
STREAM_HEX = secrets.token_hex(16)
@@ -36,6 +38,9 @@ HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}"
AUDIO_BITRATE = '256k'
AUDIO_CHANNELS = 2
AUDIO_SAMPLE_RATE = 48000
+FFMPEG_RETRY_DELAY = float(os.environ.get('FFMPEG_RETRY_DELAY', '1.0'))
+FFMPEG_RETRY_MAX_DELAY = float(os.environ.get('FFMPEG_RETRY_MAX_DELAY', '5.0'))
+FFMPEG_RETRY_BACKOFF = float(os.environ.get('FFMPEG_RETRY_BACKOFF', '1.5'))
# Setup logging
logging.basicConfig(
@@ -47,6 +52,8 @@ logger = logging.getLogger('obs_proxy')
# Global state
stream_active = False
ffmpeg_process = None
+ffmpeg_worker_thread = None
+ffmpeg_stop_event = threading.Event()
# Validate configuration
if not INGEST_PSK:
@@ -63,10 +70,12 @@ def reset_stream_path():
STREAM_PATH.mkdir(parents=True, exist_ok=True)
-def _log_playlist_availability(playlist_path: Path, timeout: float = 30.0, poll_interval: float = 0.5) -> None:
- """Log when the HLS playlist becomes available (or if it never does)."""
+def _wait_for_playlist(process: subprocess.Popen[str], attempt: int):
+ """Block until the playlist appears or we hit the configured timeout."""
+ playlist_path = STREAM_PATH / 'stream.m3u8'
start = time.monotonic()
- while time.monotonic() - start <= timeout:
+
+ while True:
if playlist_path.exists():
elapsed = time.monotonic() - start
segments = sorted(seg.name for seg in playlist_path.parent.glob('segment-*.ts'))
@@ -76,34 +85,58 @@ def _log_playlist_availability(playlist_path: Path, timeout: float = 30.0, poll_
playlist_path,
segments,
)
- return
- time.sleep(poll_interval)
+ return True, 'ready'
- logger.warning(
- "HLS playlist still missing after %.2fs at %s",
- timeout,
- playlist_path,
- )
+ if ffmpeg_stop_event.is_set():
+ return False, 'stop_requested'
+ if process.poll() is not None:
+ return False, 'ffmpeg_exited'
-def start_ffmpeg_process():
- """Start FFmpeg to convert RTMP ingest into HLS."""
- global ffmpeg_process, stream_active
+ elapsed = time.monotonic() - start
+ if elapsed >= HLS_PLAYLIST_TIMEOUT:
+ logger.warning(
+ "HLS playlist still missing after %.2fs on attempt %s; recycling FFmpeg",
+ elapsed,
+ attempt,
+ )
+ return False, 'timeout'
- reset_stream_path()
- logger.info(f"Stream directory ready at {STREAM_PATH}")
+ if ffmpeg_stop_event.wait(HLS_PLAYLIST_POLL_INTERVAL):
+ return False, 'stop_requested'
- # Sanity check that we can write to the path before spawning ffmpeg
+
+def _start_pipe_logger(pipe, level):
+ """Drain an ffmpeg pipe on a background thread to avoid deadlocks."""
+
+ def pipe_logger():
+ with pipe:
+ for line in iter(pipe.readline, ''):
+ line = line.strip()
+ if line:
+ logger.log(level, "ffmpeg: %s", line)
+
+ threading.Thread(target=pipe_logger, daemon=True).start()
+
+
+def _terminate_ffmpeg_process(process: subprocess.Popen[str], *, timeout: float = 5.0, log_errors: bool = True) -> None:
+ """Terminate an ffmpeg process, falling back to kill if needed."""
try:
- test_file = STREAM_PATH / "write_test.txt"
- with open(test_file, "w", encoding="utf-8") as probe:
- probe.write("ok")
- if test_file.exists():
- test_file.unlink()
+ process.terminate()
+ process.wait(timeout=timeout)
except Exception as exc: # pragma: no cover - best effort logging
- logger.error(f"Could not write to stream directory: {exc}")
+ if log_errors:
+ logger.error(f"Error stopping FFmpeg: {exc}")
+ try:
+ process.kill()
+ except Exception: # pragma: no cover
+ pass
+
+
+def _build_ffmpeg_command() -> list[str]:
+ """Construct the ffmpeg command line we execute for each attempt."""
- command = [
+ return [
'ffmpeg',
'-nostdin',
'-hide_banner',
@@ -124,97 +157,174 @@ def start_ffmpeg_process():
'-hls_flags', 'delete_segments+independent_segments',
'-hls_delete_threshold', str(HLS_DELETE_THRESHOLD),
'-hls_segment_filename', str(STREAM_PATH / 'segment-%05d.ts'),
- str(STREAM_PATH / 'stream.m3u8')
+ str(STREAM_PATH / 'stream.m3u8'),
]
- logger.info("Starting FFmpeg for live stream")
+
+def _run_ffmpeg_once(attempt: int) -> bool:
+ """Start ffmpeg, wait for it to exit, and report whether it ran cleanly."""
+ global ffmpeg_process, stream_active
+
+ logger.info("Starting FFmpeg for live stream (attempt %s)", attempt)
try:
process = subprocess.Popen(
- command,
+ _build_ffmpeg_command(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
- bufsize=1
+ bufsize=1,
)
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error("Failed to start FFmpeg: %s", exc)
+ return False
+
+ ffmpeg_process = process
+ stream_active = True
+ start_time = time.monotonic()
+
+ logger.info("FFmpeg process started with PID %s", process.pid)
+ logger.info('Stream active; waiting for playlist to appear')
+
+ _start_pipe_logger(process.stderr, logging.WARNING)
+ _start_pipe_logger(process.stdout, logging.DEBUG)
- ffmpeg_process = process
- stream_active = True
-
- def pipe_logger(pipe, level):
- """Continuously drain an ffmpeg pipe and log each line."""
- with pipe:
- for line in iter(pipe.readline, ''):
- line = line.strip()
- if line:
- logger.log(level, "ffmpeg: %s", line)
-
- threading.Thread(
- target=pipe_logger,
- args=(process.stderr, logging.WARNING),
- daemon=True
- ).start()
-
- threading.Thread(
- target=pipe_logger,
- args=(process.stdout, logging.DEBUG),
- daemon=True
- ).start()
-
- def monitor_process():
- """Collect ffmpeg result and reset state when it exits."""
- global ffmpeg_process, stream_active
-
- logger.info("FFmpeg process started with PID %s", process.pid)
-
- exit_code = process.wait()
- if exit_code != 0:
- logger.error("FFmpeg exited with code %s", exit_code)
- else:
- logger.info("FFmpeg process completed successfully")
-
- ffmpeg_process = None
- stream_active = False
-
- threading.Thread(target=monitor_process, daemon=True).start()
+ playlist_ready, playlist_reason = _wait_for_playlist(process, attempt)
+
+ if not playlist_ready and not ffmpeg_stop_event.is_set():
+ if playlist_reason == 'timeout' and process.poll() is None:
+ logger.info("Terminating FFmpeg attempt %s after playlist timeout", attempt)
+ _terminate_ffmpeg_process(process, log_errors=False)
+ elif playlist_reason == 'ffmpeg_exited':
+ logger.debug("FFmpeg exited before playlist became available on attempt %s", attempt)
+
+ exit_code = process.wait()
+ elapsed = time.monotonic() - start_time
+
+ ffmpeg_process = None
+ stream_active = False
+
+ if ffmpeg_stop_event.is_set():
+ logger.info(
+ "FFmpeg stop requested; process exited with code %s after %.2fs",
+ exit_code,
+ elapsed,
+ )
return True
- except Exception as exc:
- logger.error(f"Failed to start FFmpeg: {exc}")
- ffmpeg_process = None
- stream_active = False
+ if not playlist_ready:
+ logger.error(
+ "FFmpeg attempt %s ended (exit %s) before playlist became available (reason=%s)",
+ attempt,
+ exit_code,
+ playlist_reason,
+ )
+ return False
+
+ if exit_code != 0:
+ logger.error("FFmpeg exited with code %s after %.2fs", exit_code, elapsed)
+ return False
+
+ logger.info("FFmpeg process completed successfully after %.2fs", elapsed)
+ return True
+
+
+def start_ffmpeg_process():
+ """Start FFmpeg to convert RTMP ingest into HLS."""
+ global ffmpeg_worker_thread
+
+ if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive():
+ logger.warning("FFmpeg worker already running; skipping duplicate start")
+ return True
+
+ reset_stream_path()
+ logger.info(f"Stream directory ready at {STREAM_PATH}")
+
+ # Sanity check that we can write to the path before spawning ffmpeg
+ try:
+ test_file = STREAM_PATH / "write_test.txt"
+ with open(test_file, "w", encoding="utf-8") as probe:
+ probe.write("ok")
+ if test_file.exists():
+ test_file.unlink()
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error(f"Could not write to stream directory: {exc}")
+
+ ffmpeg_stop_event.clear()
+
+ def worker():
+ attempt = 0
+ delay = max(FFMPEG_RETRY_DELAY, 0.1)
+
+ while not ffmpeg_stop_event.is_set():
+ attempt += 1
+ ran_cleanly = _run_ffmpeg_once(attempt)
+ if ran_cleanly:
+ break
+
+ if ffmpeg_stop_event.is_set():
+ break
+
+ wait_time = min(delay, FFMPEG_RETRY_MAX_DELAY)
+ logger.info(
+ "FFmpeg will retry in %.2fs (next attempt %s)",
+ wait_time,
+ attempt + 1,
+ )
+
+ if ffmpeg_stop_event.wait(wait_time):
+ break
+
+ try:
+ reset_stream_path()
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error(f"Error resetting stream directory between attempts: {exc}")
+
+ delay = min(delay * FFMPEG_RETRY_BACKOFF, FFMPEG_RETRY_MAX_DELAY)
+
+ logger.debug("FFmpeg worker exiting")
+
+ try:
+ ffmpeg_worker_thread = threading.Thread(target=worker, daemon=True, name="ffmpeg-worker")
+ ffmpeg_worker_thread.start()
+ return True
+ except Exception as exc: # pragma: no cover - defensive logging
+ logger.error(f"Failed to start FFmpeg worker thread: {exc}")
+ ffmpeg_worker_thread = None
return False
def cleanup_stream():
"""Stop FFmpeg and purge any cached HLS segments."""
- global ffmpeg_process, stream_active
+ global ffmpeg_process, stream_active, ffmpeg_worker_thread
+
+ ffmpeg_stop_event.set()
if ffmpeg_process:
- try:
- ffmpeg_process.terminate()
- ffmpeg_process.wait(timeout=5)
- except Exception as exc: # pragma: no cover - best effort logging
- logger.error(f"Error stopping FFmpeg: {exc}")
- try:
- ffmpeg_process.kill()
- except Exception: # pragma: no cover
- pass
- finally:
- ffmpeg_process = None
+ _terminate_ffmpeg_process(ffmpeg_process)
+ ffmpeg_process = None
+
+ if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive():
+ ffmpeg_worker_thread.join(timeout=5)
+ if ffmpeg_worker_thread.is_alive(): # pragma: no cover - diagnostic
+ logger.warning("FFmpeg worker thread did not exit cleanly")
+ ffmpeg_worker_thread = None
stream_active = False
+
try:
reset_stream_path()
except Exception as exc: # pragma: no cover - best effort logging
logger.error(f"Error resetting stream directory: {exc}")
+ ffmpeg_stop_event.clear()
+
# Routes
@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
def on_publish():
"""Callback when a stream starts"""
- global stream_active
+ global stream_active, ffmpeg_worker_thread
stream_key = request.form.get('name')
logger.info("on_publish received for key=%s", stream_key)
@@ -225,7 +335,7 @@ def on_publish():
logger.warning("Unauthorized stream key attempted to publish: %s", stream_key)
return "Unauthorized", 403
- if stream_active:
+ if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()):
logger.info("Stream already active, recycling existing session")
cleanup_stream()
@@ -235,13 +345,6 @@ def on_publish():
playlist_path = STREAM_PATH / 'stream.m3u8'
if playlist_path.exists():
logger.info("Existing playlist found at %s", playlist_path)
- else:
- logger.info('Stream active; waiting for playlist to appear')
- threading.Thread(
- target=_log_playlist_availability,
- args=(playlist_path,),
- daemon=True,
- ).start()
return "OK"
@@ -249,7 +352,7 @@ def on_publish():
@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
def on_publish_done():
"""Callback when a stream ends"""
- global stream_active
+ global stream_active, ffmpeg_worker_thread
stream_key = request.form.get('name')
logger.info("on_publish_done received for key=%s", stream_key)
@@ -258,7 +361,7 @@ def on_publish_done():
logger.warning("on_publish_done received for unknown key: %s", stream_key)
return "Bad request", 400
- if stream_active:
+ if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()):
cleanup_stream()
logger.info("Stream publishing ended")