#!/usr/bin/env python3
"""Flask service that turns an RTMP ingest into an HLS stream via FFmpeg.

The service exposes two POST callbacks (publish start / publish done) that
validate a pre-shared stream key, then start or stop an FFmpeg child process
which reads the RTMP ingest and writes HLS segments into a per-run directory.
"""
import atexit
import logging
import os
import secrets
import shutil
import subprocess
import threading
from pathlib import Path

from dotenv import load_dotenv
from flask import Flask, request

# Setup Flask and load environment variables
app = Flask(__name__)
application = app  # WSGI servers like gunicorn look for 'application'
load_dotenv()

# Configuration
INGEST_PSK = os.environ.get('OBS_STREAM_KEY') or os.environ.get('STREAM_PSK')
INGEST_RTMP_HOST = os.environ.get('INGEST_RTMP_HOST', '127.0.0.1')
try:
    INGEST_RTMP_PORT = int(os.environ.get('INGEST_RTMP_PORT', '1936'))
except ValueError as exc:
    raise ValueError('INGEST_RTMP_PORT must be an integer') from exc
INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096'))
HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2'))
HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '20'))
HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20'))
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev')
# A fresh random identifier is generated on every import, so the HLS path
# changes each time the service restarts.
STREAM_HEX = secrets.token_hex(16)
STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX
HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}"

# Media output settings tuned for VRChat playback
AUDIO_BITRATE = '256k'
AUDIO_CHANNELS = 2
AUDIO_SAMPLE_RATE = 48000

# Setup logging
# Accept lowercase names ("debug") and fall back to INFO on unknown values
# instead of crashing at import time with AttributeError/TypeError.
_log_level_name = os.environ.get('LOG_LEVEL', 'INFO').upper()
_log_level = getattr(logging, _log_level_name, None)
_log_level_valid = isinstance(_log_level, int)
logging.basicConfig(
    level=_log_level if _log_level_valid else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('obs_proxy')
if not _log_level_valid:
    logger.warning("Unknown LOG_LEVEL %r; falling back to INFO", _log_level_name)

# Global state
stream_active = False
ffmpeg_process = None

# Validate configuration
if not INGEST_PSK:
    logger.error("OBS_STREAM_KEY/STREAM_PSK is not set")
    # SystemExit is what the site-provided ``exit`` raises, but it is always
    # available (``exit`` is missing under ``python -S`` / embedded runtimes).
    raise SystemExit(1)

# Create required directories
BASE_DIR.mkdir(parents=True, exist_ok=True)
STREAM_PATH.mkdir(parents=True, exist_ok=True)


def _is_valid_stream_key(candidate):
    """Return True when *candidate* matches the configured pre-shared key.

    The comparison is done on UTF-8 bytes with ``secrets.compare_digest`` so
    it runs in constant time and also accepts non-ASCII input without raising.
    """
    if not candidate:
        return False
    return secrets.compare_digest(
        candidate.encode('utf-8'),
        INGEST_PSK.encode('utf-8'),
    )


def reset_stream_path():
    """Ensure the live stream directory is empty and ready."""
    shutil.rmtree(STREAM_PATH, ignore_errors=True)
    STREAM_PATH.mkdir(parents=True, exist_ok=True)


def start_ffmpeg_process():
    """Start FFmpeg to convert RTMP ingest into HLS.

    Empties the stream directory, spawns FFmpeg, and starts daemon threads
    that drain its stdout/stderr into the logger and watch for its exit.

    Returns:
        True when the process was spawned, False when spawning failed.
    """
    global ffmpeg_process, stream_active

    reset_stream_path()
    logger.info("Stream directory ready at %s", STREAM_PATH)

    # Sanity check that we can write to the path before spawning ffmpeg
    try:
        test_file = STREAM_PATH / "write_test.txt"
        with open(test_file, "w", encoding="utf-8") as probe:
            probe.write("ok")
        if test_file.exists():
            test_file.unlink()
    except Exception as exc:  # pragma: no cover - best effort logging
        # Deliberately non-fatal: the failure is logged and ffmpeg is still tried.
        logger.error("Could not write to stream directory: %s", exc)

    command = [
        'ffmpeg',
        '-nostdin',
        '-hide_banner',
        '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'warning'),
        '-fflags', '+genpts',
        '-thread_queue_size', str(INGEST_THREAD_QUEUE_SIZE),
        '-i', f'rtmp://{INGEST_RTMP_HOST}:{INGEST_RTMP_PORT}/live/{INGEST_PSK}',
        '-map', '0:v:0?',
        '-map', '0:a:0?',
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-b:a', AUDIO_BITRATE,
        '-ac', str(AUDIO_CHANNELS),
        '-ar', str(AUDIO_SAMPLE_RATE),
        '-f', 'hls',
        '-hls_time', str(HLS_SEGMENT_TIME),
        '-hls_list_size', str(HLS_PLAYLIST_SIZE),
        '-hls_flags', 'delete_segments+independent_segments',
        '-hls_delete_threshold', str(HLS_DELETE_THRESHOLD),
        '-hls_segment_filename', str(STREAM_PATH / 'segment-%05d.ts'),
        str(STREAM_PATH / 'stream.m3u8')
    ]

    logger.info("Starting FFmpeg for live stream")

    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Undecodable bytes must not raise inside the drain threads: a dead
            # drain thread would stop reading the pipe.
            errors='replace',
            bufsize=1
        )
        ffmpeg_process = process
        stream_active = True

        def pipe_logger(pipe, level):
            """Continuously drain an ffmpeg pipe and log each line."""
            with pipe:
                for line in iter(pipe.readline, ''):
                    line = line.strip()
                    if line:
                        logger.log(level, "ffmpeg: %s", line)

        threading.Thread(
            target=pipe_logger,
            args=(process.stderr, logging.WARNING),
            daemon=True
        ).start()
        threading.Thread(
            target=pipe_logger,
            args=(process.stdout, logging.DEBUG),
            daemon=True
        ).start()

        def monitor_process():
            """Collect ffmpeg result and reset state when it exits."""
            global ffmpeg_process, stream_active
            logger.info("FFmpeg process started with PID %s", process.pid)
            exit_code = process.wait()
            if exit_code != 0:
                logger.error("FFmpeg exited with code %s", exit_code)
            else:
                logger.info("FFmpeg process completed successfully")
            # Only clear the globals if they still refer to THIS process.
            # When a stream is recycled, a newer process may already be
            # registered and must not be wiped by the old monitor thread.
            if ffmpeg_process is process:
                ffmpeg_process = None
                stream_active = False

        threading.Thread(target=monitor_process, daemon=True).start()
        return True
    except Exception as exc:
        logger.error("Failed to start FFmpeg: %s", exc)
        ffmpeg_process = None
        stream_active = False
        return False


