summaryrefslogtreecommitdiffstats
path: root/opt/obsproxy
diff options
context:
space:
mode:
Diffstat (limited to 'opt/obsproxy')
-rwxr-xr-xopt/obsproxy/server.py253
1 files changed, 253 insertions, 0 deletions
diff --git a/opt/obsproxy/server.py b/opt/obsproxy/server.py
new file mode 100755
index 0000000..8191d30
--- /dev/null
+++ b/opt/obsproxy/server.py
@@ -0,0 +1,253 @@
+#!/usr/bin/env python3
+from dotenv import load_dotenv
+import os
+import secrets
+import subprocess
+import threading
+import shutil
+from pathlib import Path
+from flask import Flask, request, send_from_directory
+import logging
+import atexit
+
+# Setup Flask and load environment variables
+app = Flask(__name__)
+load_dotenv()
+
+# Configuration
+INGEST_PSK = os.environ.get('OBS_STREAM_KEY')
+DASH_SEGMENT_TIME = float(os.environ.get('DASH_SEGMENT_TIME', '1'))
+DASH_FRAGMENT_TIME = float(os.environ.get('DASH_FRAGMENT_TIME', '0.5'))
+BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
+SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.dev')
+STREAM_HEX = secrets.token_hex(16)
+STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX
+
+# Setup logging
+logging.basicConfig(
+ level=getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')),
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger('obs_proxy')
+
+# Global state
+stream_active = False
+ffmpeg_process = None
+
+# Validate configuration
+if not INGEST_PSK:
+ logger.error("STREAM_PSK is not set")
+ exit(1)
+
+# Create required directories
+BASE_DIR.mkdir(parents=True, exist_ok=True)
+STREAM_PATH.mkdir(parents=True, exist_ok=True)
+
+def reset_stream_path():
+ """Ensure the live stream directory is empty and ready."""
+ shutil.rmtree(STREAM_PATH, ignore_errors=True)
+ STREAM_PATH.mkdir(parents=True, exist_ok=True)
+
+
+def start_ffmpeg_process():
+ """Start FFmpeg process to convert RTMP ingest into low-latency DASH."""
+ global ffmpeg_process, stream_active
+
+ reset_stream_path()
+ logger.info(f"Stream directory ready at {STREAM_PATH}")
+
+ # Sanity check that we can write to the path before spawning ffmpeg
+ try:
+ test_file = STREAM_PATH / "write_test.txt"
+ with open(test_file, "w", encoding="utf-8") as probe:
+ probe.write("ok")
+ if test_file.exists():
+ test_file.unlink()
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error(f"Could not write to stream directory: {exc}")
+
+ command = [
+ 'ffmpeg',
+ '-nostdin',
+ '-loglevel', 'warning',
+ '-i', f'rtmp://localhost/live/{INGEST_PSK}',
+ '-map', '0:v:0?',
+ '-map', '0:a:0?',
+ '-c:v', 'copy',
+ '-c:a', 'aac', '-b:a', '192k',
+ '-f', 'dash',
+ '-seg_duration', str(DASH_SEGMENT_TIME),
+ '-frag_duration', str(DASH_FRAGMENT_TIME),
+ '-window_size', '6',
+ '-extra_window_size', '10',
+ '-remove_at_exit', '1',
+ '-streaming', '1',
+ '-ldash', '1',
+ '-use_template', '1',
+ '-use_timeline', '1',
+ '-init_seg_name', 'init-stream$RepresentationID$.m4s',
+ '-media_seg_name', 'chunk-stream$RepresentationID$-$Number%05d$.m4s',
+ str(STREAM_PATH / 'manifest.mpd')
+ ]
+
+ logger.info("Starting FFmpeg for live stream")
+
+ try:
+ process = subprocess.Popen(
+ command,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ bufsize=1
+ )
+
+ ffmpeg_process = process
+ stream_active = True
+
+ def monitor_process():
+ """Collect ffmpeg result and reset state when it exits."""
+ global ffmpeg_process, stream_active
+
+ logger.info(f"FFmpeg process started with PID {process.pid}")
+
+ exit_code = process.wait()
+ stderr = process.stderr.read()
+ if stderr:
+ logger.error(f"FFmpeg error output: {stderr}")
+
+ if exit_code != 0:
+ logger.error(f"FFmpeg exited with code {exit_code}")
+ else:
+ logger.info("FFmpeg process completed successfully")
+
+ ffmpeg_process = None
+ stream_active = False
+
+ threading.Thread(target=monitor_process, daemon=True).start()
+ return True
+
+ except Exception as exc:
+ logger.error(f"Failed to start FFmpeg: {exc}")
+ ffmpeg_process = None
+ stream_active = False
+ return False
+
+
+def cleanup_stream():
+ """Stop FFmpeg and purge any cached DASH fragments."""
+ global ffmpeg_process, stream_active
+
+ if ffmpeg_process:
+ try:
+ ffmpeg_process.terminate()
+ ffmpeg_process.wait(timeout=5)
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error(f"Error stopping FFmpeg: {exc}")
+ try:
+ ffmpeg_process.kill()
+ except Exception: # pragma: no cover
+ pass
+ finally:
+ ffmpeg_process = None
+
+ stream_active = False
+ try:
+ reset_stream_path()
+ except Exception as exc: # pragma: no cover - best effort logging
+ logger.error(f"Error resetting stream directory: {exc}")
+
+
+# Routes
+@app.route('/dash/manifest.mpd')
+def serve_dash_manifest():
+ """Serve the MPEG-DASH manifest"""
+ if not stream_active:
+ return "Stream not found", 404
+
+ return send_from_directory(str(STREAM_PATH), "manifest.mpd")
+
+
+@app.route('/dash/<path:filename>')
+def serve_dash_segments(filename):
+ """Serve the MPEG-DASH segment files"""
+ if not stream_active:
+ return "Stream not found", 404
+
+ return send_from_directory(str(STREAM_PATH), filename)
+
+
+@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
+def on_publish():
+ """Callback when a stream starts"""
+ global stream_active
+
+ stream_key = request.form.get('name')
+
+ if not stream_key or stream_key != INGEST_PSK:
+ logger.warning("Unauthorized stream key attempted to publish")
+ return "Unauthorized", 403
+
+ if stream_active:
+ logger.info("Stream already active, recycling existing session")
+ cleanup_stream()
+
+ if not start_ffmpeg_process():
+ return "Failed to start stream", 500
+
+ logger.info("Stream started successfully")
+ logger.info(f"Access stream at https://{SERVER_DOMAIN}/dash/manifest.mpd")
+
+ return "OK"
+
+
+@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
+def on_publish_done():
+ """Callback when a stream ends"""
+ global stream_active
+
+ stream_key = request.form.get('name')
+
+ if not stream_key or stream_key != INGEST_PSK:
+ return "Bad request", 400
+
+ if stream_active:
+ cleanup_stream()
+
+ logger.info("Stream publishing ended")
+ return "OK"
+
+
+@app.route('/health')
+def health_check():
+ """Health check endpoint"""
+ return {
+ "status": "healthy",
+ "streaming": ffmpeg_process is not None
+ }
+
+
+def print_instructions():
+ """Print usage instructions"""
+ obs_url = f"rtmp://{SERVER_DOMAIN}/live"
+ vrc_url = f"rtmp://{SERVER_DOMAIN}/live/{STREAM_HEX}"
+
+ print("\n" + "="*80)
+ print(f"{'OBS TO VRCHAT STREAMING PROXY':^80}")
+ print("="*80)
+
+ print("\n[URLS]")
+ print(f" OBS: {obs_url}")
+ print(f" VRChat: {vrc_url}")
+
+ print("\n[STATUS]")
+ print(f" Stream is {'ACTIVE' if ffmpeg_process else 'INACTIVE'}")
+ print(f" Session ID: {STREAM_HEX}")
+ print(f" On-disk path: {STREAM_PATH}")
+ print("="*80 + "\n")
+
+
+# Register cleanup and start server
+if __name__ == '__main__':
+ atexit.register(cleanup_stream)
+ print_instructions()
+ app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)))