summaryrefslogtreecommitdiffstats
path: root/opt
diff options
context:
space:
mode:
Diffstat (limited to 'opt')
-rwxr-xr-xopt/obsproxy/server.py140
1 files changed, 66 insertions, 74 deletions
diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py
index 6a52653..3f9001a 100755
--- a/opt/obsproxy/server.py
+++ b/opt/obsproxy/server.py
@@ -25,10 +25,10 @@ except ValueError as exc:
raise ValueError('INGEST_RTMP_PORT must be an integer') from exc
INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096'))
HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2'))
-HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20'))
+HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '4'))
HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20'))
-HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '45'))
-HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.5'))
+HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '5'))
+HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.2'))
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev')
STREAM_HEX = secrets.token_hex(16)
@@ -38,9 +38,6 @@ HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}"
AUDIO_BITRATE = '256k'
AUDIO_CHANNELS = 2
AUDIO_SAMPLE_RATE = 48000
-FFMPEG_RETRY_DELAY = float(os.environ.get('FFMPEG_RETRY_DELAY', '1.0'))
-FFMPEG_RETRY_MAX_DELAY = float(os.environ.get('FFMPEG_RETRY_MAX_DELAY', '5.0'))
-FFMPEG_RETRY_BACKOFF = float(os.environ.get('FFMPEG_RETRY_BACKOFF', '1.5'))
# Setup logging
logging.basicConfig(
@@ -50,11 +47,15 @@ logging.basicConfig(
logger = logging.getLogger('obs_proxy')
# Global state
-stream_active = False
ffmpeg_process = None
ffmpeg_worker_thread = None
ffmpeg_stop_event = threading.Event()
+
+def _worker_running() -> bool:
+ """Return True if the ffmpeg worker thread is currently active."""
+ return ffmpeg_worker_thread is not None and ffmpeg_worker_thread.is_alive()
+
# Validate configuration
if not INGEST_PSK:
logger.error("OBS_STREAM_KEY/STREAM_PSK is not set")
@@ -70,6 +71,29 @@ def reset_stream_path():
STREAM_PATH.mkdir(parents=True, exist_ok=True)
+def _safe_reset_stream_path(context: str) -> bool:
+ """Reset the stream directory and log any failure."""
+ try:
+ reset_stream_path()
+ return True
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error("Error resetting stream directory %s: %s", context, exc)
+ return False
+
+
+def _verify_stream_path_writable() -> None:
+ """Ensure the stream directory is writable before launching FFmpeg."""
+ test_file = STREAM_PATH / "write_test.txt"
+ try:
+ with open(test_file, "w", encoding="utf-8") as probe:
+ probe.write("ok")
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error("Could not write to stream directory: %s", exc)
+ finally:
+ if test_file.exists():
+ test_file.unlink()
+
+
def _wait_for_playlist(process: subprocess.Popen[str], attempt: int):
"""Block until the playlist appears or we hit the configured timeout."""
playlist_path = STREAM_PATH / 'stream.m3u8'
@@ -163,7 +187,7 @@ def _build_ffmpeg_command() -> list[str]:
def _run_ffmpeg_once(attempt: int) -> bool:
"""Start ffmpeg, wait for it to exit, and report whether it ran cleanly."""
- global ffmpeg_process, stream_active
+ global ffmpeg_process
logger.info("Starting FFmpeg for live stream (attempt %s)", attempt)
@@ -180,7 +204,6 @@ def _run_ffmpeg_once(attempt: int) -> bool:
return False
ffmpeg_process = process
- stream_active = True
start_time = time.monotonic()
logger.info("FFmpeg process started with PID %s", process.pid)
@@ -202,7 +225,6 @@ def _run_ffmpeg_once(attempt: int) -> bool:
elapsed = time.monotonic() - start_time
ffmpeg_process = None
- stream_active = False
if ffmpeg_stop_event.is_set():
logger.info(
@@ -229,63 +251,43 @@ def _run_ffmpeg_once(attempt: int) -> bool:
return True
-def start_ffmpeg_process():
- """Start FFmpeg to convert RTMP ingest into HLS."""
- global ffmpeg_worker_thread
-
- if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive():
- logger.warning("FFmpeg worker already running; skipping duplicate start")
- return True
+def _ffmpeg_worker_loop() -> None:
+ """Keep spawning FFmpeg until it runs cleanly or a stop is requested."""
+ attempt = 0
- reset_stream_path()
- logger.info(f"Stream directory ready at {STREAM_PATH}")
+ while not ffmpeg_stop_event.is_set():
+ attempt += 1
+ if _run_ffmpeg_once(attempt):
+ break
- # Sanity check that we can write to the path before spawning ffmpeg
- try:
- test_file = STREAM_PATH / "write_test.txt"
- with open(test_file, "w", encoding="utf-8") as probe:
- probe.write("ok")
- if test_file.exists():
- test_file.unlink()
- except Exception as exc: # pragma: no cover - best effort logging
- logger.error(f"Could not write to stream directory: {exc}")
-
- ffmpeg_stop_event.clear()
-
- def worker():
- attempt = 0
- delay = max(FFMPEG_RETRY_DELAY, 0.1)
+ if ffmpeg_stop_event.is_set():
+ break
- while not ffmpeg_stop_event.is_set():
- attempt += 1
- ran_cleanly = _run_ffmpeg_once(attempt)
- if ran_cleanly:
- break
+ _safe_reset_stream_path("between FFmpeg attempts")
- if ffmpeg_stop_event.is_set():
- break
+ logger.debug("FFmpeg worker exiting")
- wait_time = min(delay, FFMPEG_RETRY_MAX_DELAY)
- logger.info(
- "FFmpeg will retry in %.2fs (next attempt %s)",
- wait_time,
- attempt + 1,
- )
- if ffmpeg_stop_event.wait(wait_time):
- break
+def start_ffmpeg_process():
+ """Start FFmpeg to convert RTMP ingest into HLS."""
+ global ffmpeg_worker_thread
- try:
- reset_stream_path()
- except Exception as exc: # pragma: no cover - best effort logging
- logger.error(f"Error resetting stream directory between attempts: {exc}")
+ if _worker_running():
+ logger.warning("FFmpeg worker already running; skipping duplicate start")
+ return True
- delay = min(delay * FFMPEG_RETRY_BACKOFF, FFMPEG_RETRY_MAX_DELAY)
+ if _safe_reset_stream_path("before FFmpeg start"):
+ logger.info(f"Stream directory ready at {STREAM_PATH}")
+ _verify_stream_path_writable()
- logger.debug("FFmpeg worker exiting")
+ ffmpeg_stop_event.clear()
try:
- ffmpeg_worker_thread = threading.Thread(target=worker, daemon=True, name="ffmpeg-worker")
+ ffmpeg_worker_thread = threading.Thread(
+ target=_ffmpeg_worker_loop,
+ daemon=True,
+ name="ffmpeg-worker",
+ )
ffmpeg_worker_thread.start()
return True
except Exception as exc: # pragma: no cover - defensive logging
@@ -296,7 +298,7 @@ def start_ffmpeg_process():
def cleanup_stream():
"""Stop FFmpeg and purge any cached HLS segments."""
- global ffmpeg_process, stream_active, ffmpeg_worker_thread
+ global ffmpeg_process, ffmpeg_worker_thread
ffmpeg_stop_event.set()
@@ -304,18 +306,14 @@ def cleanup_stream():
_terminate_ffmpeg_process(ffmpeg_process)
ffmpeg_process = None
- if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive():
- ffmpeg_worker_thread.join(timeout=5)
- if ffmpeg_worker_thread.is_alive(): # pragma: no cover - diagnostic
+ worker = ffmpeg_worker_thread
+ if worker and worker.is_alive():
+ worker.join(timeout=5)
+ if worker.is_alive(): # pragma: no cover - diagnostic
logger.warning("FFmpeg worker thread did not exit cleanly")
ffmpeg_worker_thread = None
- stream_active = False
-
- try:
- reset_stream_path()
- except Exception as exc: # pragma: no cover - best effort logging
- logger.error(f"Error resetting stream directory: {exc}")
+ _safe_reset_stream_path("during cleanup")
ffmpeg_stop_event.clear()
@@ -324,18 +322,14 @@ def cleanup_stream():
@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
def on_publish():
"""Callback when a stream starts"""
- global stream_active, ffmpeg_worker_thread
-
stream_key = request.form.get('name')
logger.info("on_publish received for key=%s", stream_key)
- old_playlist = STREAM_PATH / 'stream.m3u8'
-
if not stream_key or stream_key != INGEST_PSK:
logger.warning("Unauthorized stream key attempted to publish: %s", stream_key)
return "Unauthorized", 403
- if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()):
+ if ffmpeg_process or _worker_running():
logger.info("Stream already active, recycling existing session")
cleanup_stream()
@@ -352,8 +346,6 @@ def on_publish():
@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
def on_publish_done():
"""Callback when a stream ends"""
- global stream_active, ffmpeg_worker_thread
-
stream_key = request.form.get('name')
logger.info("on_publish_done received for key=%s", stream_key)
@@ -361,7 +353,7 @@ def on_publish_done():
logger.warning("on_publish_done received for unknown key: %s", stream_key)
return "Bad request", 400
- if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()):
+ if ffmpeg_process or _worker_running():
cleanup_stream()
logger.info("Stream publishing ended")