#!/usr/bin/env python3
"""OBS to HLS streaming proxy.

A small Flask application that receives RTMP publish callbacks
(``/rtmp_callbacks/on_publish`` and ``/rtmp_callbacks/on_publish_done``),
and in response starts or stops an ``ffmpeg`` process that pulls the RTMP
ingest and writes an HLS playlist plus segments under ``STREAM_PATH``.

All settings are read from environment variables (optionally loaded from a
``.env`` file via ``python-dotenv``) at import time.
"""
import atexit
import logging
import os
import secrets
import shutil
import subprocess
import threading
import time
from pathlib import Path

from dotenv import load_dotenv
from flask import Flask, request

# Setup Flask and load environment variables
app = Flask(__name__)
application = app  # WSGI servers like gunicorn look for 'application'
load_dotenv()

# Configuration
INGEST_PSK = os.environ.get('OBS_STREAM_KEY') or os.environ.get('STREAM_PSK')
INGEST_RTMP_HOST = os.environ.get('INGEST_RTMP_HOST', '127.0.0.1')
try:
    INGEST_RTMP_PORT = int(os.environ.get('INGEST_RTMP_PORT', '1936'))
except ValueError as exc:
    raise ValueError('INGEST_RTMP_PORT must be an integer') from exc
INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096'))
HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2'))
HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '3'))
HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20'))
HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '5'))
HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.1'))
# Pause between failed FFmpeg attempts so a persistent failure (for example a
# missing ffmpeg binary) does not turn the worker into a busy respawn loop.
FFMPEG_RETRY_DELAY = max(0.0, float(os.environ.get('FFMPEG_RETRY_DELAY', '0.5')))
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.b-cdn.net')
STREAM_HEX = secrets.token_hex(16)
STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX
HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}"

# Media output settings tuned for VRChat playback
AUDIO_BITRATE = '256k'
AUDIO_CHANNELS = 2
AUDIO_SAMPLE_RATE = 48000


def _resolve_log_level(name: str) -> int:
    """Translate a level name such as ``"info"`` into a ``logging`` level.

    The lookup is case-insensitive. Anything that does not resolve to an
    integer level constant falls back to ``logging.INFO`` instead of
    crashing the import.
    """
    level = getattr(logging, str(name).strip().upper(), None)
    # Guard against attributes that exist but are not levels (functions,
    # strings such as BASIC_FORMAT, ...).
    if isinstance(level, int) and not isinstance(level, bool):
        return level
    return logging.INFO