def cleanup_stream():
    """Stop FFmpeg and purge any cached HLS segments.

    Terminates the tracked FFmpeg process (escalating to kill when the
    graceful stop fails), clears the global stream state, and empties the
    stream directory. Errors are logged rather than raised.
    """
    global ffmpeg_process, stream_active

    # Work on a local snapshot: the monitor thread may set the global to None
    # at any moment once the process exits.
    process = ffmpeg_process
    if process is not None:
        try:
            process.terminate()
            process.wait(timeout=5)
        except Exception as exc:  # pragma: no cover - best effort logging
            logger.error("Error stopping FFmpeg: %s", exc)
            try:
                process.kill()
                # Reap the killed child so it does not linger as a zombie.
                process.wait(timeout=5)
            except Exception as kill_exc:  # pragma: no cover
                logger.error("Error force-killing FFmpeg: %s", kill_exc)
        finally:
            ffmpeg_process = None
    stream_active = False

    try:
        reset_stream_path()
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Error resetting stream directory: %s", exc)


# Routes
@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
def on_publish():
    """Callback when a stream starts.

    Reads the stream key from the ``name`` form field. Responds 403 when the
    key does not match, 500 when FFmpeg could not be started, "OK" otherwise.
    An already-active stream is torn down and restarted.
    """
    stream_key = request.form.get('name')

    if not _is_valid_stream_key(stream_key):
        logger.warning("Unauthorized stream key attempted to publish")
        return "Unauthorized", 403

    if stream_active:
        logger.info("Stream already active, recycling existing session")
        cleanup_stream()

    if not start_ffmpeg_process():
        return "Failed to start stream", 500

    logger.info(
        "Stream active; playlist available at https://%s%s/stream.m3u8",
        SERVER_DOMAIN,
        HLS_ROUTE_PREFIX
    )
    return "OK"


@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
def on_publish_done():
    """Callback when a stream ends.

    Responds 400 when the ``name`` form field does not match the configured
    key; otherwise stops any active stream and responds "OK".
    """
    stream_key = request.form.get('name')

    if not _is_valid_stream_key(stream_key):
        return "Bad request", 400

    if stream_active:
        cleanup_stream()

    logger.info("Stream publishing ended")
    return "OK"
@app.route('/health')
def health_check():
    """Health check endpoint.

    Returns a dict with a fixed "healthy" status and a flag telling whether
    an FFmpeg process handle is currently held.
    """
    is_streaming = ffmpeg_process is not None
    return {"status": "healthy", "streaming": is_streaming}


def print_instructions():
    """Print usage instructions (ingest/HLS URLs and current status) to stdout."""
    rule = "=" * 80
    ingest_url = f"rtmps://{SERVER_DOMAIN}:1935/live"
    playlist_url = f"https://{SERVER_DOMAIN}{HLS_ROUTE_PREFIX}/stream.m3u8"
    state = 'ACTIVE' if ffmpeg_process else 'INACTIVE'

    # One entry per print() call, so the emitted text is exactly the same as
    # issuing the calls one by one.
    output_lines = (
        "\n" + rule,
        f"{'OBS TO VRCHAT STREAMING PROXY':^80}",
        rule,
        "\n[URLS]",
        f" OBS ingest: {ingest_url}",
        f" HLS: {playlist_url}",
        "\n[STATUS]",
        f" Stream is {state}",
        f" Session ID: {STREAM_HEX}",
        f" On-disk path: {STREAM_PATH}",
        rule + "\n",
    )
    for text in output_lines:
        print(text)


# Register cleanup once the module is imported so any WSGI server benefits.
atexit.register(cleanup_stream)


def main():
    """Entry point for running with a production WSGI server.

    Raises:
        RuntimeError: if the ``waitress`` package cannot be imported.
    """
    try:
        from waitress import serve
    except ImportError as exc:  # pragma: no cover - defensive guardrail
        message = (
            "Waitress is required to run this service. "
            "Install it with 'pip install waitress'."
        )
        raise RuntimeError(message) from exc

    print_instructions()

    bind_host = os.environ.get('HOST', '0.0.0.0')
    bind_port = int(os.environ.get('PORT', 5000))

    logger.info("Starting Waitress on %s:%s", bind_host, bind_port)
    serve(app, host=bind_host, port=bind_port)


if __name__ == '__main__':
    main()