#!/usr/bin/env python3 from dotenv import load_dotenv import os import secrets import subprocess import threading import shutil import time from pathlib import Path from flask import Flask, request import logging import atexit # Setup Flask and load environment variables app = Flask(__name__) application = app # WSGI servers like gunicorn look for 'application' load_dotenv() # Configuration INGEST_PSK = os.environ.get('OBS_STREAM_KEY') or os.environ.get('STREAM_PSK') INGEST_RTMP_HOST = os.environ.get('INGEST_RTMP_HOST', '127.0.0.1') try: INGEST_RTMP_PORT = int(os.environ.get('INGEST_RTMP_PORT', '1936')) except ValueError as exc: raise ValueError('INGEST_RTMP_PORT must be an integer') from exc INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096')) HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2')) HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20')) HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20')) HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '45')) HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.5')) BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams')) SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev') STREAM_HEX = secrets.token_hex(16) STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}" # Media output settings tuned for VRChat playback AUDIO_BITRATE = '256k' AUDIO_CHANNELS = 2 AUDIO_SAMPLE_RATE = 48000 FFMPEG_RETRY_DELAY = float(os.environ.get('FFMPEG_RETRY_DELAY', '1.0')) FFMPEG_RETRY_MAX_DELAY = float(os.environ.get('FFMPEG_RETRY_MAX_DELAY', '5.0')) FFMPEG_RETRY_BACKOFF = float(os.environ.get('FFMPEG_RETRY_BACKOFF', '1.5')) # Setup logging logging.basicConfig( level=getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('obs_proxy') # Global state stream_active = False ffmpeg_process = None ffmpeg_worker_thread = None ffmpeg_stop_event = threading.Event() # Validate configuration if not INGEST_PSK: logger.error("OBS_STREAM_KEY/STREAM_PSK is not set") exit(1) # Create required directories BASE_DIR.mkdir(parents=True, exist_ok=True) STREAM_PATH.mkdir(parents=True, exist_ok=True) def reset_stream_path(): """Ensure the live stream directory is empty and ready.""" shutil.rmtree(STREAM_PATH, ignore_errors=True) STREAM_PATH.mkdir(parents=True, exist_ok=True) def _wait_for_playlist(process: subprocess.Popen[str], attempt: int): """Block until the playlist appears or we hit the configured timeout.""" playlist_path = STREAM_PATH / 'stream.m3u8' start = time.monotonic() while True: if playlist_path.exists(): elapsed = time.monotonic() - start segments = sorted(seg.name for seg in playlist_path.parent.glob('segment-*.ts')) logger.info( "HLS playlist materialized after %.2fs at %s; segments=%s", elapsed, playlist_path, segments, ) return True, 'ready' if ffmpeg_stop_event.is_set(): return False, 'stop_requested' if process.poll() is not None: return False, 'ffmpeg_exited' elapsed = time.monotonic() - start if elapsed >= HLS_PLAYLIST_TIMEOUT: logger.warning( "HLS playlist still missing after %.2fs on attempt %s; recycling FFmpeg", elapsed, attempt, ) return False, 'timeout' if ffmpeg_stop_event.wait(HLS_PLAYLIST_POLL_INTERVAL): return False, 'stop_requested' def _start_pipe_logger(pipe, level): """Drain an ffmpeg pipe on a background thread to avoid deadlocks.""" def pipe_logger(): with pipe: for line in iter(pipe.readline, ''): line = line.strip() if line: logger.log(level, "ffmpeg: %s", line) threading.Thread(target=pipe_logger, daemon=True).start() def _terminate_ffmpeg_process(process: subprocess.Popen[str], *, timeout: float = 5.0, log_errors: bool = True) -> None: """Terminate an ffmpeg process, falling back to kill if needed.""" try: process.terminate() process.wait(timeout=timeout) except Exception as exc: # pragma: no cover - best effort logging if log_errors: logger.error(f"Error stopping FFmpeg: {exc}") try: process.kill() except Exception: # pragma: no cover pass def _build_ffmpeg_command() -> list[str]: """Construct the ffmpeg command line we execute for each attempt.""" return [ 'ffmpeg', '-nostdin', '-hide_banner', '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'warning'), '-fflags', '+genpts', '-thread_queue_size', str(INGEST_THREAD_QUEUE_SIZE), '-i', f'rtmp://{INGEST_RTMP_HOST}:{INGEST_RTMP_PORT}/live/{INGEST_PSK}', '-map', '0:v:0?', '-map', '0:a:0?', '-c:v', 'copy', '-c:a', 'aac', '-b:a', AUDIO_BITRATE, '-ac', str(AUDIO_CHANNELS), '-ar', str(AUDIO_SAMPLE_RATE), '-f', 'hls', '-hls_time', str(HLS_SEGMENT_TIME), '-hls_list_size', str(HLS_PLAYLIST_SIZE), '-hls_flags', 'delete_segments+independent_segments', '-hls_delete_threshold', str(HLS_DELETE_THRESHOLD), '-hls_segment_filename', str(STREAM_PATH / 'segment-%05d.ts'), str(STREAM_PATH / 'stream.m3u8'), ] def _run_ffmpeg_once(attempt: int) -> bool: """Start ffmpeg, wait for it to exit, and report whether it ran cleanly.""" global ffmpeg_process, stream_active logger.info("Starting FFmpeg for live stream (attempt %s)", attempt) try: process = subprocess.Popen( _build_ffmpeg_command(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) except Exception as exc: # pragma: no cover - best effort logging logger.error("Failed to start FFmpeg: %s", exc) return False ffmpeg_process = process stream_active = True start_time = time.monotonic() logger.info("FFmpeg process started with PID %s", process.pid) logger.info('Stream active; waiting for playlist to appear') _start_pipe_logger(process.stderr, logging.WARNING) _start_pipe_logger(process.stdout, logging.DEBUG) playlist_ready, playlist_reason = _wait_for_playlist(process, attempt) if not playlist_ready and not ffmpeg_stop_event.is_set(): if playlist_reason == 'timeout' and process.poll() is None: logger.info("Terminating FFmpeg attempt %s after playlist timeout", attempt) _terminate_ffmpeg_process(process, log_errors=False) elif playlist_reason == 'ffmpeg_exited': logger.debug("FFmpeg exited before playlist became available on attempt %s", attempt) exit_code = process.wait() elapsed = time.monotonic() - start_time ffmpeg_process = None stream_active = False if ffmpeg_stop_event.is_set(): logger.info( "FFmpeg stop requested; process exited with code %s after %.2fs", exit_code, elapsed, ) return True if not playlist_ready: logger.error( "FFmpeg attempt %s ended (exit %s) before playlist became available (reason=%s)", attempt, exit_code, playlist_reason, ) return False if exit_code != 0: logger.error("FFmpeg exited with code %s after %.2fs", exit_code, elapsed) return False logger.info("FFmpeg process completed successfully after %.2fs", elapsed) return True def start_ffmpeg_process(): """Start FFmpeg to convert RTMP ingest into HLS.""" global ffmpeg_worker_thread if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive(): logger.warning("FFmpeg worker already running; skipping duplicate start") return True reset_stream_path() logger.info(f"Stream directory ready at {STREAM_PATH}") # Sanity check that we can write to the path before spawning ffmpeg try: test_file = STREAM_PATH / "write_test.txt" with open(test_file, "w", encoding="utf-8") as probe: probe.write("ok") if test_file.exists(): test_file.unlink() except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Could not write to stream directory: {exc}") ffmpeg_stop_event.clear() def worker(): attempt = 0 delay = max(FFMPEG_RETRY_DELAY, 0.1) while not ffmpeg_stop_event.is_set(): attempt += 1 ran_cleanly = _run_ffmpeg_once(attempt) if ran_cleanly: break if ffmpeg_stop_event.is_set(): break wait_time = min(delay, FFMPEG_RETRY_MAX_DELAY) logger.info( "FFmpeg will retry in %.2fs (next attempt %s)", wait_time, attempt + 1, ) if ffmpeg_stop_event.wait(wait_time): break try: reset_stream_path() except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Error resetting stream directory between attempts: {exc}") delay = min(delay * FFMPEG_RETRY_BACKOFF, FFMPEG_RETRY_MAX_DELAY) logger.debug("FFmpeg worker exiting") try: ffmpeg_worker_thread = threading.Thread(target=worker, daemon=True, name="ffmpeg-worker") ffmpeg_worker_thread.start() return True except Exception as exc: # pragma: no cover - defensive logging logger.error(f"Failed to start FFmpeg worker thread: {exc}") ffmpeg_worker_thread = None return False def cleanup_stream(): """Stop FFmpeg and purge any cached HLS segments.""" global ffmpeg_process, stream_active, ffmpeg_worker_thread ffmpeg_stop_event.set() if ffmpeg_process: _terminate_ffmpeg_process(ffmpeg_process) ffmpeg_process = None if ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive(): ffmpeg_worker_thread.join(timeout=5) if ffmpeg_worker_thread.is_alive(): # pragma: no cover - diagnostic logger.warning("FFmpeg worker thread did not exit cleanly") ffmpeg_worker_thread = None stream_active = False try: reset_stream_path() except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Error resetting stream directory: {exc}") ffmpeg_stop_event.clear() # Routes @app.route('/rtmp_callbacks/on_publish', methods=['POST']) def on_publish(): """Callback when a stream starts""" global stream_active, ffmpeg_worker_thread stream_key = request.form.get('name') logger.info("on_publish received for key=%s", stream_key) old_playlist = STREAM_PATH / 'stream.m3u8' if not stream_key or stream_key != INGEST_PSK: logger.warning("Unauthorized stream key attempted to publish: %s", stream_key) return "Unauthorized", 403 if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()): logger.info("Stream already active, recycling existing session") cleanup_stream() if not start_ffmpeg_process(): return "Failed to start stream", 500 playlist_path = STREAM_PATH / 'stream.m3u8' if playlist_path.exists(): logger.info("Existing playlist found at %s", playlist_path) return "OK" @app.route('/rtmp_callbacks/on_publish_done', methods=['POST']) def on_publish_done(): """Callback when a stream ends""" global stream_active, ffmpeg_worker_thread stream_key = request.form.get('name') logger.info("on_publish_done received for key=%s", stream_key) if not stream_key or stream_key != INGEST_PSK: logger.warning("on_publish_done received for unknown key: %s", stream_key) return "Bad request", 400 if stream_active or (ffmpeg_worker_thread and ffmpeg_worker_thread.is_alive()): cleanup_stream() logger.info("Stream publishing ended") return "OK" @app.route('/health') def health_check(): """Health check endpoint""" return { "status": "healthy", "streaming": ffmpeg_process is not None } def print_instructions(): """Print usage instructions""" obs_url = f"rtmps://{SERVER_DOMAIN}:1935/live" hls_url = f"https://{SERVER_DOMAIN}{HLS_ROUTE_PREFIX}/stream.m3u8" print("\n" + "="*80) print(f"{'OBS TO VRCHAT STREAMING PROXY':^80}") print("="*80) print("\n[URLS]") print(f" OBS ingest: {obs_url}") print(f" HLS: {hls_url}") print("\n[STATUS]") print(f" Stream is {'ACTIVE' if ffmpeg_process else 'INACTIVE'}") print(f" Session ID: {STREAM_HEX}") print(f" On-disk path: {STREAM_PATH}") print("="*80 + "\n") # Register cleanup once the module is imported so any WSGI server benefits. atexit.register(cleanup_stream) def main(): """Entry point for running with a production WSGI server.""" try: from waitress import serve except ImportError as exc: # pragma: no cover - defensive guardrail raise RuntimeError( "Waitress is required to run this service. Install it with 'pip install waitress'." ) from exc print_instructions() host = os.environ.get('HOST', '0.0.0.0') port = int(os.environ.get('PORT', 5000)) logger.info("Starting Waitress on %s:%s", host, port) serve(app, host=host, port=port) if __name__ == '__main__': main()