#!/usr/bin/env python3 from dotenv import load_dotenv import os import secrets import subprocess import threading import shutil from pathlib import Path from flask import Flask, request, send_from_directory import logging import atexit # Setup Flask and load environment variables app = Flask(__name__) load_dotenv() # Configuration INGEST_PSK = os.environ.get('OBS_STREAM_KEY') DASH_SEGMENT_TIME = float(os.environ.get('DASH_SEGMENT_TIME', '1')) DASH_FRAGMENT_TIME = float(os.environ.get('DASH_FRAGMENT_TIME', '0.5')) BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams')) SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev') STREAM_HEX = secrets.token_hex(16) STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX DASH_ROUTE_PREFIX = f"/dash/{STREAM_HEX}" # Setup logging logging.basicConfig( level=getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('obs_proxy') # Global state stream_active = False ffmpeg_process = None # Validate configuration if not INGEST_PSK: logger.error("STREAM_PSK is not set") exit(1) # Create required directories BASE_DIR.mkdir(parents=True, exist_ok=True) STREAM_PATH.mkdir(parents=True, exist_ok=True) def reset_stream_path(): """Ensure the live stream directory is empty and ready.""" shutil.rmtree(STREAM_PATH, ignore_errors=True) STREAM_PATH.mkdir(parents=True, exist_ok=True) def start_ffmpeg_process(): """Start FFmpeg process to convert RTMP ingest into low-latency DASH.""" global ffmpeg_process, stream_active reset_stream_path() logger.info(f"Stream directory ready at {STREAM_PATH}") # Sanity check that we can write to the path before spawning ffmpeg try: test_file = STREAM_PATH / "write_test.txt" with open(test_file, "w", encoding="utf-8") as probe: probe.write("ok") if test_file.exists(): test_file.unlink() except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Could not write to stream directory: {exc}") command = [ 'ffmpeg', '-nostdin', '-loglevel', 'warning', '-i', f'rtmp://localhost/live/{INGEST_PSK}', '-map', '0:v:0?', '-map', '0:a:0?', '-c:v', 'copy', '-c:a', 'copy', '-f', 'dash', '-seg_duration', str(DASH_SEGMENT_TIME), '-frag_duration', str(DASH_FRAGMENT_TIME), '-window_size', '6', '-extra_window_size', '10', '-remove_at_exit', '1', '-streaming', '1', '-ldash', '1', '-use_template', '1', '-use_timeline', '1', '-init_seg_name', 'init-stream$RepresentationID$.m4s', '-media_seg_name', 'chunk-stream$RepresentationID$-$Number%05d$.m4s', str(STREAM_PATH / 'manifest.mpd') ] logger.info("Starting FFmpeg for live stream") try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) ffmpeg_process = process stream_active = True def monitor_process(): """Collect ffmpeg result and reset state when it exits.""" global ffmpeg_process, stream_active logger.info(f"FFmpeg process started with PID {process.pid}") exit_code = process.wait() stderr = process.stderr.read() if stderr: logger.error(f"FFmpeg error output: {stderr}") if exit_code != 0: logger.error(f"FFmpeg exited with code {exit_code}") else: logger.info("FFmpeg process completed successfully") ffmpeg_process = None stream_active = False threading.Thread(target=monitor_process, daemon=True).start() return True except Exception as exc: logger.error(f"Failed to start FFmpeg: {exc}") ffmpeg_process = None stream_active = False return False def cleanup_stream(): """Stop FFmpeg and purge any cached DASH fragments.""" global ffmpeg_process, stream_active if ffmpeg_process: try: ffmpeg_process.terminate() ffmpeg_process.wait(timeout=5) except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Error stopping FFmpeg: {exc}") try: ffmpeg_process.kill() except Exception: # pragma: no cover pass finally: ffmpeg_process = None stream_active = False try: reset_stream_path() except Exception as exc: # pragma: no cover - best effort logging logger.error(f"Error resetting stream directory: {exc}") # Routes @app.route(f"{DASH_ROUTE_PREFIX}/manifest.mpd") def serve_dash_manifest(): """Serve the MPEG-DASH manifest""" if not stream_active: return "Stream not found", 404 manifest_file = STREAM_PATH / "manifest.mpd" if not manifest_file.exists(): return "Manifest not ready", 503 return send_from_directory(str(STREAM_PATH), "manifest.mpd") @app.route(f"{DASH_ROUTE_PREFIX}/") def serve_dash_segments(filename): """Serve the MPEG-DASH segment files""" if not stream_active: return "Stream not found", 404 return send_from_directory(str(STREAM_PATH), filename) @app.route('/rtmp_callbacks/on_publish', methods=['POST']) def on_publish(): """Callback when a stream starts""" global stream_active stream_key = request.form.get('name') if not stream_key or stream_key != INGEST_PSK: logger.warning("Unauthorized stream key attempted to publish") return "Unauthorized", 403 if stream_active: logger.info("Stream already active, recycling existing session") cleanup_stream() if not start_ffmpeg_process(): return "Failed to start stream", 500 logger.info( "Stream active; manifest available at https://%s%s/manifest.mpd", SERVER_DOMAIN, DASH_ROUTE_PREFIX ) return "OK" @app.route('/rtmp_callbacks/on_publish_done', methods=['POST']) def on_publish_done(): """Callback when a stream ends""" global stream_active stream_key = request.form.get('name') if not stream_key or stream_key != INGEST_PSK: return "Bad request", 400 if stream_active: cleanup_stream() logger.info("Stream publishing ended") return "OK" @app.route('/health') def health_check(): """Health check endpoint""" return { "status": "healthy", "streaming": ffmpeg_process is not None } def print_instructions(): """Print usage instructions""" obs_url = f"rtmp://{SERVER_DOMAIN}/live" dash_url = f"https://{SERVER_DOMAIN}{DASH_ROUTE_PREFIX}/manifest.mpd" print("\n" + "="*80) print(f"{'OBS TO VRCHAT STREAMING PROXY':^80}") print("="*80) print("\n[URLS]") print(f" OBS ingest: {obs_url}") print(f" DASH: {dash_url}") print("\n[STATUS]") print(f" Stream is {'ACTIVE' if ffmpeg_process else 'INACTIVE'}") print(f" Session ID: {STREAM_HEX}") print(f" On-disk path: {STREAM_PATH}") print("="*80 + "\n") # Register cleanup and start server if __name__ == '__main__': atexit.register(cleanup_stream) print_instructions() app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)))