# Setup logging
logging.basicConfig(
    level=_resolve_log_level(os.environ.get('LOG_LEVEL', 'INFO')),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('obs_proxy')

# Global state
ffmpeg_process = None          # Popen handle of the attempt currently running
ffmpeg_worker_thread = None    # thread executing _ffmpeg_worker_loop
ffmpeg_stop_event = threading.Event()  # set to ask the worker to shut down


def _worker_running() -> bool:
    """Return True if the ffmpeg worker thread is currently active."""
    return ffmpeg_worker_thread is not None and ffmpeg_worker_thread.is_alive()


# Validate configuration
if not INGEST_PSK:
    logger.error("OBS_STREAM_KEY/STREAM_PSK is not set")
    # SystemExit instead of the site-provided exit() helper, which is not
    # guaranteed to exist (e.g. ``python -S`` or frozen builds).
    raise SystemExit(1)

# Create required directories
BASE_DIR.mkdir(parents=True, exist_ok=True)
STREAM_PATH.mkdir(parents=True, exist_ok=True)


def reset_stream_path():
    """Ensure the live stream directory is empty and ready."""
    shutil.rmtree(STREAM_PATH, ignore_errors=True)
    STREAM_PATH.mkdir(parents=True, exist_ok=True)


def _safe_reset_stream_path(context: str) -> bool:
    """Reset the stream directory and log any failure.

    Args:
        context: Short phrase appended to the error message to say when the
            reset was attempted.

    Returns:
        True if the directory was reset, False if an exception was logged.
    """
    try:
        reset_stream_path()
        return True
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Error resetting stream directory %s: %s", context, exc)
        return False


def _verify_stream_path_writable() -> None:
    """Ensure the stream directory is writable before launching FFmpeg.

    This is a best-effort probe: a failure is logged but never raised.
    """
    test_file = STREAM_PATH / "write_test.txt"
    try:
        with open(test_file, "w", encoding="utf-8") as probe:
            probe.write("ok")
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Could not write to stream directory: %s", exc)
    finally:
        # Removing the probe must not raise out of a best-effort check.
        try:
            test_file.unlink(missing_ok=True)
        except OSError as exc:  # pragma: no cover - best effort logging
            logger.debug("Could not remove write probe %s: %s", test_file, exc)


def _wait_for_playlist(process: subprocess.Popen[str], attempt: int) -> tuple[bool, str]:
    """Block until the playlist appears or we hit the configured timeout.

    Args:
        process: The ffmpeg process expected to produce the playlist.
        attempt: Attempt number, used only for log messages.

    Returns:
        ``(ready, reason)`` where ``reason`` is one of ``'ready'``,
        ``'stop_requested'``, ``'ffmpeg_exited'`` or ``'timeout'``.
    """
    playlist_path = STREAM_PATH / 'stream.m3u8'
    start = time.monotonic()

    while True:
        if playlist_path.exists():
            elapsed = time.monotonic() - start
            segments = sorted(seg.name for seg in playlist_path.parent.glob('segment-*.ts'))
            logger.info(
                "HLS playlist materialized after %.2fs at %s; segments=%s",
                elapsed,
                playlist_path,
                segments,
            )
            return True, 'ready'

        if ffmpeg_stop_event.is_set():
            return False, 'stop_requested'

        if process.poll() is not None:
            return False, 'ffmpeg_exited'

        elapsed = time.monotonic() - start
        if elapsed >= HLS_PLAYLIST_TIMEOUT:
            logger.warning(
                "HLS playlist still missing after %.2fs on attempt %s; recycling FFmpeg",
                elapsed,
                attempt,
            )
            return False, 'timeout'

        # Sleep on the event so a stop request wakes us immediately.
        if ffmpeg_stop_event.wait(HLS_PLAYLIST_POLL_INTERVAL):
            return False, 'stop_requested'


def _start_pipe_logger(pipe, level):
    """Drain an ffmpeg pipe on a background thread to avoid deadlocks.

    Args:
        pipe: Text-mode pipe (the process's stdout or stderr).
        level: ``logging`` level used for each non-empty line.
    """
    def pipe_logger():
        with pipe:
            for raw_line in pipe:
                line = raw_line.strip()
                if line:
                    logger.log(level, "ffmpeg: %s", line)

    threading.Thread(target=pipe_logger, daemon=True).start()


def _terminate_ffmpeg_process(process: subprocess.Popen[str], *, timeout: float = 5.0, log_errors: bool = True) -> None:
    """Terminate an ffmpeg process, falling back to kill if needed.

    Args:
        process: The process to stop.
        timeout: Seconds to wait after ``terminate()`` (and again after
            ``kill()``) for the process to exit.
        log_errors: Whether to log the exception that triggered the fallback.
    """
    try:
        process.terminate()
        process.wait(timeout=timeout)
    except Exception as exc:  # pragma: no cover - best effort logging
        if log_errors:
            logger.error("Error stopping FFmpeg: %s", exc)
        try:
            process.kill()
            # Reap the killed process so it does not linger as a zombie.
            process.wait(timeout=timeout)
        except Exception:  # pragma: no cover
            pass


def _build_ffmpeg_command() -> list[str]:
    """Construct the ffmpeg command line we execute for each attempt.

    The command reads the RTMP ingest URL built from the ingest settings,
    copies video, re-encodes audio to AAC with the configured audio
    settings, and writes HLS output into ``STREAM_PATH``.
    """
    return [
        'ffmpeg',
        '-nostdin',
        '-hide_banner',
        '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'warning'),
        '-fflags', '+genpts',
        '-thread_queue_size', str(INGEST_THREAD_QUEUE_SIZE),
        '-i', f'rtmp://{INGEST_RTMP_HOST}:{INGEST_RTMP_PORT}/live/{INGEST_PSK}',
        '-map', '0:v:0?',
        '-map', '0:a:0?',
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-b:a', AUDIO_BITRATE,
        '-ac', str(AUDIO_CHANNELS),
        '-ar', str(AUDIO_SAMPLE_RATE),
        '-f', 'hls',
        '-hls_time', str(HLS_SEGMENT_TIME),
        '-hls_list_size', str(HLS_PLAYLIST_SIZE),
        '-hls_flags', 'delete_segments+independent_segments',
        '-hls_delete_threshold', str(HLS_DELETE_THRESHOLD),
        '-hls_segment_filename', str(STREAM_PATH / 'segment-%05d.ts'),
        str(STREAM_PATH / 'stream.m3u8'),
    ]


def _run_ffmpeg_once(attempt: int) -> bool:
    """Start ffmpeg, wait for it to exit, and report whether it ran cleanly.

    Args:
        attempt: Attempt number, used only for log messages.

    Returns:
        True when the worker loop should stop (a stop was requested, or
        ffmpeg exited with code 0 after producing a playlist); False when
        the attempt failed and should be retried.
    """
    global ffmpeg_process

    logger.info("Starting FFmpeg for live stream (attempt %s)", attempt)

    try:
        process = subprocess.Popen(
            _build_ffmpeg_command(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Failed to start FFmpeg: %s", exc)
        return False

    ffmpeg_process = process
    start_time = time.monotonic()
    logger.info("FFmpeg process started with PID %s", process.pid)
    logger.info('Stream active; waiting for playlist to appear')

    _start_pipe_logger(process.stderr, logging.WARNING)
    _start_pipe_logger(process.stdout, logging.DEBUG)

    playlist_ready, playlist_reason = _wait_for_playlist(process, attempt)

    if not playlist_ready:
        if ffmpeg_stop_event.is_set():
            # A stop arrived while we were waiting. cleanup_stream() may have
            # run before this process was published in ffmpeg_process, so
            # terminate it here rather than blocking forever in wait() below.
            if process.poll() is None:
                _terminate_ffmpeg_process(process, log_errors=False)
        elif playlist_reason == 'timeout' and process.poll() is None:
            logger.info("Terminating FFmpeg attempt %s after playlist timeout", attempt)
            _terminate_ffmpeg_process(process, log_errors=False)
        elif playlist_reason == 'ffmpeg_exited':
            logger.debug("FFmpeg exited before playlist became available on attempt %s", attempt)

    exit_code = process.wait()
    elapsed = time.monotonic() - start_time
    # Only clear the global if it still refers to this attempt's process.
    if ffmpeg_process is process:
        ffmpeg_process = None

    if ffmpeg_stop_event.is_set():
        logger.info(
            "FFmpeg stop requested; process exited with code %s after %.2fs",
            exit_code,
            elapsed,
        )
        return True

    if not playlist_ready:
        logger.error(
            "FFmpeg attempt %s ended (exit %s) before playlist became available (reason=%s)",
            attempt,
            exit_code,
            playlist_reason,
        )
        return False

    if exit_code != 0:
        logger.error("FFmpeg exited with code %s after %.2fs", exit_code, elapsed)
        return False

    logger.info("FFmpeg process completed successfully after %.2fs", elapsed)
    return True


def _ffmpeg_worker_loop() -> None:
    """Keep spawning FFmpeg until it runs cleanly or a stop is requested."""
    attempt = 0
    while not ffmpeg_stop_event.is_set():
        attempt += 1
        if _run_ffmpeg_once(attempt):
            break
        if ffmpeg_stop_event.is_set():
            break
        _safe_reset_stream_path("between FFmpeg attempts")
        # Back off briefly before retrying; waiting on the event means a stop
        # request still ends the loop immediately.
        if ffmpeg_stop_event.wait(FFMPEG_RETRY_DELAY):
            break
    logger.debug("FFmpeg worker exiting")


def start_ffmpeg_process():
    """Start FFmpeg to convert RTMP ingest into HLS.

    Returns:
        True if a worker thread is running (already, or newly started);
        False if the worker thread could not be started.
    """
    global ffmpeg_worker_thread

    if _worker_running():
        logger.warning("FFmpeg worker already running; skipping duplicate start")
        return True

    if _safe_reset_stream_path("before FFmpeg start"):
        logger.info("Stream directory ready at %s", STREAM_PATH)

    _verify_stream_path_writable()

    ffmpeg_stop_event.clear()

    try:
        ffmpeg_worker_thread = threading.Thread(
            target=_ffmpeg_worker_loop,
            daemon=True,
            name="ffmpeg-worker",
        )
        ffmpeg_worker_thread.start()
        return True
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.error("Failed to start FFmpeg worker thread: %s", exc)
        ffmpeg_worker_thread = None
        return False


def cleanup_stream():
    """Stop FFmpeg and purge any cached HLS segments."""
    global ffmpeg_process, ffmpeg_worker_thread

    ffmpeg_stop_event.set()

    # Snapshot the global: the worker thread may reset it to None concurrently.
    process = ffmpeg_process
    if process is not None:
        _terminate_ffmpeg_process(process)
    ffmpeg_process = None

    worker = ffmpeg_worker_thread
    worker_stopped = True
    if worker and worker.is_alive():
        worker.join(timeout=5)
        if worker.is_alive():  # pragma: no cover - diagnostic
            logger.warning("FFmpeg worker thread did not exit cleanly")
            worker_stopped = False
    ffmpeg_worker_thread = None

    _safe_reset_stream_path("during cleanup")
    # Leave the stop request in place if the worker is still alive, so it
    # exits when it next checks the event instead of spawning FFmpeg again.
    if worker_stopped:
        ffmpeg_stop_event.clear()


def _stream_key_matches(candidate) -> bool:
    """Return True if ``candidate`` equals the configured ingest key.

    Uses a constant-time comparison so response timing does not reveal how
    much of a guessed key was correct.
    """
    if not candidate or not INGEST_PSK:
        return False
    return secrets.compare_digest(
        str(candidate).encode('utf-8'),
        INGEST_PSK.encode('utf-8'),
    )


# Routes
@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
def on_publish():
    """Callback when a stream starts.

    Reads the stream key from the ``name`` form field. Responds 403 when
    the key does not match, 500 when the FFmpeg worker cannot be started,
    and ``"OK"`` otherwise. An already-active session is torn down first.
    """
    stream_key = request.form.get('name')
    # NOTE(review): this logs the submitted stream key in plaintext, which
    # for a valid request is the pre-shared secret - consider redacting.
    logger.info("on_publish received for key=%s", stream_key)

    if not _stream_key_matches(stream_key):
        logger.warning("Unauthorized stream key attempted to publish: %s", stream_key)
        return "Unauthorized", 403

    if ffmpeg_process or _worker_running():
        logger.info("Stream already active, recycling existing session")
        cleanup_stream()

    if not start_ffmpeg_process():
        return "Failed to start stream", 500

    playlist_path = STREAM_PATH / 'stream.m3u8'
    if playlist_path.exists():
        logger.info("Existing playlist found at %s", playlist_path)

    return "OK"


@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
def on_publish_done():
    """Callback when a stream ends.

    Responds 400 when the ``name`` form field does not match the ingest
    key; otherwise stops any active session and responds ``"OK"``.
    """
    stream_key = request.form.get('name')
    # NOTE(review): logs the submitted stream key in plaintext - consider
    # redacting.
    logger.info("on_publish_done received for key=%s", stream_key)

    if not _stream_key_matches(stream_key):
        logger.warning("on_publish_done received for unknown key: %s", stream_key)
        return "Bad request", 400

    if ffmpeg_process or _worker_running():
        cleanup_stream()

    logger.info("Stream publishing ended")
    return "OK"


@app.route('/health')
def health_check():
    """Health check endpoint; reports whether an FFmpeg process is tracked."""
    return {
        "status": "healthy",
        "streaming": ffmpeg_process is not None
    }


def print_instructions():
    """Print usage instructions"""
    obs_url = f"rtmps://{SERVER_DOMAIN}:1935/live"
    hls_url = f"https://{SERVER_DOMAIN}{HLS_ROUTE_PREFIX}/stream.m3u8"

    print("\n" + "="*80)
    print(f"{'OBS TO VRCHAT STREAMING PROXY':^80}")
    print("="*80)
    print("\n[URLS]")
    print(f"  OBS ingest: {obs_url}")
    print(f"  HLS: {hls_url}")
    print("\n[STATUS]")
    print(f"  Stream is {'ACTIVE' if ffmpeg_process else 'INACTIVE'}")
    print(f"  Session ID: {STREAM_HEX}")
    print(f"  On-disk path: {STREAM_PATH}")
    print("="*80 + "\n")


# Register cleanup once the module is imported so any WSGI server benefits.
atexit.register(cleanup_stream)


def main():
    """Entry point for running with a production WSGI server.

    Raises:
        RuntimeError: If the ``waitress`` package is not installed.
    """
    try:
        from waitress import serve
    except ImportError as exc:  # pragma: no cover - defensive guardrail
        raise RuntimeError(
            "Waitress is required to run this service. Install it with 'pip install waitress'."
        ) from exc

    print_instructions()
    host = os.environ.get('HOST', '0.0.0.0')
    port = int(os.environ.get('PORT', 5000))
    logger.info("Starting Waitress on %s:%s", host, port)
    serve(app, host=host, port=port)


if __name__ == '__main__':
    main()