diff options
| author | yum <yum.food.vr@gmail.com> | 2025-10-13 18:38:58 -0700 |
|---|---|---|
| committer | yum <yum.food.vr@gmail.com> | 2025-10-28 17:19:35 -0700 |
| commit | 8aca05a7e644f3d4aff6bcf636514882dd2ae934 (patch) | |
| tree | f2b6ad908083affb13dca32f803180e60d67638f /opt/obsproxy | |
| parent | 906f53826285a713512f199b1c99fd68bc1dbc52 (diff) | |
meow
Diffstat (limited to 'opt/obsproxy')
| -rwxr-xr-x | opt/obsproxy/server.py | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py new file mode 100755 index 0000000..8191d30 --- /dev/null +++ b/opt/obsproxy/server.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +from dotenv import load_dotenv +import os +import secrets +import subprocess +import threading +import shutil +from pathlib import Path +from flask import Flask, request, send_from_directory +import logging +import atexit + +# Setup Flask and load environment variables +app = Flask(__name__) +load_dotenv() + +# Configuration +INGEST_PSK = os.environ.get('OBS_STREAM_KEY') +DASH_SEGMENT_TIME = float(os.environ.get('DASH_SEGMENT_TIME', '1')) +DASH_FRAGMENT_TIME = float(os.environ.get('DASH_FRAGMENT_TIME', '0.5')) +BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams')) +SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev') +STREAM_HEX = secrets.token_hex(16) +STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX + +# Setup logging +logging.basicConfig( + level=getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger('obs_proxy') + +# Global state +stream_active = False +ffmpeg_process = None + +# Validate configuration +if not INGEST_PSK: + logger.error("STREAM_PSK is not set") + exit(1) + +# Create required directories +BASE_DIR.mkdir(parents=True, exist_ok=True) +STREAM_PATH.mkdir(parents=True, exist_ok=True) + +def reset_stream_path(): + """Ensure the live stream directory is empty and ready.""" + shutil.rmtree(STREAM_PATH, ignore_errors=True) + STREAM_PATH.mkdir(parents=True, exist_ok=True) + + +def start_ffmpeg_process(): + """Start FFmpeg process to convert RTMP ingest into low-latency DASH.""" + global ffmpeg_process, stream_active + + reset_stream_path() + logger.info(f"Stream directory ready at {STREAM_PATH}") + + # Sanity check that we can write to the path before spawning ffmpeg + try: + test_file = STREAM_PATH / "write_test.txt" + with open(test_file, "w", encoding="utf-8") as probe: + probe.write("ok") + if test_file.exists(): + test_file.unlink() + except Exception as exc: # pragma: no cover - best effort logging + logger.error(f"Could not write to stream directory: {exc}") + + command = [ + 'ffmpeg', + '-nostdin', + '-loglevel', 'warning', + '-i', f'rtmp://localhost/live/{INGEST_PSK}', + '-map', '0:v:0?', + '-map', '0:a:0?', + '-c:v', 'copy', + '-c:a', 'aac', '-b:a', '192k', + '-f', 'dash', + '-seg_duration', str(DASH_SEGMENT_TIME), + '-frag_duration', str(DASH_FRAGMENT_TIME), + '-window_size', '6', + '-extra_window_size', '10', + '-remove_at_exit', '1', + '-streaming', '1', + '-ldash', '1', + '-use_template', '1', + '-use_timeline', '1', + '-init_seg_name', 'init-stream$RepresentationID$.m4s', + '-media_seg_name', 'chunk-stream$RepresentationID$-$Number%05d$.m4s', + str(STREAM_PATH / 'manifest.mpd') + ] + + logger.info("Starting FFmpeg for live stream") + + try: + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1 + ) + + ffmpeg_process = process + stream_active = True + + def monitor_process(): + """Collect ffmpeg result and reset state when it exits.""" + global ffmpeg_process, stream_active + + logger.info(f"FFmpeg process started with PID {process.pid}") + + exit_code = process.wait() + stderr = process.stderr.read() + if stderr: + logger.error(f"FFmpeg error output: {stderr}") + + if exit_code != 0: + logger.error(f"FFmpeg exited with code {exit_code}") + else: + logger.info("FFmpeg process completed successfully") + + ffmpeg_process = None + stream_active = False + + threading.Thread(target=monitor_process, daemon=True).start() + return True + + except Exception as exc: + logger.error(f"Failed to start FFmpeg: {exc}") + ffmpeg_process = None + stream_active = False + return False + + +def cleanup_stream(): + """Stop FFmpeg and purge any cached DASH fragments.""" + global ffmpeg_process, stream_active + + if ffmpeg_process: + try: + ffmpeg_process.terminate() + ffmpeg_process.wait(timeout=5) + except Exception as exc: # pragma: no cover - best effort logging + logger.error(f"Error stopping FFmpeg: {exc}") + try: + ffmpeg_process.kill() + except Exception: # pragma: no cover + pass + finally: + ffmpeg_process = None + + stream_active = False + try: + reset_stream_path() + except Exception as exc: # pragma: no cover - best effort logging + logger.error(f"Error resetting stream directory: {exc}") + + +# Routes +@app.route('/dash/manifest.mpd') +def serve_dash_manifest(): + """Serve the MPEG-DASH manifest""" + if not stream_active: + return "Stream not found", 404 + + return send_from_directory(str(STREAM_PATH), "manifest.mpd") + + +@app.route('/dash/<path:filename>') +def serve_dash_segments(filename): + """Serve the MPEG-DASH segment files""" + if not stream_active: + return "Stream not found", 404 + + return send_from_directory(str(STREAM_PATH), filename) + + +@app.route('/rtmp_callbacks/on_publish', methods=['POST']) +def on_publish(): + """Callback when a stream starts""" + global stream_active + + stream_key = request.form.get('name') + + if not stream_key or stream_key != INGEST_PSK: + logger.warning("Unauthorized stream key attempted to publish") + return "Unauthorized", 403 + + if stream_active: + logger.info("Stream already active, recycling existing session") + cleanup_stream() + + if not start_ffmpeg_process(): + return "Failed to start stream", 500 + + logger.info("Stream started successfully") + logger.info(f"Access stream at https://{SERVER_DOMAIN}/dash/manifest.mpd") + + return "OK" + + +@app.route('/rtmp_callbacks/on_publish_done', methods=['POST']) +def on_publish_done(): + """Callback when a stream ends""" + global stream_active + + stream_key = request.form.get('name') + + if not stream_key or stream_key != INGEST_PSK: + return "Bad request", 400 + + if stream_active: + cleanup_stream() + + logger.info("Stream publishing ended") + return "OK" + + +@app.route('/health') +def health_check(): + """Health check endpoint""" + return { + "status": "healthy", + "streaming": ffmpeg_process is not None + } + + +def print_instructions(): + """Print usage instructions""" + obs_url = f"rtmp://{SERVER_DOMAIN}/live" + vrc_url = f"rtmp://{SERVER_DOMAIN}/live/{STREAM_HEX}" + + print("\n" + "="*80) + print(f"{'OBS TO VRCHAT STREAMING PROXY':^80}") + print("="*80) + + print("\n[URLS]") + print(f" OBS: {obs_url}") + print(f" VRChat: {vrc_url}") + + print("\n[STATUS]") + print(f" Stream is {'ACTIVE' if ffmpeg_process else 'INACTIVE'}") + print(f" Session ID: {STREAM_HEX}") + print(f" On-disk path: {STREAM_PATH}") + print("="*80 + "\n") + + +# Register cleanup and start server +if __name__ == '__main__': + atexit.register(cleanup_stream) + print_instructions() + app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000))) |
