summaryrefslogtreecommitdiffstats
path: root/opt/obsproxy/server.py
blob: df6edb3fa5888c97c6b54347fccbdedef315c0f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
#!/usr/bin/env python3
from dotenv import load_dotenv
import os
import secrets
import subprocess
import threading
import shutil
import time
from pathlib import Path
from flask import Flask, request
import logging
import atexit

# Setup Flask and load environment variables.
# `app` is the Flask application object that the route decorators below attach to.
app = Flask(__name__)
application = app  # alias: WSGI servers like gunicorn look for 'application'
# Called before the configuration block below reads os.environ, so values
# supplied through a .env file are visible to those lookups.
load_dotenv()

# Configuration
INGEST_PSK = os.environ.get('OBS_STREAM_KEY') or os.environ.get('STREAM_PSK')
INGEST_RTMP_HOST = os.environ.get('INGEST_RTMP_HOST', '127.0.0.1')
try:
    INGEST_RTMP_PORT = int(os.environ.get('INGEST_RTMP_PORT', '1936'))
except ValueError as exc:
    raise ValueError('INGEST_RTMP_PORT must be an integer') from exc
INGEST_THREAD_QUEUE_SIZE = int(os.environ.get('INGEST_THREAD_QUEUE_SIZE', '4096'))
HLS_SEGMENT_TIME = float(os.environ.get('HLS_SEGMENT_TIME', '2'))
HLS_PLAYLIST_SIZE = int(os.environ.get('HLS_PLAYLIST_SIZE', '3'))
HLS_DELETE_THRESHOLD = int(os.environ.get('HLS_DELETE_THRESHOLD', '20'))
HLS_PLAYLIST_TIMEOUT = float(os.environ.get('HLS_PLAYLIST_TIMEOUT', '5'))
HLS_PLAYLIST_POLL_INTERVAL = float(os.environ.get('HLS_PLAYLIST_POLL_INTERVAL', '0.1'))
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.b-cdn.net')
STREAM_HEX = secrets.token_hex(16)
STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX
HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}"
# Media output settings tuned for VRChat playback
AUDIO_BITRATE = '256k'
AUDIO_CHANNELS = 2
AUDIO_SAMPLE_RATE = 48000

