1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
|
#!/usr/bin/env python3
from dotenv import load_dotenv
import os
import secrets
import subprocess
import threading
import shutil
import time
from pathlib import Path
from typing import Optional
from flask import Flask, request
import logging
import atexit
# Create the Flask app, expose it under both names, then read the .env file.
# ``application`` is the attribute WSGI servers such as gunicorn look up.
application = app = Flask(__name__)
load_dotenv()
# Configuration
def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* parsed as an int.

    Falls back to *default* when the variable is unset.

    Raises:
        ValueError: if the value is not an integer; the message names the
            offending variable so a misconfiguration is easy to spot.
    """
    raw = os.environ.get(name, default)
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f'{name} must be an integer') from exc


def _env_float(name: str, default: str) -> float:
    """Return environment variable *name* parsed as a float.

    Falls back to *default* when the variable is unset.

    Raises:
        ValueError: if the value is not a number; the message names the
            offending variable.
    """
    raw = os.environ.get(name, default)
    try:
        return float(raw)
    except ValueError as exc:
        raise ValueError(f'{name} must be a number') from exc


# Shared secret expected as the RTMP stream name; OBS_STREAM_KEY wins over
# STREAM_PSK when both are set.
INGEST_PSK = os.environ.get('OBS_STREAM_KEY') or os.environ.get('STREAM_PSK')
INGEST_RTMP_HOST = os.environ.get('INGEST_RTMP_HOST', '127.0.0.1')
INGEST_RTMP_PORT = _env_int('INGEST_RTMP_PORT', '1936')
INGEST_THREAD_QUEUE_SIZE = _env_int('INGEST_THREAD_QUEUE_SIZE', '4096')
HLS_SEGMENT_TIME = _env_float('HLS_SEGMENT_TIME', '2')
HLS_PLAYLIST_SIZE = _env_int('HLS_PLAYLIST_SIZE', '3')
HLS_DELETE_THRESHOLD = _env_int('HLS_DELETE_THRESHOLD', '20')
# Seconds to wait for the playlist file, and how often to poll for it.
HLS_PLAYLIST_TIMEOUT = _env_float('HLS_PLAYLIST_TIMEOUT', '5')
HLS_PLAYLIST_POLL_INTERVAL = _env_float('HLS_PLAYLIST_POLL_INTERVAL', '0.1')
BASE_DIR = Path(os.environ.get('STREAM_DIR', '/var/www/streams'))
SERVER_DOMAIN = os.environ.get('SERVER_DOMAIN', 'yummers.b-cdn.net')
# Random per-process identifier used in both the on-disk path and the URL.
STREAM_HEX = secrets.token_hex(16)
STREAM_PATH = BASE_DIR / 'live' / STREAM_HEX
HLS_ROUTE_PREFIX = f"/hls/{STREAM_HEX}"
SESSION_KEY_NAME = 'session.key'
SESSION_KEYINFO_NAME = 'session.keyinfo'
# Set once key material has been written; None until then.
SESSION_KEY_URI: Optional[str] = None
# Media output settings tuned for VRChat playback
AUDIO_BITRATE = '256k'
AUDIO_CHANNELS = 2
AUDIO_SAMPLE_RATE = 48000
# Setup logging
# LOG_LEVEL is matched case-insensitively. A value that is not one of the
# logging module's integer level constants falls back to INFO rather than
# aborting the import with AttributeError.
_requested_log_level = os.environ.get('LOG_LEVEL', 'INFO').strip().upper()
_resolved_log_level = getattr(logging, _requested_log_level, None)
_log_level_is_valid = isinstance(_resolved_log_level, int)
logging.basicConfig(
    level=_resolved_log_level if _log_level_is_valid else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('obs_proxy')
if not _log_level_is_valid:
    logger.warning("Ignoring unknown LOG_LEVEL %r; using INFO", _requested_log_level)
# Global state
# Popen handle of the ffmpeg child currently being supervised; None when no
# child is running.
ffmpeg_process: Optional[subprocess.Popen] = None
# Background thread running the ffmpeg retry loop; None when not started.
ffmpeg_worker_thread: Optional[threading.Thread] = None
# Set to ask the worker (and anything waiting on it) to stop.
ffmpeg_stop_event: threading.Event = threading.Event()
def _worker_running() -> bool:
    """Report whether an ffmpeg worker thread exists and is still alive."""
    if ffmpeg_worker_thread is None:
        return False
    return ffmpeg_worker_thread.is_alive()
# Validate configuration
if not INGEST_PSK:
    logger.error("OBS_STREAM_KEY/STREAM_PSK is not set")
    # Raise SystemExit directly: the bare ``exit`` helper is injected by the
    # ``site`` module for interactive use and is missing under ``python -S``.
    raise SystemExit(1)
# Create required directories
BASE_DIR.mkdir(parents=True, exist_ok=True)
STREAM_PATH.mkdir(parents=True, exist_ok=True)
def _session_key_path() -> Path:
    """Return the path of the session key file inside the stream directory."""
    return STREAM_PATH.joinpath(SESSION_KEY_NAME)
def _session_keyinfo_path() -> Path:
    """Return the path of the keyinfo file inside the stream directory."""
    return STREAM_PATH.joinpath(SESSION_KEYINFO_NAME)
def _write_key_material() -> None:
    """Create fresh random key material and write the key and keyinfo files.

    The keyinfo file holds three newline-terminated lines: the public key
    URI, the local key file path, and the 16-byte IV as 32 hex digits.
    ``SESSION_KEY_URI`` is updated only after both files are written.
    """
    global SESSION_KEY_URI
    aes_key = secrets.token_bytes(16)
    iv = secrets.token_bytes(16)

    key_file = _session_key_path()
    key_file.write_bytes(aes_key)

    public_uri = f"https://{SERVER_DOMAIN}{HLS_ROUTE_PREFIX}/{SESSION_KEY_NAME}"
    # bytes.hex() yields the same zero-padded lowercase 32-digit string.
    keyinfo_fields = (public_uri, str(key_file), iv.hex())
    keyinfo_text = "".join(f"{field}\n" for field in keyinfo_fields)
    _session_keyinfo_path().write_text(keyinfo_text, encoding="utf-8")

    SESSION_KEY_URI = public_uri
def reset_stream_path() -> None:
    """Empty the live stream directory and regenerate its key material.

    Removes ``STREAM_PATH`` (errors during removal are ignored), recreates
    it, and writes a new key/keyinfo pair via ``_write_key_material``.
    Errors from the directory creation or key writing propagate to the
    caller.
    """
    # ignore_errors=True: a missing or partially removable tree is not fatal.
    shutil.rmtree(STREAM_PATH, ignore_errors=True)
    STREAM_PATH.mkdir(parents=True, exist_ok=True)
    _write_key_material()
def _safe_reset_stream_path(context: str) -> bool:
    """Run ``reset_stream_path`` and turn any failure into a logged False.

    ``context`` is a short phrase included in the error log to say when the
    reset was attempted. Returns True on success, False otherwise.
    """
    try:
        reset_stream_path()
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Error resetting stream directory %s: %s", context, exc)
        return False
    else:
        return True
def _verify_stream_path_writable() -> None:
    """Probe that the stream directory accepts writes before FFmpeg starts.

    Writes and then removes a small ``write_test.txt`` file in
    ``STREAM_PATH``. This is best effort: failures are logged and never
    raised, including a failure to remove the probe file.
    """
    probe_path = STREAM_PATH / "write_test.txt"
    try:
        probe_path.write_text("ok", encoding="utf-8")
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Could not write to stream directory: %s", exc)
    finally:
        # missing_ok avoids the exists()/unlink() race, and catching OSError
        # keeps a failed cleanup from escaping this best-effort check.
        try:
            probe_path.unlink(missing_ok=True)
        except OSError as exc:  # pragma: no cover - best effort logging
            logger.warning("Could not remove write probe %s: %s", probe_path, exc)
def _wait_for_playlist(process: subprocess.Popen[str], attempt: int):
    """Poll for ``stream.m3u8`` until it exists or waiting must end.

    Returns a ``(ready, reason)`` pair. ``reason`` is ``'ready'`` on
    success, otherwise ``'stop_requested'``, ``'ffmpeg_exited'`` or
    ``'timeout'``. ``attempt`` is only used in the timeout log message.
    """
    playlist = STREAM_PATH / 'stream.m3u8'
    began = time.monotonic()

    while not playlist.exists():
        # Give up early if a stop was requested or ffmpeg is already gone.
        if ffmpeg_stop_event.is_set():
            return False, 'stop_requested'
        if process.poll() is not None:
            return False, 'ffmpeg_exited'

        waited = time.monotonic() - began
        if waited >= HLS_PLAYLIST_TIMEOUT:
            logger.warning(
                "HLS playlist still missing after %.2fs on attempt %s; recycling FFmpeg",
                waited,
                attempt,
            )
            return False, 'timeout'

        # Sleep for one poll interval, waking immediately on a stop request.
        if ffmpeg_stop_event.wait(HLS_PLAYLIST_POLL_INTERVAL):
            return False, 'stop_requested'

    waited = time.monotonic() - began
    segment_names = sorted(
        entry.name for entry in playlist.parent.glob('segment-*.ts')
    )
    logger.info(
        "HLS playlist materialized after %.2fs at %s; segments=%s",
        waited,
        playlist,
        segment_names,
    )
    return True, 'ready'
def _start_pipe_logger(pipe, level):
    """Forward every non-blank line read from ``pipe`` to the logger.

    Reading happens on a daemon thread so that an unread pipe cannot fill up
    and stall ffmpeg. The pipe is closed once it reaches end of file.
    """
    def pipe_logger():
        with pipe:
            # readline() returns '' only at end of file.
            while (raw := pipe.readline()) != '':
                text = raw.strip()
                if not text:
                    continue
                logger.log(level, "ffmpeg: %s", text)

    drain_thread = threading.Thread(target=pipe_logger, daemon=True)
    drain_thread.start()
def _terminate_ffmpeg_process(process: subprocess.Popen[str], *, timeout: float = 5.0, log_errors: bool = True) -> None:
    """Stop an ffmpeg process, escalating from terminate to kill.

    Args:
        process: the child process to stop.
        timeout: seconds to wait after ``terminate()`` and again after
            ``kill()``.
        log_errors: when False, failures are not logged.

    Never raises; every failure is handled here.
    """
    try:
        process.terminate()
        process.wait(timeout=timeout)
        return
    except Exception as exc:  # pragma: no cover - best effort logging
        if log_errors:
            logger.error("Error stopping FFmpeg: %s", exc)
    try:
        process.kill()
        # Reap the killed child so it does not linger as a zombie and so
        # ``returncode`` is populated for the caller.
        process.wait(timeout=timeout)
    except Exception as exc:  # pragma: no cover - best effort logging
        if log_errors:
            logger.warning("FFmpeg did not exit after kill: %s", exc)
def _build_ffmpeg_command() -> list[str]:
    """Assemble the ffmpeg argument list used for every launch attempt.

    Key material is generated first if the keyinfo file is missing. The
    command reads the RTMP ingest, copies video, re-encodes audio to AAC and
    writes an HLS playlist plus segments into ``STREAM_PATH``.
    """
    keyinfo = _session_keyinfo_path()
    if not keyinfo.exists():
        _write_key_material()

    general_options = [
        'ffmpeg',
        '-nostdin',
        '-hide_banner',
        '-loglevel', os.environ.get('FFMPEG_LOGLEVEL', 'warning'),
    ]
    input_options = [
        '-fflags', '+genpts',
        '-thread_queue_size', str(INGEST_THREAD_QUEUE_SIZE),
        '-i', f'rtmp://{INGEST_RTMP_HOST}:{INGEST_RTMP_PORT}/live/{INGEST_PSK}',
    ]
    # The trailing '?' makes each mapping optional.
    codec_options = [
        '-map', '0:v:0?',
        '-map', '0:a:0?',
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-b:a', AUDIO_BITRATE,
        '-ac', str(AUDIO_CHANNELS),
        '-ar', str(AUDIO_SAMPLE_RATE),
    ]
    hls_options = [
        '-f', 'hls',
        '-hls_time', str(HLS_SEGMENT_TIME),
        '-hls_list_size', str(HLS_PLAYLIST_SIZE),
        '-hls_flags', 'delete_segments+independent_segments',
        '-hls_delete_threshold', str(HLS_DELETE_THRESHOLD),
        '-hls_key_info_file', str(keyinfo),
        '-hls_segment_filename', str(STREAM_PATH / 'segment-%05d.ts'),
    ]
    playlist_target = str(STREAM_PATH / 'stream.m3u8')

    return [
        *general_options,
        *input_options,
        *codec_options,
        *hls_options,
        playlist_target,
    ]
def _run_ffmpeg_once(attempt: int) -> bool:
    """Run one ffmpeg attempt to completion and report whether to stop retrying.

    Launches ffmpeg, publishes the handle in the ``ffmpeg_process`` global,
    waits for the playlist, then blocks until the process exits.

    Args:
        attempt: 1-based attempt counter, used only in log messages.

    Returns:
        True when a stop was requested or ffmpeg exited with code 0 after
        the playlist appeared; False when ffmpeg could not be started, the
        playlist never appeared, or the exit code was non-zero.
    """
    global ffmpeg_process
    logger.info("Starting FFmpeg for live stream (attempt %s)", attempt)
    try:
        # Text mode with line buffering so the pipe loggers get whole lines.
        process = subprocess.Popen(
            _build_ffmpeg_command(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
    except Exception as exc:  # pragma: no cover - best effort logging
        logger.error("Failed to start FFmpeg: %s", exc)
        return False
    # Publish the handle so cleanup_stream() can terminate this child.
    ffmpeg_process = process
    start_time = time.monotonic()
    logger.info("FFmpeg process started with PID %s", process.pid)
    logger.info('Stream active; waiting for playlist to appear')
    # Drain both pipes in the background; stderr is logged at WARNING and
    # stdout at DEBUG.
    _start_pipe_logger(process.stderr, logging.WARNING)
    _start_pipe_logger(process.stdout, logging.DEBUG)
    playlist_ready, playlist_reason = _wait_for_playlist(process, attempt)
    if not playlist_ready and not ffmpeg_stop_event.is_set():
        # Only a still-running process that timed out needs to be stopped
        # here; when a stop was requested, cleanup_stream() terminates it.
        if playlist_reason == 'timeout' and process.poll() is None:
            logger.info("Terminating FFmpeg attempt %s after playlist timeout", attempt)
            _terminate_ffmpeg_process(process, log_errors=False)
        elif playlist_reason == 'ffmpeg_exited':
            logger.debug("FFmpeg exited before playlist became available on attempt %s", attempt)
    # Blocks for the lifetime of the stream when the playlist was ready.
    exit_code = process.wait()
    elapsed = time.monotonic() - start_time
    ffmpeg_process = None
    # A requested stop counts as a clean finish regardless of exit code.
    if ffmpeg_stop_event.is_set():
        logger.info(
            "FFmpeg stop requested; process exited with code %s after %.2fs",
            exit_code,
            elapsed,
        )
        return True
    if not playlist_ready:
        logger.error(
            "FFmpeg attempt %s ended (exit %s) before playlist became available (reason=%s)",
            attempt,
            exit_code,
            playlist_reason,
        )
        return False
    if exit_code != 0:
        logger.error("FFmpeg exited with code %s after %.2fs", exit_code, elapsed)
        return False
    logger.info("FFmpeg process completed successfully after %.2fs", elapsed)
    return True
# Seconds to pause between failed attempts. Without a pause, an ffmpeg that
# fails instantly (for example a missing binary) makes the loop spin,
# repeatedly wiping the stream directory and flooding the log.
_FFMPEG_RETRY_DELAY_SECONDS = 0.5


def _ffmpeg_worker_loop() -> None:
    """Keep spawning FFmpeg until it runs cleanly or a stop is requested.

    After each failed attempt the stream directory is reset and the loop
    pauses briefly before retrying. The pause ends immediately if a stop is
    requested.
    """
    attempt = 0
    while not ffmpeg_stop_event.is_set():
        attempt += 1
        if _run_ffmpeg_once(attempt):
            break
        if ffmpeg_stop_event.is_set():
            break
        _safe_reset_stream_path("between FFmpeg attempts")
        # Event.wait() acts as an interruptible sleep and returns True as
        # soon as a stop is requested.
        if ffmpeg_stop_event.wait(_FFMPEG_RETRY_DELAY_SECONDS):
            break
    logger.debug("FFmpeg worker exiting")
def start_ffmpeg_process():
    """Launch the background worker that turns the RTMP ingest into HLS.

    Returns True if a worker is already running or was started, and False if
    the worker thread could not be started. A failed directory reset is
    logged by the helper and does not prevent the worker from starting.
    """
    global ffmpeg_worker_thread

    if _worker_running():
        logger.warning("FFmpeg worker already running; skipping duplicate start")
        return True

    directory_ready = _safe_reset_stream_path("before FFmpeg start")
    if directory_ready:
        logger.info(f"Stream directory ready at {STREAM_PATH}")
        _verify_stream_path_writable()

    # Clear any stop request left over from a previous session.
    ffmpeg_stop_event.clear()

    try:
        ffmpeg_worker_thread = threading.Thread(
            name="ffmpeg-worker",
            target=_ffmpeg_worker_loop,
            daemon=True,
        )
        ffmpeg_worker_thread.start()
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.error(f"Failed to start FFmpeg worker thread: {exc}")
        ffmpeg_worker_thread = None
        return False
    return True
def cleanup_stream():
    """Stop FFmpeg and purge any cached HLS segments.

    Signals the worker to stop, terminates the current ffmpeg child if there
    is one, waits up to five seconds for the worker thread, resets the
    stream directory, and finally clears the stop flag so a new session can
    start.
    """
    global ffmpeg_process, ffmpeg_worker_thread
    ffmpeg_stop_event.set()
    # Snapshot the global: the worker thread sets it to None when ffmpeg
    # exits, which could otherwise happen between the check and the call.
    process = ffmpeg_process
    if process is not None:
        _terminate_ffmpeg_process(process)
        ffmpeg_process = None
    worker = ffmpeg_worker_thread
    if worker and worker.is_alive():
        worker.join(timeout=5)
        if worker.is_alive():  # pragma: no cover - diagnostic
            logger.warning("FFmpeg worker thread did not exit cleanly")
    ffmpeg_worker_thread = None
    _safe_reset_stream_path("during cleanup")
    ffmpeg_stop_event.clear()
# Routes
@app.route('/rtmp_callbacks/on_publish', methods=['POST'])
def on_publish():
    """Handle the callback sent when a stream starts publishing.

    Reads the stream key from the ``name`` form field. Responds 403 when the
    key is missing or wrong, 500 when the ffmpeg worker cannot be started,
    and "OK" otherwise. An already-active session is torn down and restarted.
    """
    stream_key = request.form.get('name')
    # NOTE(review): this logs the submitted key, which is the shared secret
    # whenever the request is legitimate; consider redacting it.
    logger.info("on_publish received for key=%s", stream_key)
    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Encoding to bytes lets compare_digest accept
    # non-ASCII input.
    key_matches = bool(stream_key) and secrets.compare_digest(
        stream_key.encode('utf-8'),
        INGEST_PSK.encode('utf-8'),
    )
    if not key_matches:
        logger.warning("Unauthorized stream key attempted to publish: %s", stream_key)
        return "Unauthorized", 403
    if ffmpeg_process or _worker_running():
        logger.info("Stream already active, recycling existing session")
        cleanup_stream()
    if not start_ffmpeg_process():
        return "Failed to start stream", 500
    playlist_path = STREAM_PATH / 'stream.m3u8'
    if playlist_path.exists():
        logger.info("Existing playlist found at %s", playlist_path)
    return "OK"
@app.route('/rtmp_callbacks/on_publish_done', methods=['POST'])
def on_publish_done():
    """Handle the callback sent when a stream stops publishing.

    Reads the stream key from the ``name`` form field. Responds 400 when the
    key is missing or wrong; otherwise tears down any active session and
    responds "OK".
    """
    stream_key = request.form.get('name')
    # NOTE(review): this logs the submitted key, which is the shared secret
    # whenever the request is legitimate; consider redacting it.
    logger.info("on_publish_done received for key=%s", stream_key)
    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Encoding to bytes lets compare_digest accept
    # non-ASCII input.
    key_matches = bool(stream_key) and secrets.compare_digest(
        stream_key.encode('utf-8'),
        INGEST_PSK.encode('utf-8'),
    )
    if not key_matches:
        logger.warning("on_publish_done received for unknown key: %s", stream_key)
        return "Bad request", 400
    if ffmpeg_process or _worker_running():
        cleanup_stream()
    logger.info("Stream publishing ended")
    return "OK"
@app.route('/health')
def health_check():
    """Report service health and whether an ffmpeg process handle is set."""
    is_streaming = ffmpeg_process is not None
    payload = {"status": "healthy", "streaming": is_streaming}
    return payload
def print_instructions():
    """Print a banner with the ingest URL, HLS URL and session status."""
    rule = "=" * 80
    obs_url = f"rtmps://{SERVER_DOMAIN}:1935/live"
    hls_url = f"https://{SERVER_DOMAIN}{HLS_ROUTE_PREFIX}/stream.m3u8"

    lines = [
        "\n" + rule,
        f"{'OBS TO VRCHAT STREAMING PROXY':^80}",
        rule,
        "\n[URLS]",
        f" OBS ingest: {obs_url}",
        f" HLS: {hls_url}",
    ]
    # The key URI is only known once key material has been written.
    if SESSION_KEY_URI:
        lines.append(f" HLS key: {SESSION_KEY_URI}")
    lines.extend([
        "\n[STATUS]",
        f" Stream is {'ACTIVE' if ffmpeg_process else 'INACTIVE'}",
        f" Session ID: {STREAM_HEX}",
        f" On-disk path: {STREAM_PATH}",
        rule + "\n",
    ])
    # One newline-joined print emits the same text as printing line by line.
    print("\n".join(lines))
# Register cleanup at import time rather than in main(), so it also runs
# when a WSGI server imports this module instead of executing it as a script.
atexit.register(cleanup_stream)
def main():
    """Serve the Flask app with Waitress.

    Binds to ``HOST``/``PORT`` from the environment, defaulting to
    ``0.0.0.0:5000``.

    Raises:
        RuntimeError: if Waitress is not installed.
    """
    # Imported here so the module can still be imported without Waitress.
    try:
        from waitress import serve
    except ImportError as exc:  # pragma: no cover - defensive guardrail
        message = "Waitress is required to run this service. Install it with 'pip install waitress'."
        raise RuntimeError(message) from exc

    print_instructions()

    bind_host = os.environ.get('HOST', '0.0.0.0')
    bind_port = int(os.environ.get('PORT', 5000))
    logger.info("Starting Waitress on %s:%s", bind_host, bind_port)
    serve(app, host=bind_host, port=bind_port)


if __name__ == '__main__':
    main()
|