# Setup logging
# LOG_LEVEL is matched case-insensitively against the logging module's level
# constants. Anything that does not resolve to a numeric level (a typo, or a
# name such as "info" that maps to a function) falls back to INFO instead of
# crashing at import time.
_requested_log_level = os.environ.get('LOG_LEVEL', 'INFO').strip().upper()
_resolved_log_level = getattr(logging, _requested_log_level, None)
_log_level_is_valid = isinstance(_resolved_log_level, int)
logging.basicConfig(
    level=_resolved_log_level if _log_level_is_valid else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('obs_proxy')
if not _log_level_is_valid:
    logger.warning("Unknown LOG_LEVEL %r; falling back to INFO", _requested_log_level)

# Global state shared between the request handlers and the FFmpeg worker thread.
# Handle of the FFmpeg child that is currently running, or None when idle.
ffmpeg_process = None
# Background thread executing _ffmpeg_worker_loop, or None when no worker was started.
ffmpeg_worker_thread = None
# Set by cleanup_stream() to ask the worker (and its playlist wait) to stop.
ffmpeg_stop_event = threading.Event()


def _worker_running() -> bool:
    """Report whether a worker thread has been recorded and is still alive."""
    worker = ffmpeg_worker_thread
    if worker is None:
        return False
    return worker.is_alive()

# Validate configuration: refuse to start without a stream key.
if not INGEST_PSK:
    logger.error("OBS_STREAM_KEY/STREAM_PSK is not set")
    # Raise SystemExit directly: the bare exit() helper is installed by the
    # `site` module for interactive use and is not guaranteed to exist.
    raise SystemExit(1)

# Create required directories at import time. Both calls tolerate directories
# that already exist and create any missing parents.
BASE_DIR.mkdir(parents=True, exist_ok=True)
STREAM_PATH.mkdir(parents=True, exist_ok=True)

def reset_stream_path():
    """Wipe the live stream directory and recreate it empty.

    Removal errors are ignored; recreating the directory may still raise.
    """
    live_dir = STREAM_PATH
    shutil.rmtree(live_dir, ignore_errors=True)
    os.makedirs(live_dir, exist_ok=True)


def _safe_reset_stream_path(context: str) -> bool:
    """Reset the stream directory without letting a failure propagate.

    Args:
        context: Short phrase inserted into the error log to say when the
            reset was attempted.

    Returns:
        True when the reset succeeded, False when it raised (the error is logged).
    """
    try:
        reset_stream_path()
    except Exception as failure:  # pragma: no cover - best effort logging
        logger.error("Error resetting stream directory %s: %s", context, failure)
        return False
    else:
        return True


def _verify_stream_path_writable() -> None:
    """Probe the stream directory for writability before launching FFmpeg.

    Writes and then removes a small marker file. This is a best-effort
    diagnostic: failures are logged and never raised, and nothing is returned.
    """
    test_file = STREAM_PATH / "write_test.txt"
    try:
        test_file.write_text("ok", encoding="utf-8")
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Could not write to stream directory: %s", exc)
    finally:
        # Removing the probe must not raise either: a failing exists()/unlink()
        # here would otherwise escape and abort the caller.
        try:
            test_file.unlink(missing_ok=True)
        except OSError as exc:  # pragma: no cover - best effort logging
            logger.warning("Could not remove write probe %s: %s", test_file, exc)


def _wait_for_playlist(process: subprocess.Popen[str], attempt: int):
    """Poll for the HLS playlist until it exists or waiting becomes pointless.

    Args:
        process: The FFmpeg child expected to write the playlist.
        attempt: Attempt number, used only in the timeout log message.

    Returns:
        A ``(ready, reason)`` tuple. ``ready`` is True only with reason
        ``'ready'``; otherwise the reason is ``'stop_requested'``,
        ``'ffmpeg_exited'`` or ``'timeout'``.
    """
    manifest = STREAM_PATH / 'stream.m3u8'
    began = time.monotonic()

    # The checks run in a fixed order each round: playlist present, stop
    # requested, FFmpeg gone, timeout reached, then sleep on the stop event.
    while not manifest.exists():
        if ffmpeg_stop_event.is_set():
            return False, 'stop_requested'

        if process.poll() is not None:
            return False, 'ffmpeg_exited'

        waited = time.monotonic() - began
        if waited >= HLS_PLAYLIST_TIMEOUT:
            logger.warning(
                "HLS playlist still missing after %.2fs on attempt %s; recycling FFmpeg",
                waited,
                attempt,
            )
            return False, 'timeout'

        # Sleeping on the event wakes immediately when a stop is requested.
        if ffmpeg_stop_event.wait(HLS_PLAYLIST_POLL_INTERVAL):
            return False, 'stop_requested'

    waited = time.monotonic() - began
    segment_names = sorted(entry.name for entry in manifest.parent.glob('segment-*.ts'))
    logger.info(
        "HLS playlist materialized after %.2fs at %s; segments=%s",
        waited,
        manifest,
        segment_names,
    )
    return True, 'ready'


def _start_pipe_logger(pipe, level):
    """Drain an ffmpeg pipe on a background thread to avoid deadlocks."""

    def pipe_logger():
        with pipe:
            for line in iter(pipe.readline, ''):
                line = line.strip()
                if line:
                    logger.log(level, "ffmpeg: %s", line)

    threading.Thread(target=pipe_logger, daemon=True).start()


def _terminate_ffmpeg_process(process: subprocess.Popen[str], *, timeout: float = 5.0, log_errors: bool = True) -> None:
    """Terminate an ffmpeg process, falling back to kill if needed."""
    try:
        process.terminate()
        process.wait(timeout=timeout)
    except Exception as exc:  # pragma: no cover - best effort logging
        if log_errors:
            logger.error(f"Error stopping FFmpeg: {exc}")
        try:
            process.kill()
        except Exception:  # pragma: no cover
            pass


def _build_ffmpeg_command() -> list[str]:
    """Assemble the ffmpeg argument list used for every launch attempt.

    Returns:
        The argv list: global options, RTMP input, stream mapping and codec
        settings, HLS muxer options, and finally the playlist output path.
    """
    ingest_url = f'rtmp://{INGEST_RTMP_HOST}:{INGEST_RTMP_PORT}/live/{INGEST_PSK}'
    segment_pattern = str(STREAM_PATH / 'segment-%05d.ts')
    playlist_target = str(STREAM_PATH / 'stream.m3u8')

    global_options = [
        'ffmpeg',
        '-nostdin',
        '-hide_banner',
        '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'warning'),
    ]
    input_options = [
        '-fflags', '+genpts',
        '-thread_queue_size', str(INGEST_THREAD_QUEUE_SIZE),
        '-i', ingest_url,
    ]
    # Video is copied as-is; audio is encoded to AAC with the module's audio settings.
    stream_options = [
        '-map', '0:v:0?',
        '-map', '0:a:0?',
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-b:a', AUDIO_BITRATE,
        '-ac', str(AUDIO_CHANNELS),
        '-ar', str(AUDIO_SAMPLE_RATE),
    ]
    hls_options = [
        '-f', 'hls',
        '-hls_time', str(HLS_SEGMENT_TIME),
        '-hls_list_size', str(HLS_PLAYLIST_SIZE),
        '-hls_flags', 'delete_segments+independent_segments',
        '-hls_delete_threshold', str(HLS_DELETE_THRESHOLD),
        '-hls_segment_filename', segment_pattern,
    ]

    return [*global_options, *input_options, *stream_options, *hls_options, playlist_target]


def _run_ffmpeg_once(attempt: int) -> bool:
    """Start ffmpeg, wait for it to exit, and report whether it ran cleanly.

    Args:
        attempt: 1-based attempt counter, used in log messages.

    Returns:
        True when the worker loop should stop retrying: either a stop was
        requested, or FFmpeg produced a playlist and later exited with code 0.
        False when the launch failed, the playlist never appeared, or FFmpeg
        exited with a non-zero code.
    """
    global ffmpeg_process

    logger.info("Starting FFmpeg for live stream (attempt %s)", attempt)

    try:
        process = subprocess.Popen(
            _build_ffmpeg_command(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Failed to start FFmpeg: %s", exc)
        return False

    ffmpeg_process = process
    start_time = time.monotonic()

    logger.info("FFmpeg process started with PID %s", process.pid)
    logger.info('Stream active; waiting for playlist to appear')

    # Both pipes are drained on background threads so FFmpeg never blocks on a full pipe.
    _start_pipe_logger(process.stderr, logging.WARNING)
    _start_pipe_logger(process.stdout, logging.DEBUG)

    playlist_ready, playlist_reason = _wait_for_playlist(process, attempt)

    # Remember the stop request locally: cleanup_stream() clears the event
    # again once it finishes, which may happen before this function returns.
    stop_requested = playlist_reason == 'stop_requested' or ffmpeg_stop_event.is_set()

    if stop_requested:
        # cleanup_stream() only terminates the process it sees in the global
        # handle; if it looked before the handle was published, nobody else
        # would stop this child and the wait() below would block indefinitely.
        if process.poll() is None:
            _terminate_ffmpeg_process(process, log_errors=False)
    elif not playlist_ready:
        if playlist_reason == 'timeout' and process.poll() is None:
            logger.info("Terminating FFmpeg attempt %s after playlist timeout", attempt)
            _terminate_ffmpeg_process(process, log_errors=False)
        elif playlist_reason == 'ffmpeg_exited':
            logger.debug("FFmpeg exited before playlist became available on attempt %s", attempt)

    exit_code = process.wait()
    elapsed = time.monotonic() - start_time

    # Only clear the handle if it still refers to this process, so a newer
    # process published in the meantime is not forgotten.
    if ffmpeg_process is process:
        ffmpeg_process = None

    if stop_requested or ffmpeg_stop_event.is_set():
        logger.info(
            "FFmpeg stop requested; process exited with code %s after %.2fs",
            exit_code,
            elapsed,
        )
        return True

    if not playlist_ready:
        logger.error(
            "FFmpeg attempt %s ended (exit %s) before playlist became available (reason=%s)",
            attempt,
            exit_code,
            playlist_reason,
        )
        return False

    if exit_code != 0:
        logger.error("FFmpeg exited with code %s after %.2fs", exit_code, elapsed)
        return False

    logger.info("FFmpeg process completed successfully after %.2fs", elapsed)
    return True


def _ffmpeg_worker_loop() -> None:
    """Keep spawning FFmpeg until it runs cleanly or a stop is requested.

    After a failed attempt the stream directory is reset and the loop pauses
    briefly before retrying. The pause grows with the attempt count up to a
    small cap, so an attempt that fails instantly (for example a missing
    ffmpeg binary) cannot spin the CPU and flood the log.
    """
    attempt = 0

    while not ffmpeg_stop_event.is_set():
        attempt += 1
        if _run_ffmpeg_once(attempt):
            break

        if ffmpeg_stop_event.is_set():
            break

        _safe_reset_stream_path("between FFmpeg attempts")

        # Waiting on the stop event (rather than sleeping) lets a stop request
        # end the pause immediately.
        retry_delay = min(0.25 * attempt, 2.0)
        if ffmpeg_stop_event.wait(retry_delay):
            break

    logger.debug("FFmpeg worker exiting")


def start_ffmpeg_process():
    """Launch the background worker that turns the RTMP ingest into HLS.

    Returns:
        True when a worker is already running or was started, False when the
        worker thread could not be started.
    """
    global ffmpeg_worker_thread

    if _worker_running():
        logger.warning("FFmpeg worker already running; skipping duplicate start")
        return True

    directory_ready = _safe_reset_stream_path("before FFmpeg start")
    if directory_ready:
        logger.info(f"Stream directory ready at {STREAM_PATH}")
    # The probe only logs; a failure here does not prevent the start.
    _verify_stream_path_writable()

    # Drop any stop request left over from a previous session.
    ffmpeg_stop_event.clear()

    try:
        worker = threading.Thread(
            name="ffmpeg-worker",
            target=_ffmpeg_worker_loop,
            daemon=True,
        )
        ffmpeg_worker_thread = worker
        worker.start()
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.error(f"Failed to start FFmpeg worker thread: {exc}")
        ffmpeg_worker_thread = None
        return False

    return True


def cleanup_stream():
    """Stop FFmpeg and purge any cached HLS segments.

    Signals the worker to stop, terminates the current FFmpeg process if one
    is recorded, waits up to five seconds for the worker thread, resets the
    stream directory, and finally clears the stop signal again.
    """
    global ffmpeg_process, ffmpeg_worker_thread

    ffmpeg_stop_event.set()

    # Take one snapshot of the shared handle: the worker thread may reset the
    # global to None between a check and a later use.
    process = ffmpeg_process
    if process is not None:
        _terminate_ffmpeg_process(process)
        ffmpeg_process = None

    worker = ffmpeg_worker_thread
    if worker is not None and worker.is_alive():
        worker.join(timeout=5)
        if worker.is_alive():  # pragma: no cover - diagnostic
            logger.warning("FFmpeg worker thread did not exit cleanly")
    ffmpeg_worker_thread = None

    _safe_reset_stream_path("during cleanup")

    ffmpeg_stop_event.clear()


# Routes
@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
def on_publish():
    """Callback when a stream starts.

    Reads the stream key from the ``name`` form field. If it matches the
    configured key, any active session is recycled and a new FFmpeg worker
    is started.

    Returns:
        ``"OK"`` on success, ``("Unauthorized", 403)`` for a missing or wrong
        key, or ``("Failed to start stream", 500)`` if the worker could not
        be started.
    """
    stream_key = request.form.get('name')
    # NOTE(review): the submitted key is logged verbatim, including the valid
    # one; consider masking it if the logs are not treated as secret.
    logger.info("on_publish received for key=%s", stream_key)

    # Constant-time comparison so response timing does not reveal how much of
    # a guessed key is correct. Bytes are compared because compare_digest()
    # rejects non-ASCII str arguments.
    authorized = bool(stream_key) and secrets.compare_digest(
        stream_key.encode('utf-8'), INGEST_PSK.encode('utf-8')
    )
    if not authorized:
        logger.warning("Unauthorized stream key attempted to publish: %s", stream_key)
        return "Unauthorized", 403

    if ffmpeg_process or _worker_running():
        logger.info("Stream already active, recycling existing session")
        cleanup_stream()

    if not start_ffmpeg_process():
        return "Failed to start stream", 500

    playlist_path = STREAM_PATH / 'stream.m3u8'
    if playlist_path.exists():
        logger.info("Existing playlist found at %s", playlist_path)

    return "OK"


@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
def on_publish_done():
    """Callback when a stream ends.

    Reads the stream key from the ``name`` form field. If it matches the
    configured key, the active FFmpeg session (if any) is torn down.

    Returns:
        ``"OK"`` on success, or ``("Bad request", 400)`` for a missing or
        wrong key.
    """
    stream_key = request.form.get('name')
    logger.info("on_publish_done received for key=%s", stream_key)

    # Constant-time comparison so response timing does not reveal how much of
    # a guessed key is correct. Bytes are compared because compare_digest()
    # rejects non-ASCII str arguments.
    key_matches = bool(stream_key) and secrets.compare_digest(
        stream_key.encode('utf-8'), INGEST_PSK.encode('utf-8')
    )
    if not key_matches:
        logger.warning("on_publish_done received for unknown key: %s", stream_key)
        return "Bad request", 400

    if ffmpeg_process or _worker_running():
        cleanup_stream()

    logger.info("Stream publishing ended")
    return "OK"


@app.route('/health')
def health_check():
    """Report service health and whether an FFmpeg process is currently recorded."""
    is_streaming = ffmpeg_process is not None
    return {"status": "healthy", "streaming": is_streaming}


def print_instructions():
    """Write the startup banner (ingest URL, HLS URL, session details) to stdout."""
    rule = "=" * 80
    ingest_url = f"rtmps://{SERVER_DOMAIN}:1935/live"
    playback_url = f"https://{SERVER_DOMAIN}{HLS_ROUTE_PREFIX}/stream.m3u8"
    state = 'ACTIVE' if ffmpeg_process else 'INACTIVE'

    banner = [
        "\n" + rule,
        format('OBS TO VRCHAT STREAMING PROXY', '^80'),
        rule,
        "\n[URLS]",
        f"  OBS ingest: {ingest_url}",
        f"  HLS:        {playback_url}",
        "\n[STATUS]",
        f"  Stream is {state}",
        f"  Session ID: {STREAM_HEX}",
        f"  On-disk path: {STREAM_PATH}",
        rule + "\n",
    ]
    print("\n".join(banner))

# Register cleanup at import time (not inside main()) so it is in place no
# matter which WSGI server imports this module. cleanup_stream() then runs at
# interpreter exit.
atexit.register(cleanup_stream)


def main():
    """Serve the Flask app with Waitress on HOST:PORT (default 0.0.0.0:5000).

    Raises:
        RuntimeError: If the waitress package is not installed.
    """
    try:
        from waitress import serve as run_server
    except ImportError as exc:  # pragma: no cover - defensive guardrail
        raise RuntimeError(
            "Waitress is required to run this service. Install it with 'pip install waitress'."
        ) from exc

    print_instructions()

    bind_host = os.getenv('HOST', '0.0.0.0')
    bind_port = int(os.getenv('PORT', 5000))
    logger.info("Starting Waitress on %s:%s", bind_host, bind_port)
    run_server(app, host=bind_host, port=bind_port)


# Run the Waitress entry point only when executed as a script; importing the
# module (e.g. from another WSGI server) does not start a server.
if __name__ == '__main__':
    main()