diff options
Diffstat (limited to 'Scripts')
30 files changed, 0 insertions, 6242 deletions
diff --git a/Scripts/.gitignore b/Scripts/.gitignore deleted file mode 100644 index 3f2744f..0000000 --- a/Scripts/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -# Python generated files -__pycache__ diff --git a/Scripts/app_config.py b/Scripts/app_config.py deleted file mode 100644 index f911456..0000000 --- a/Scripts/app_config.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -import sys -import typing - -def getConfig(path: str) -> typing.Dict[str, typing.Union[str, float, int, bool]]: - # Helper functions to detect and convert the type - def is_int(value: str) -> bool: - try: - int(value) - return True - except ValueError: - return False - - def is_float(value: str) -> bool: - try: - float(value) - return True - except ValueError: - return False - - def convert_value(key: str, value: str): - if key.startswith(("enable_", "remove_", "use_", "clear_")): - return bool(int(value)) - elif is_int(value): - return int(value) - elif is_float(value): - return float(value) - else: - return value - - config = {} - with open(path, 'r') as file: - for line in file: - key_value = line.strip().split(": ", maxsplit=1) - key = key_value[0] - value = key_value[1] if len(key_value) > 1 else "" - config[key] = convert_value(key, value.strip()) - return config - diff --git a/Scripts/browser_src.py b/Scripts/browser_src.py deleted file mode 100644 index 4ed3407..0000000 --- a/Scripts/browser_src.py +++ /dev/null @@ -1,138 +0,0 @@ -from transcribe_pipeline import StreamingPlugin, TranscriptCommit -from urllib.parse import urlparse - -import copy -import json -import http.server -import os -import socketserver -import threading -import time -import transcribe_pipeline -import typing - -class HTTPServer: - def __init__(self, port: int): - self.port = port - self.route_map = {} - self.httpd = None - - def register_file_handler(self, http_method: str, path: str, file_path: str): - print(f"File handler registered at {os.getcwd()}") - def handler(): - if os.path.exists(file_path): - with 
open(file_path, 'r', encoding='utf-8') as f: - return 200, f.read().replace('%PORT%', str(self.port)), 'text/html' - else: - return 404, {'error': 'file not found'}, 'application/json' - self.route_map[(http_method, path)] = handler - - def register_json_handler(self, http_method: str, path: str, handler): - self.route_map[(http_method, path)] = handler - - def run(self): - def handler(*args, **kwargs): - MyHandler(http_server_instance=self, *args, **kwargs) - - with socketserver.TCPServer(("", self.port), handler) as httpd: - self.httpd = httpd - print(f"Webserver running at port {self.port}") - httpd.serve_forever() - print(f"Webserver exiting") - self.httpd = None - - def stop(self): - if self.httpd: - self.httpd.shutdown() - - -class MyHandler(http.server.BaseHTTPRequestHandler): - def __init__(self, *args, http_server_instance=None, **kwargs): - self.http_server_instance = http_server_instance - super().__init__(*args, **kwargs) - - def log_message(self, format, *args): - # TODO log if cfg["debug_mode_enabled"] is set - return - - def do_GET(self): - self.handle_request('GET') - - def handle_request(self, method: str): - parsed_path = urlparse(self.path) - if (method, parsed_path.path) in self.http_server_instance.route_map: - status_code, response_content, content_type = \ - self.http_server_instance.route_map[(method, parsed_path.path)]() - self.send_response(status_code) - self.send_header('Content-Type', content_type) - self.end_headers() - if content_type == 'application/json': - self.wfile.write(json.dumps(response_content).encode('utf-8')) - else: - self.wfile.write(response_content.encode('utf-8')) - else: - self.send_response(404) - self.send_header('Content-Type', 'application/json') - self.end_headers() - self.wfile.write(json.dumps({'error': 'not found'}).encode('utf-8')) - - -class BrowserSource(StreamingPlugin): - def __init__(self, cfg: typing.Dict): - port = cfg["browser_src_port"] - print(f"Browser source running on port {port}") - 
self.commits = [] - self.preview_commit = None - self.http_server = HTTPServer(port) - self.http_server.register_json_handler('GET', '/api/v0/transcript', self.get_transcript_json) - - index_html_path = os.path.join("Resources", "BrowserSource", "index.html") - self.http_server.register_file_handler('GET', '/', index_html_path) - self.http_server.register_file_handler('GET', '/index.html', index_html_path) - - # Start the HTTP server in a new thread - self.server_thread = threading.Thread(target=self.run) - self.server_thread.start() - - def transform(self, commit: TranscriptCommit) -> TranscriptCommit: - original_commit = commit - commit = copy.deepcopy(original_commit) - del commit.audio - if commit.delta: - self.commits.append(commit) - # Limit commits to last N. - now = time.time() - self.commits = [commit for commit in self.commits] - max_commits = 10 - if len(self.commits) > max_commits: - self.commits = self.commits[-int(max_commits/2):] - self.preview_commit = commit - return original_commit - - # return (http_code, body, content_type) - def get_transcript_json(self) -> typing.Tuple[int, str, str]: - processed_commits = [vars(commit) for commit in self.commits] - transcript_data = { - 'commits': processed_commits, - 'preview': vars(self.preview_commit) if self.preview_commit else None, - 'ts': time.time() - } - return 200, json.dumps(transcript_data), 'text/json' - - def run(self): - self.http_server.run() - - def stop(self): - self.http_server.stop() - self.server_thread.join() - - -# Example usage -def my_callback() -> typing.Tuple[int, typing.Dict[str, str]]: - return 200, {'message': 'Hello, world!'}, 'text/json' - -if __name__ == '__main__': - server = HTTPServer(port=8080) - server.register_json_handler('GET', '/api/v0/transcript', my_callback) - server.run() - diff --git a/Scripts/cpp_transcribe.py b/Scripts/cpp_transcribe.py deleted file mode 100644 index c499769..0000000 --- a/Scripts/cpp_transcribe.py +++ /dev/null @@ -1,197 +0,0 @@ 
-#!/usr/bin/env python3 - -# The app loop does 2 things: -# 1. Read lines from stdin and send them into the game via OSC. -# 2. Write control info to stdout. -# The app exits when stdin closes. - -from playsound import playsound - -import argparse -import dataclasses -import generate_utils -import os -import osc_ctrl -import steamvr -import sys -import threading -import time - -@dataclasses.dataclass -class AudioState: - text: str - osc_state: osc_ctrl.OscState - enable_local_beep: int - use_builtin: int - button: str - - send_transcript: bool - run_app: bool - -def writeControlMessage(run: bool): - msg = "" - if run: - msg += "1" - else: - msg += "0" - print(f"{msg}") - -def readControllerInput(audio_state: AudioState): - session = None - first = True - while session == None and audio_state.run_app == True: - try: - session = steamvr.SessionState() - except: - print("steamvr is off, no controller input", file=sys.stderr) - session = None - time.sleep(5) - - RECORD_STATE = 0 - PAUSE_STATE = 1 - state = PAUSE_STATE - osc_ctrl.indicateSpeech(audio_state.osc_state.client, False) - osc_ctrl.indicatePaging(audio_state.osc_state.client, False) - - hand_id = steamvr.hands[audio_state.button.split()[0]] - button_id = steamvr.buttons[audio_state.button.split()[1]] - - last_rising = time.time() - while audio_state.run_app == True: - time.sleep(0.05) - - event = steamvr.pollButtonPress(session, hand_id=hand_id, - button_id=button_id) - - if event == steamvr.EVENT_RISING_EDGE: - last_rising = time.time() - elif event == steamvr.EVENT_FALLING_EDGE: - now = time.time() - if now - last_rising > 0.3: - # Long hold - state = PAUSE_STATE - if not audio_state.use_builtin: - osc_ctrl.indicateSpeech(audio_state.osc_state.client, False) - osc_ctrl.toggleBoard(audio_state.osc_state.client, False) - - osc_ctrl.send_transcript = False - osc_ctrl.clear(audio_state.osc_state) - else: - # Short hold - if state == RECORD_STATE: - state = PAUSE_STATE - if not audio_state.use_builtin: - 
osc_ctrl.indicateSpeech(audio_state.osc_state.client, False) - osc_ctrl.lockWorld(audio_state.osc_state.client, True) - - osc_ctrl.send_transcript = False - - if audio_state.enable_local_beep == 1: - playsound(os.path.abspath("Resources/Sounds/Noise_Off_Quiet.wav")) - elif state == PAUSE_STATE: - state = RECORD_STATE - if not audio_state.use_builtin: - osc_ctrl.indicateSpeech(audio_state.osc_state.client, True) - osc_ctrl.toggleBoard(audio_state.osc_state.client, True) - osc_ctrl.lockWorld(audio_state.osc_state.client, False) - - osc_ctrl.send_transcript = True - osc_ctrl.clear(audio_state.osc_state) - - audio_state.drop_transcription = True - audio_state.audio_paused = False - - if audio_state.enable_local_beep == 1: - playsound(os.path.abspath("Resources/Sounds/Noise_On_Quiet.wav")) - -def drainStdin(audio_state: AudioState): - while True: - try: - line = input() - except EOFError: - # Invoking process closes the write end of their stdin to signal us - # to exit. - # TODO(yum) merge all threads - audio_state.run_app = False - return - if len(line) > 0: - print(f"stdin get: {line}", file=sys.stderr) - -def mainLoop(audio_state: AudioState): - steamvr_input_thd = threading.Thread(target = readControllerInput, - args = [audio_state]) - steamvr_input_thd.daemon = True - steamvr_input_thd.start() - - drain_stdin_thd = threading.Thread(target = drainStdin, - args = [audio_state]) - drain_stdin_thd.daemon = True - drain_stdin_thd.start() - - writeControlMessage(False) - - while audio_state.run_app: - time.sleep(0.01) - writeControlMessage(audio_state.send_transcript) - -if __name__ == "__main__": - print("args: {}".format(" ".join(sys.argv)), file=sys.stderr) - - # Set cwd to TaSTT/ - abspath = os.path.abspath(__file__) - dname = os.path.dirname(abspath) - dname = os.path.dirname(dname) - dname = os.path.dirname(dname) - os.chdir(dname) - print(f"Set cwd to {os.getcwd()}", file=sys.stderr) - - parser = argparse.ArgumentParser() - parser.add_argument("--bytes_per_char", 
type=str, help="The number of bytes to use to represent each character") - parser.add_argument("--chars_per_sync", type=str, help="The number of characters to send on each sync event") - parser.add_argument("--rows", type=int, help="The number of rows on the board") - parser.add_argument("--cols", type=int, help="The number of columns on the board") - parser.add_argument("--enable_local_beep", type=int, - help=("Whether to play a local auditory indicator when " - "transcription starts/stops.")) - parser.add_argument("--use_builtin", type=int, - help=("If set to 1, use the text box built into the game.")) - parser.add_argument("--button", type=str, - help=("The controller button used to start/stop transcription. " - "E.g. \"left joystick\"")) - args = parser.parse_args() - - if args.bytes_per_char is None or args.chars_per_sync is None: - print("--bytes_per_char and --chars_per_sync required", file=sys.stderr) - sys.exit(1) - if args.rows is None or args.cols is None: - print("--rows and --cols required", file=sys.stderr) - sys.exit(1) - if args.button is None: - print("--button required", file=sys.stderr) - sys.exit(1) - if args.enable_local_beep is None: - print("--enable_local_beep required", file=sys.stderr) - sys.exit(1) - if args.use_builtin is None: - print("--use_builtin required", file=sys.stderr) - sys.exit(1) - - generate_utils.config.BYTES_PER_CHAR = int(args.bytes_per_char) - generate_utils.config.CHARS_PER_SYNC = int(args.chars_per_sync) - generate_utils.config.BOARD_ROWS = int(args.rows) - generate_utils.config.BOARD_COLS = int(args.cols) - - audio_state = AudioState( - text = "", - osc_state = osc_ctrl.OscState( - generate_utils.config.CHARS_PER_SYNC, - generate_utils.config.BOARD_ROWS, - generate_utils.config.BOARD_COLS), - button = args.button, - enable_local_beep = args.enable_local_beep, - use_builtin = args.use_builtin, - send_transcript = False, - run_app = True) - - mainLoop(audio_state) - diff --git a/Scripts/dump_mic_devices.py 
b/Scripts/dump_mic_devices.py deleted file mode 100644 index 874445c..0000000 --- a/Scripts/dump_mic_devices.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env python3 - -from transcribe_v2 import MicStream - -if __name__ == "__main__": - # This implicitly prints mic devices. - s = MicStream(0) - diff --git a/Scripts/emotes.py b/Scripts/emotes.py deleted file mode 100644 index 6ae0930..0000000 --- a/Scripts/emotes.py +++ /dev/null @@ -1,143 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -from math import floor -import os -# python3 -m pip install pillow -from PIL import Image -import sys - -# (row, col) -TEX_SZ = (2048, 2048) - -IMG_SZ_PX = 256 -IMG_PER_ROW = int(TEX_SZ[0] / IMG_SZ_PX) -IMG_PER_COL = int(TEX_SZ[1] / IMG_SZ_PX) - -# TODO(yum) this should live in a config file. -# Note: the name of the emote must be no longer than 6 characters. -IMG_TEX_DATA = [] -IMG_TEX_DATA.append(("Images/Emotes/xdd.png", "xdd")) -IMG_TEX_DATA.append(("Images/Emotes/pog.png", "pog")) -IMG_TEX_DATA.append(("Images/Emotes/lulw.png", "laugh")) -IMG_TEX_DATA.append(("Images/Emotes/bighardo.png", "hard")) -IMG_TEX_DATA.append(("Images/Emotes/peepoHappy.png", "happy")) -IMG_TEX_DATA.append(("Images/Emotes/peepoSad.png", "sad")) -IMG_TEX_DATA.append(("Images/Emotes/bedge.png", "bed")) -IMG_TEX_DATA.append(("Images/Emotes/reallymad.png", "mad")) -IMG_TEX_DATA.append(("Images/Emotes/clueless.png", "surely")) -IMG_TEX_DATA.append(("Images/Emotes/what.png", "what")) -IMG_TEX_DATA.append(("Images/Emotes/based.png", "based")) -IMG_TEX_DATA.append(("Images/Emotes/chad.png", "chad")) -IMG_TEX_DATA.append(("Images/Emotes/aware.png", "aware")) -IMG_TEX_DATA.append(("Images/Emotes/girl.png", "girl")) -IMG_TEX_DATA = [] - -IMG_TEX_KEYWORD_TO_COORD = {} -for i in range(0, len(IMG_TEX_DATA)): - IMG_TEX_KEYWORD_TO_COORD[IMG_TEX_DATA[i][1]] = i - -# We treat images like words. To keep things simple, they're the same height as -# a word, and they're a fixed width. 
-IMG_SZ_LETTER_ROWS = 1 -IMG_SZ_LETTER_COLS = 6 - -def lookup(word: str): - word = word.lower() - word = ''.join(c for c in word.lower() if c.isalpha()) - if word in IMG_TEX_KEYWORD_TO_COORD.keys(): - return word, IMG_TEX_KEYWORD_TO_COORD[word] - return None, None - -def openTexture(tex_path: str): - if not os.path.exists(args.texture_path): - return Image.new("RGB", TEX_SZ) - tex = Image.open(args.texture_path) - if tex.size != TEX_SZ: - print("Texture at {} has mismatching size {}, creating new texture".format( - tex_path, tex.size), file=sys.stderr) - return Image.new("RGB", TEX_SZ) - return tex - -# Add an image to the texture at the coordinates (x, y). x and y should be in -# the range [0, IMG_PER_COL) and [0, IMG_PER_ROW) respectively. -def addImageToTexture(tex: Image, img_path: str, x: int, y:int): - # Transparent images will be composited on top of a black background. - img = Image.open(img_path).convert('RGBA') - img_bg = Image.new('RGBA', img.size, (0, 0, 0)) - img = Image.alpha_composite(img_bg, img).convert('RGB') - - max_px = IMG_SZ_PX - - # Scale the image up so it uses as much space as is given to it. - # I originally planned to support multiple scales, but this proved to be - # too much work - getting line wrapping to work with this would be a pain. - # So for now, all images are the same height as words. - scale = 1 - img_x, img_y = img.size - max_dim = max(img_x, img_y) - img_scale = (max_px / max_dim) * scale - new_sz = (int(floor(img.size[0] * img_scale)), - int(floor(img.size[1] * img_scale))) - print("Add image {}".format(img_path)) - print(" Original size: {}".format(img.size)) - print(" Scaled size: {}".format(new_sz)) - img = img.resize(new_sz) - - # Center the image within its new coordinate space. 
- padded_img_sz = (IMG_SZ_PX * scale, IMG_SZ_PX * scale) - padded_img = Image.new("RGB", padded_img_sz) - centered_x = int(floor((padded_img_sz[0] - new_sz[0]) / 2)) - centered_y = int(floor((padded_img_sz[1] - new_sz[1]) / 2)) - padded_img.paste(img, box=(centered_x, centered_y)) - img = padded_img - - # Break the image into tiles and write them into the texture. - for slot in range(0, scale * scale): - tile_x = slot % scale - tile_y = int(floor(slot / scale)) - tile_bbox = (tile_x * IMG_SZ_PX, tile_y * IMG_SZ_PX, (tile_x + 1) * IMG_SZ_PX, (tile_y + 1) * IMG_SZ_PX) - tile = img.crop(tile_bbox) - print(" tile {},{} (bbox={})".format(tile_x, tile_y, tile_bbox)) - - slot_x = x + slot % IMG_PER_ROW - slot_y = y + int(floor(slot / IMG_PER_ROW)) - slot_x_px = slot_x * IMG_SZ_PX - slot_y_px = slot_y * IMG_SZ_PX - print(" Add img at {},{} (px {},{})".format(slot_x, slot_y, slot_x_px, slot_y_px)) - - tex.paste(tile, box=(slot_x_px, slot_y_px)) - -def parseArgs(): - parser = argparse.ArgumentParser() - parser.add_argument("--texture_path", type=str, help="Path to save the generated texture.") - parser.add_argument("--rows", type=str, help="The number of rows on the board") - parser.add_argument("--cols", type=str, help="The number of columns on the board") - args = parser.parse_args() - - if not args.texture_path or not args.rows or not args.cols: - print("--texture_path, --rows, --cols required", file=sys.stderr) - sys.exit(1) - - return args - -if __name__ == "__main__": - args = parseArgs() - - rows = int(args.rows) - cols = int(args.cols) - # board is this much wider than tall - board_aspect_ratio = 2 - # each cell a square divided into `rows`x`cols` is this much wider than tall - cell_aspect_ratio = rows / cols - # each cell is this much wider than tall - board_cell_aspect_ratio = board_aspect_ratio * cell_aspect_ratio - - tex = openTexture(args.texture_path) - for i in range(0, len(IMG_TEX_DATA)): - filename = IMG_TEX_DATA[i][0] - x = i % IMG_PER_ROW - y = int(floor(i 
/ IMG_PER_ROW)) - addImageToTexture(tex, filename, x, y) - tex.save(args.texture_path) - diff --git a/Scripts/emotes_v2.py b/Scripts/emotes_v2.py deleted file mode 100644 index a9c037f..0000000 --- a/Scripts/emotes_v2.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import os -import pickle -import sys - -from math import floor -from PIL import Image -from typing import Any, Dict, List, Tuple - -# The character range [0x0000, 0xDFFF] is reserved for text. -# The range [0xE000, infinity) is left over for emotes. -EMOTES_LETTER_OFFSET = 0xE000 -EMOTES_HEIGHT = 512 -EMOTES_TEX_SZ = 4096 - -def superimpose_image(base_img: Image, overlay_img: Image, position: Tuple[int, int]) -> Image: - base_img.paste(overlay_img, position, overlay_img) - return base_img - -def i_to_pos(i, sm_wd, sm_ht, big_wd, big_ht) -> Tuple[int, int]: - x = i * sm_wd % big_wd - row = floor((i * sm_wd) / big_wd) - y = row * sm_ht - return int(x), int(y) - -def get_images_from_directory(directory_path: str) -> List[Tuple[Any, str]]: - images = [] - for filename in os.listdir(directory_path): - file_path = os.path.join(directory_path, filename) - if os.path.isfile(file_path) and file_path.endswith(".png"): - image = Image.open(file_path).convert("RGBA") - name = os.path.basename(filename).split('.')[0] - images.append((image, name)) - return images - -def split_resized_image(img, wd: int, ht: int) -> List[Any]: - aspect_ratio = img.width / img.height - width = int(ht * aspect_ratio) - img = img.resize((width, ht)) - - split_images = [] - for i in range(0, img.width, wd): - split_image = img.crop((i, 0, i + wd, ht)) - split_images.append(split_image) - - return split_images - -def resize_image_with_aspect_ratio(img: Image, aspect_ratio: float) -> Image: - original_width, original_height = img.size - new_width = int(original_height * aspect_ratio) - new_height = original_height - return img.resize((new_width, new_height)) - -def resize_image_to_height(img: Image, height: 
int) -> Image: - aspect_ratio = img.width / img.height - new_width = int(height * aspect_ratio) - return img.resize((new_width, height)) - -class EmotesState: - def __init__(self): - self.bits = {} - - def load(self, pickle_path): - try: - with open(pickle_path, 'rb') as f: - self.bits = pickle.load(f) - except FileNotFoundError: - print(f"Emotes map does not exist at {pickle_path}", - file=sys.stderr) - - # This is quite slow since we do a search and replace (O(n)) - # for each keyword O(m) times each variant of said keyword (O(k)). - # Thus total complexity is O(m*n*k). All three of these numbers are - # typically small: m and k typically < 10, n typically < 200. - # - # Naively one might split the input into words, but this only works for - # English-like languages. Eastern Asian languages like Japanese don't - # really divide into words AFAIK so this wouldn't work for them. - # - # Unless the performance becomes a user-reported problem, stick with this - # inefficient but reliable method. 
- def encode_emotes(self, msg: str): - for keyword, bits in self.bits.items(): - bits_str = "" - for bit in bits: - bits_str += chr(bit) - # ALL CAPS - tmp = keyword.upper() - msg = msg.replace(tmp, bits_str) - # lowercase - tmp = keyword.lower() - msg = msg.replace(tmp, bits_str) - # Capitalized - tmp = keyword.lower().capitalize() - msg = msg.replace(tmp, bits_str) - # dashes inserted - tmp = '-'.join(keyword.upper()) - msg = msg.replace(tmp, bits_str) - # uppercase, spaces inserted - tmp = ' '.join(keyword.upper()) - msg = msg.replace(tmp, bits_str) - # lowercase, spaces inserted - tmp = ' '.join(keyword.lower()) - msg = msg.replace(tmp, bits_str) - # uppercase, commas and spaces inserted - tmp = ', '.join(keyword.upper()) - msg = msg.replace(tmp, bits_str) - return msg - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("dir", type=str, help="directory to get images from") - parser.add_argument("board_aspect_ratio", help="aspect ratio of a cell in the board") - parser.add_argument("texture_aspect_ratio", help="aspect ratio of a cell in the texture") - parser.add_argument("tex_path", type=str, help="path to save the texture to") - parser.add_argument("pickle_path", type=str, help="path to save the texture index to") - args = parser.parse_args() - - directory_path = args.dir - board_aspect_ratio = int(args.board_aspect_ratio) - texture_aspect_ratio = int(args.texture_aspect_ratio) - - base_img = Image.new("RGBA", (EMOTES_TEX_SZ, EMOTES_TEX_SZ), (0, 0, 0, 0)) - images_and_filenames = get_images_from_directory(directory_path) - i = 0 - bits = {} # Dict[str, List[int]] - for img, filename in images_and_filenames: - print(f"Adding {filename}") - img = resize_image_with_aspect_ratio(img, board_aspect_ratio) - img = resize_image_to_height(img, EMOTES_HEIGHT) - img_fragments = split_resized_image(img, int(EMOTES_HEIGHT / texture_aspect_ratio), EMOTES_HEIGHT) - img_bits = [] # List[int] - for img_fragment in img_fragments: - i = i + 1 
- img_pos = i_to_pos(i, - EMOTES_HEIGHT / texture_aspect_ratio, EMOTES_HEIGHT, - EMOTES_TEX_SZ, EMOTES_TEX_SZ) - print(f"{img_pos}") - superimpose_image(base_img, img_fragment, img_pos) - img_bits.append(EMOTES_LETTER_OFFSET + i) - emote_name = os.path.basename(filename).split('.')[0] - print(f"{emote_name} -> {img_bits}") - bits[emote_name] = img_bits - base_img.save(args.tex_path) - with open(args.pickle_path, 'wb') as f: - pickle.dump(bits, f) - diff --git a/Scripts/generate_fonts.py b/Scripts/generate_fonts.py deleted file mode 100644 index 8dc8a89..0000000 --- a/Scripts/generate_fonts.py +++ /dev/null @@ -1,184 +0,0 @@ -#!/usr/bin/env python3 - -# python3 -m pip install pillow -# License: HPND license. -from PIL import Image, ImageFont, ImageDraw - -import math - -# Use a power of 2 pixels per character so we can evenly divide the plane. -font_pixels = 128 -full_ratio = 0.75 -half_ratio = 0.5 -full_sz = int(font_pixels * full_ratio) -half_sz = int(font_pixels * half_ratio) -layout_engine = ImageFont.Layout.BASIC - -unifont = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", full_sz, layout_engine=layout_engine) -unifont_half = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", half_sz, layout_engine=layout_engine) - -noto_sans_mono = ImageFont.truetype( - "Fonts/Noto_Sans_Mono/static/NotoSansMono/NotoSansMono-Bold.ttf", - full_sz, index=0, layout_engine=layout_engine) - -noto_sans_sc_half = ImageFont.truetype("Fonts/Noto_Sans_Simplified_Chinese/NotoSansSC-Regular.otf", half_sz, layout_engine=layout_engine) - -noto_sans_kr_half = ImageFont.truetype("Fonts/Noto_Sans_Korean/NotoSansKR-Regular.otf", half_sz, layout_engine=layout_engine) - -n_rows = 64 -n_cols = 128 - -class FontInfo: - def __init__(self, font, dy): - self.font = font - self.dy = dy - -def allow_range(allowlist, lo_hi, font = None, dy = 0): - for i in range(lo_hi[0], lo_hi[1] + 1): - allowlist[i] = FontInfo(font, dy) -def ban_range(allowlist, lo, hi): - for i in range(lo, hi + 1): - del allowlist[i] 
-allowlist = {} -# ASCII -basic_latin = (32, 126) -allow_range(allowlist, basic_latin, font=noto_sans_mono, dy = -20) -# Latin-1 supplement -latin_1_supplement = (0x00A1, 0x00ff) -allow_range(allowlist, latin_1_supplement, font = noto_sans_mono) -# Latin extended-A -latin_extended_a = (0x0100, 0x017f) -allow_range(allowlist, latin_extended_a, font = noto_sans_mono) -# Latin extended-B -latin_extended_b = (0x0180, 0x024f) -allow_range(allowlist, latin_extended_b, font = noto_sans_mono) -# Spacing modifier letters -ipa_extensions = (0x0250, 0x02af) -allow_range(allowlist, ipa_extensions, font = unifont) -# Greek and Coptic -greek = (0x0370, 0x03ff) -allow_range(allowlist, greek, font = noto_sans_mono) -ban_range(allowlist, 0x0378, 0x03a2) -# Cyrillic -cyrillic = (0x0400, 0x04ff) -allow_range(allowlist, cyrillic, font = unifont) -# Currency symbols -currency_symbols = (0x20a0, 0x20c0) -allow_range(allowlist, currency_symbols, font = noto_sans_mono) - -# CJK -# -hangul_jamo = (0x1100, 0x11FF) -allow_range(allowlist, hangul_jamo, font = noto_sans_kr_half) -# -general_punctuation = (0x2000, 0x206f) -allow_range(allowlist, general_punctuation, font = noto_sans_mono) -# -kangxi_radicals = (0x2f00, 0x2fdf) -allow_range(allowlist, kangxi_radicals, font = noto_sans_sc_half) -# -cjk_symbols_and_punctuation = (0x3000, 0x303f) -allow_range(allowlist, cjk_symbols_and_punctuation, font = noto_sans_sc_half) -# -hiragana = (0x3041, 0x309f) -allow_range(allowlist, hiragana, font = noto_sans_sc_half) -ban_range(allowlist, 0x3097, 0x3098) -# -katakana = (0x30a0, 0x30ff) -allow_range(allowlist, katakana, font = noto_sans_sc_half) -# -hangul_compatibility_jamo = (0x3130, 0x318f) -allow_range(allowlist, hangul_compatibility_jamo, font = noto_sans_sc_half) -# -enclosed_cjk_letters_and_months = (0x3200, 0x32FF) -allow_range(allowlist, enclosed_cjk_letters_and_months, font = noto_sans_sc_half) -# -cjk_compatibility = (0x3300, 0x33ff) -allow_range(allowlist, cjk_compatibility, font = 
noto_sans_sc_half) -# -cjk_unified_extension_a = (0x3400, 0x4dbf) -allow_range(allowlist, cjk_unified_extension_a, font = noto_sans_sc_half) -# -cjk_ideographs = (0x4e00, 0x9fff) -allow_range(allowlist, cjk_ideographs, font = noto_sans_sc_half) -# -hangul_syllables = (0xAC00, 0xD7A3) -allow_range(allowlist, hangul_syllables, font = noto_sans_kr_half) -# -halfwidth_and_fullwidth = (0xff00, 0xffef) -allow_range(allowlist, halfwidth_and_fullwidth, font = noto_sans_sc_half) - -def in_range(x, range_pair) -> bool: - return x >= range_pair[0] and x <= range_pair[1] - -max_char = max(allowlist) -print("max char: {}".format(max_char)) -print("num chars: {}".format(len(allowlist))) - -def genUnicode(): - total_rows = math.ceil(max_char / n_cols) - print("total rows {}".format(total_rows)) - total_textures = math.ceil(total_rows / n_rows) - print("total textures {}".format(total_textures)) - - for nth_texture in range(0, total_textures): - # Create an 8K grayscale ("L") or black and white ("1") image - # Unity will re-encode b&w to grayscale, so using b&w just helps keep - # the package size low (we vendor these, we don't generate them - # client-side). - image = Image.new(mode="1", size=(8192,8192), color=0) - draw = ImageDraw.Draw(image) - - row_begin = nth_texture * n_rows - - for row in range(row_begin, row_begin + n_rows): - line = "" - for col in range(0, n_cols): - # Generate the unicode character for this spot. - n = row * n_cols + col - char = None - font_info = None - if n in allowlist.keys(): - char = chr(n) - font_info = allowlist[n] - else: - char = " " - font_info = FontInfo(unifont, 0) - # Hack: Chinese, Japanese, and Korean characters are all double - # width and are all on textures [1,6]. To fit them in the same - # grid, we use a half-size font. 
- draw.text((col * font_pixels / 2, (row - row_begin) * font_pixels + - font_info.dy), char, font=font_info.font, fill=255) - - image.save("Fonts/Bitmaps/font-%01d.png" % nth_texture) - -def genASCII(): - # Create an 8k grayscale image. 16 glyphs wide, 8 glyphs tall. - # Only characters on the range [0, 128). - image = Image.new(mode="RGBA", size=(8192,8192), color=0) - draw = ImageDraw.Draw(image) - n_rows = 8 - n_cols = 16 - - font = ImageFont.truetype( - "Fonts/Noto_Sans_Mono/static/NotoSansMono/NotoSansMono-Bold.ttf", - int((8192 / 8) * 0.75), index=0, layout_engine=layout_engine) - - for row in range(0, n_rows): - for col in range(0, n_cols): - n = row * n_cols + col - char = None - font_info = None - if n in allowlist.keys(): - char = chr(n) - else: - char = " " - draw.text((col * font_pixels * 8 / 2, row * font_pixels * 8 - 20), - char, font=font, fill=(255,255,255)) - image.save("Fonts/Bitmaps/font-ascii.png") - -if __name__ == "__main__": - print("Generating unicode fonts") - #genUnicode() - print("Generating ASCII fonts") - genASCII() diff --git a/Scripts/generate_menu.py b/Scripts/generate_menu.py deleted file mode 100644 index 2da50b2..0000000 --- a/Scripts/generate_menu.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import sys - -MENU_SUFFIX = """ - - name: TaSTT - icon: {fileID: 0} - type: 103 - parameter: - name: - value: 1 - style: 0 - subMenu: {fileID: 11400000, guid: 111d8d5f909f534429bfe46268723200, type: 2} - subParameters: [] - labels: [] -"""[1:] - -def append(old_path, new_path): - merged = "" - with open(old_path, "r") as f: - merged = f.read() - merged += MENU_SUFFIX - with open(new_path, "w") as f: - f.write(merged) - -if __name__ == "__main__": - - parser = argparse.ArgumentParser() - parser.add_argument("--old_menu", type=str, help="The menu to append to") - parser.add_argument("--new_menu", type=str, help="The menu to create") - args = parser.parse_args() - - if not args.old_menu or not args.new_menu: - 
print("--old_menu and --new_menu are both required", - file=sys.stderr) - parser.print_help() - parser.exit(1) - - append(args.old_menu, args.new_menu) - diff --git a/Scripts/generate_params.py b/Scripts/generate_params.py deleted file mode 100644 index 0d47fde..0000000 --- a/Scripts/generate_params.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python3 - -import app_config -import argparse -import generate_utils -import sys - -PARAM_HEADER = """ -%YAML 1.1 -%TAG !u! tag:unity3d.com,2011: ---- !u!114 &11400000 -MonoBehaviour: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_GameObject: {fileID: 0} - m_Enabled: 1 - m_EditorHideFlags: 0 - m_Script: {fileID: -1506855854, guid: 67cc4cb7839cd3741b63733d5adf0442, type: 3} - m_Name: TaSTT_params - m_EditorClassIdentifier: - parameters: -"""[1:] - -INT_PARAM = """ - - name: %PARAM_NAME% - valueType: 0 - saved: 0 - defaultValue: 0 - networkSynced: %SYNCED% -"""[1:] - -BOOL_PARAM = """ - - name: %PARAM_NAME% - valueType: 2 - saved: %SAVED% - defaultValue: 0 - networkSynced: %SYNCED% -"""[1:] - -FLOAT_PARAM = """ - - name: %PARAM_NAME% - valueType: 1 - saved: 0 - defaultValue: %DEFAULT_FLOAT% - networkSynced: %SYNCED% -"""[1:] - -def generate(cfg): - result = "" - - # We're working with an 84-character board, and each FX layer is responsible - # for 8 of those characters. 
- params = {} - params["SAVED"] = "0" - params["DEFAULT_FLOAT"] = "0" - params["SYNCED"] = "1" - - params["PARAM_NAME"] = generate_utils.getDummyParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - - params["PARAM_NAME"] = generate_utils.getEnableParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - - params["PARAM_NAME"] = generate_utils.getEllipsisParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - - if not cfg["enable_phonemes"]: - params["SYNCED"] = "0" - for i in range(5): - params["PARAM_NAME"] = generate_utils.getSoundParam(i+1) - result += generate_utils.replaceMacros(BOOL_PARAM, params) - params["PARAM_NAME"] = generate_utils.getEnablePhonemeParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - params["SYNCED"] = "1" - - params["PARAM_NAME"] = generate_utils.getScaleParam() - params["DEFAULT_FLOAT"] = "0.05" - result += generate_utils.replaceMacros(FLOAT_PARAM, params) - params["DEFAULT_FLOAT"] = "0" - - params["PARAM_NAME"] = generate_utils.getToggleParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - - params["PARAM_NAME"] = generate_utils.getLockWorldParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - - params["PARAM_NAME"] = generate_utils.getClearBoardParam() - result += generate_utils.replaceMacros(BOOL_PARAM, params) - - params["PARAM_NAME"] = generate_utils.getSelectParam() - result += generate_utils.replaceMacros(INT_PARAM, params) - - for byte in range(0, generate_utils.config.BYTES_PER_CHAR): - for i in range(0, generate_utils.config.CHARS_PER_SYNC): - params["PARAM_NAME"] = generate_utils.getBlendParam(i, byte) - result += generate_utils.replaceMacros(FLOAT_PARAM, params) - - return result - -def append(old_path, params, new_path): - merged = "" - with open(old_path, "r") as f: - merged = f.read() - merged += params - with open(new_path, "w") as f: - f.write(merged) - -if __name__ == "__main__": - - parser = argparse.ArgumentParser() - 
def _generatedBlock(lines, prefix: str) -> str:
    """Frame `lines` with BEGIN/END marker comments and prefix every line."""
    framed = ["// BEGIN GENERATED CODE BLOCK", *lines,
              "// END GENERATED CODE BLOCK"]
    return '\n'.join(prefix + line for line in framed)


def _letterParamNames(nbytes: int, nrows: int, ncols: int):
    """Yield one shader parameter name per cell: byte-major, then row, then column."""
    for byte in range(nbytes):
        for row in range(nrows):
            for col in range(ncols):
                yield generate_utils.getShaderParamByRowColByte(row, col, byte)


# A single parameter looks like this:
#   _Letter_Row00_Col00_Byte0("_Letter_Row00_Col00_Byte0", float) = 0
def generateUnityParams(nbytes: int, nrows: int, ncols: int, prefix: str = "") -> str:
    """Return the Unity `Properties` declarations for every letter parameter.

    Each output line starts with `prefix`; the block is wrapped in
    BEGIN/END GENERATED CODE BLOCK marker comments.
    """
    return _generatedBlock(
        ['{0}("{0}", float) = 0'.format(name)
         for name in _letterParamNames(nbytes, nrows, ncols)],
        prefix)


# A single parameter looks like this:
#   float _Letter_Row00_Col00_Byte0;
def generateCgParams(nbytes: int, nrows: int, ncols: int, prefix: str = "") -> str:
    """Return the Cg variable declarations matching `generateUnityParams`."""
    return _generatedBlock(
        ["float {};".format(name)
         for name in _letterParamNames(nbytes, nrows, ncols)],
        prefix)


# Define 5 preprocessor constants:
#   #define BYTES_PER_CHAR $nbytes
#   #define BOARD_NROWS $board_nrows
#   #define BOARD_NCOLS $board_ncols
#   #define TEXTURE_NROWS $texture_nrows
#   #define TEXTURE_NCOLS $texture_ncols
def generateCgConstants(nbytes: int, board_nrows: int, board_ncols: int,
        texture_nrows: int, texture_ncols: int, prefix: str = "") -> str:
    """Return `#define` lines describing the board and font-texture layout."""
    constants = (
        ("BYTES_PER_CHAR", nbytes),
        ("BOARD_NROWS", board_nrows),
        ("BOARD_NCOLS", board_ncols),
        ("TEXTURE_NROWS", texture_nrows),
        ("TEXTURE_NCOLS", texture_ncols),
    )
    return _generatedBlock(
        ["#define {} {}".format(name, value) for name, value in constants],
        prefix)
def generateLetterAccessor(nbytes: int, nrows: int, ncols: int, prefix: str = "") -> str:
    """Return the nested `switch` that reads the letter value of one cell.

    The generated code switches on CHAR_ROW, then on CHAR_COL, ORs the
    cell's per-byte float parameters into `res` (byte N shifted left by
    N * 8 bits) and returns it; unknown columns return 0. `CHAR_ROW`,
    `CHAR_COL` and `res` are expected to be declared by the surrounding
    shader template.

    Row cases are emitted in reverse: parameter row `r` is selected by
    `case nrows - (r + 1)`.
    """
    # NOTE(review): the leading whitespace inside the literals below only
    # affects the layout of the generated shader text -- confirm it against
    # the checked-in shader if exact formatting matters.
    lines = []
    lines.append(prefix + "// BEGIN GENERATED CODE BLOCK")
    lines.append(prefix + "[forcecase] switch (CHAR_ROW) {")
    for row in range(nrows):
        lines.append(prefix + " case {}:".format(nrows - (row + 1)))
        lines.append(prefix + " [forcecase] switch (CHAR_COL) {")
        for col in range(ncols):
            lines.append(prefix + " case {}:".format(col))
            for byte in range(nbytes):
                param_name = generate_utils.getShaderParamByRowColByte(row, col, byte)
                lines.append(prefix + " res |= ((uint) {}) << ({} * 8);".format(param_name, byte))
            lines.append(prefix + " return res;")
        lines.append(prefix + " default:")
        lines.append(prefix + " return 0;")
        lines.append(prefix + " }")
    lines.append(prefix + "}")
    lines.append(prefix + "// END GENERATED CODE BLOCK")
    return '\n'.join(lines)


def applyLineMacro(old_path: str, new_path: str, macro: str, replacement: str) -> int:
    """Copy `old_path` to `new_path`, replacing each line that contains `macro`.

    A matching line is replaced as a whole by `replacement` (which may span
    several lines). The input is read completely before the output is
    opened, so `old_path` and `new_path` may be the same file. The output is
    written without a trailing newline.

    Returns:
        The number of lines that were replaced.
    """
    new_lines = []
    times_applied = 0
    with open(old_path, 'r', encoding="utf-8") as f:
        for line in f:
            # Drop the line terminator; lines are re-joined with '\n' below.
            if line.endswith('\n'):
                line = line[:-1]
            if macro in line:
                new_lines.append(replacement)
                times_applied += 1
            else:
                new_lines.append(line)
    with open(new_path, 'w', encoding="utf-8") as f:
        f.write('\n'.join(new_lines))
    return times_applied


if __name__ == "__main__":
    print("args: {}".format(" ".join(sys.argv)))

    parser = argparse.ArgumentParser()
    for flag, description in (
            ("--bytes_per_char", "The number of bytes to use to represent each character"),
            ("--board_rows", "The number of rows on the board"),
            ("--board_cols", "The number of columns on the board"),
            ("--texture_rows", "The number of rows on the font textures"),
            ("--texture_cols", "The number of columns on the font textures"),
            ("--shader_template", "The path to the shader template"),
            ("--shader_path", "The path where the generated shader will be written")):
        parser.add_argument(flag, type=str, help=description)
    args = parser.parse_args()

    if not all((args.bytes_per_char, args.board_rows, args.board_cols,
                args.texture_rows, args.texture_cols,
                args.shader_template, args.shader_path)):
        print(("--bytes_per_char, --board_rows, --board_cols, --texture_rows, "
            "--texture_cols, --shader_template, --shader_path required"), file=sys.stderr)
        sys.exit(1)

    nbytes = int(args.bytes_per_char)
    board_nrows = int(args.board_rows)
    board_ncols = int(args.board_cols)
    texture_nrows = int(args.texture_rows)
    texture_ncols = int(args.texture_cols)

    # The first pass reads the template; every later pass rewrites the
    # generated shader in place.
    passes = (
        ("// %TEMPLATE__UNITY_ROW_COL_PARAMS%",
         generateUnityParams(nbytes, board_nrows, board_ncols, prefix = "")),
        ("// %TEMPLATE__CG_ROW_COL_PARAMS%",
         generateCgParams(nbytes, board_nrows, board_ncols, prefix = " ")),
        ("// %TEMPLATE__CG_ROW_COL_CONSTANTS%",
         generateCgConstants(nbytes, board_nrows, board_ncols,
                             texture_nrows, texture_ncols, prefix = " ")),
        ("// %TEMPLATE__CG_LETTER_ACCESSOR%",
         generateLetterAccessor(nbytes, board_nrows, board_ncols, prefix = " ")),
    )
    source_path = args.shader_template
    for macro, replacement in passes:
        applyLineMacro(source_path, args.shader_path, macro, replacement)
        source_path = args.shader_path
def replaceMacros(lines, macro_defs):
    """Return `lines` with every `%KEY%` placeholder replaced.

    Args:
        lines: The template text.
        macro_defs: Mapping of macro name (without the percent signs) to its
            replacement text. Values must be strings.
    """
    for name, replacement in macro_defs.items():
        lines = lines.replace("%" + name + "%", replacement)
    return lines


class Config:
    """Board geometry and sync settings shared by the generator scripts."""

    def __init__(self):
        self.BOARD_ROWS = 4
        self.BOARD_COLS = 48
        self.CHARS_PER_CELL = 256
        self.BYTES_PER_CHAR = 2
        self.CHARS_PER_SYNC = 10

    def numRegions(self, which_layer):
        """Return how many board cells layer `which_layer` is responsible for.

        The board's cells are dealt out to CHARS_PER_SYNC layers. When the
        cell count is not a multiple of CHARS_PER_SYNC, the first
        `cells % CHARS_PER_SYNC` layers get one extra region.
        """
        # Exact integer arithmetic; no round trip through float division.
        full_regions, leftover = divmod(self.BOARD_ROWS * self.BOARD_COLS,
                                        self.CHARS_PER_SYNC)
        if which_layer < leftover:
            return full_regions + 1
        return full_regions

    def layerNeedsParity(self, which_layer):
        """Return True if `which_layer` has one region fewer than the fullest layers."""
        leftover = (self.BOARD_ROWS * self.BOARD_COLS) % self.CHARS_PER_SYNC
        return leftover > 0 and which_layer >= leftover


config = Config()


# Implementation detail. We use this parameter to return from the terminal
# state of the FX layer to the starting state.
def getDummyParam():
    return "TaSTT_Dummy"


def getToggleParam():
    return "TaSTT_Toggle"


def getScaleParam():
    return "TaSTT_Scale"


def getEnablePhonemeParam():
    return "TaSTT_Enable_Phoneme"


# When this is set to true, the board clears.
def getClearBoardParam():
    return "TaSTT_Clear_Board"


def getLockWorldParam():
    return "TaSTT_Lock_World"
def getLayerParam(which_layer: int, byte: int) -> str:
    """Return the name of the parameter that sets the letter for a layer."""
    return "TaSTT_L%02dB%01d" % (which_layer, byte)


def getLayerName(which_layer: int, byte: int) -> str:
    """Return the animator layer name; identical to the layer's parameter name."""
    return getLayerParam(which_layer, byte)


def getBlendParam(which_layer: int, byte: int) -> str:
    return "TaSTT_L%02dB%01d_Blend" % (which_layer, byte)


def getDefaultStateName(which_layer: int, byte: int):
    return "TaSTT_L%02dB%01d_Do_Nothing" % (which_layer, byte)


def getActiveStateName(which_layer: int, byte: int):
    return "TaSTT_L%02dB%01d_Active" % (which_layer, byte)


def getSelectStateName(which_layer, select, byte=0):
    """Return the state name for one (layer, select, byte) combination.

    `byte` used to be read from an undefined name, so every call raised
    NameError. It is now an optional trailing parameter so that two-argument
    calls keep their signature and start working.
    """
    return "TaSTT_L%02d_S%02d_B%01d" % (which_layer, select, byte)


def getBlendStateName(which_layer, select, byte):
    return "TaSTT_L%02d_S%02d_B%01d_Blend" % (which_layer, select, byte)


def getLetterStateName(which_layer, select, letter, byte):
    return "TaSTT_L%02d_S%02d_L%03d_B%01d" % (which_layer, select, letter, byte)


def getSelectParam() -> str:
    return "TaSTT_Select"


def getEnableParam():
    return "TaSTT_Enable"


def getSoundParam(i: int):
    return f"TaSTT_Sound{str(i)}"


def getEllipsisParam():
    return "TaSTT_Ellipsis"


def getBoardIndex(which_layer, select):
    """Return the flat board cell index addressed by (layer, select)."""
    # Because the board is divided into groups of CHARS_PER_SYNC cells, some
    # (layer, select) pairs may describe cells which don't exist, depending
    # on the size of the board. We work around this by wrapping those back
    # to the top of the board, and rely on the OSC controller to simply not
    # reference those cells.
    return (select * config.CHARS_PER_SYNC + which_layer) % (config.BOARD_ROWS * config.BOARD_COLS)


def getShaderParamByRowColByte(row, col, byte):
    """Return the shader property name for one byte of one board cell."""
    return "_Letter_Row%02d_Col%02d_Byte%01d" % (row, col, byte)
# ---- generate_utils.py (continued) ----

def getShaderParam(which_layer, select, byte):
    """Map a (layer, select, byte) triple to its shader property name.

    Previously this called `getShaderParamByRowCol`, which does not exist;
    the helper that takes (row, col, byte) is `getShaderParamByRowColByte`.
    """
    index = getBoardIndex(which_layer, select)
    row, col = divmod(index, config.BOARD_COLS)
    return getShaderParamByRowColByte(row, col, byte)


# The name of the animation which writes `letter` at a specific position in the
# display.
def getLetterAnimationName(row, col, letter, nth_byte):
    return "R%02dC%02dL%02dB%01d" % (row, col, letter, nth_byte)


# The name of the animation which clears the entire board.
def getClearAnimationName():
    return "TaSTT_Clear_Board"


def getAnimationNameByLayerAndIndex(which_layer, select, letter, nth_byte):
    """Return the letter animation name for the cell addressed by (layer, select)."""
    index = getBoardIndex(which_layer, select)
    row, col = divmod(index, config.BOARD_COLS)
    return getLetterAnimationName(row, col, letter, nth_byte)


# Returns the path to the animation for the given shader parameter + letter.
def getAnimationPath(shader_param, letter):
    return "generated/animations/%s_Letter%02d.anim" % (shader_param, letter)


# ---- keybind_event_machine.py ----

class KeybindEventMachine:
    """Records the time of every press of a global hotkey for later polling."""

    def __init__(self, keybind: str):
        self.keybind = keybind
        # Timestamps (seconds since the epoch) of presses not yet consumed,
        # oldest first.
        self.events = []
        keyboard.add_hotkey(keybind, self.onPress)

    def onPress(self) -> None:
        """Hotkey callback: queue the current time."""
        self.events.append(time.time())

    # Returns the timestamp when the keybind was pressed, or 0 if no keypresses
    # are queued.
    def getNextPressTime(self) -> float:
        # pop(0) removes the oldest entry in a single list operation. The
        # previous slice-and-reassign could discard a press appended between
        # the slice and the assignment.
        # NOTE(review): presumably onPress runs on the keyboard library's
        # listener thread -- confirm.
        try:
            return self.events.pop(0)
        except IndexError:
            return 0
# Whisper language name -> NLLB language code.
#
# Keys are the lower-case language names Whisper reports. "portugese" and
# "slovene" are not names Whisper uses; they are kept as aliases so existing
# lookups keep working, next to the correct "portuguese" and "slovenian".
whisper_to_nllb = {
        "catalan": "cat_Latn",
        "czech": "ces_Latn",
        "danish": "dan_Latn",
        "dutch": "nld_Latn",
        "english": "eng_Latn",
        "finnish": "fin_Latn",
        "french": "fra_Latn",
        "german": "deu_Latn",
        "greek": "ell_Grek",
        "hungarian": "hun_Latn",
        "icelandic": "isl_Latn",
        "italian": "ita_Latn",
        "latvian": "lvs_Latn",
        "lithuanian": "lit_Latn",
        "norwegian": "nob_Latn",  # norwegian (bokmal)
        "polish": "pol_Latn",
        "portuguese": "por_Latn",
        "portugese": "por_Latn",  # legacy misspelling
        "romanian": "ron_Latn",
        "russian": "rus_Cyrl",
        "slovak": "slk_Latn",
        "slovenian": "slv_Latn",
        "slovene": "slv_Latn",  # legacy alias
        "spanish": "spa_Latn",
        "swedish": "swe_Latn",
        "turkish": "tur_Latn",
        }

# NLLB language code -> sentence_splitter (SS) language code.
#
# "cat_Ltn" was a typo for "cat_Latn"; the old key is kept as an alias in
# case a caller still holds the misspelled code.
nllb_to_ss = {
        "cat_Latn": "ca",  # catalan
        "cat_Ltn": "ca",   # legacy misspelling of cat_Latn
        "ces_Latn": "cs",  # czech
        "dan_Latn": "da",  # danish
        "nld_Latn": "nl",  # dutch
        "eng_Latn": "en",  # english
        "fin_Latn": "fi",  # finnish
        "fra_Latn": "fr",  # french
        "deu_Latn": "de",  # german
        "ell_Grek": "el",  # greek
        "hun_Latn": "hu",  # hungarian
        "isl_Latn": "is",  # icelandic
        "ita_Latn": "it",  # italian
        "lvs_Latn": "lv",  # latvian
        "lit_Latn": "lt",  # lithuanian
        "nob_Latn": "no",  # norwegian (bokmal)
        "pol_Latn": "pl",  # polish
        "por_Latn": "pt",  # portuguese
        "ron_Latn": "ro",  # romanian
        "rus_Cyrl": "ru",  # russian
        "slk_Latn": "sk",  # slovak
        "slv_Latn": "sl",  # slovene
        "spa_Latn": "es",  # spanish
        "swe_Latn": "sv",  # swedish
        "tur_Latn": "tr",  # turkish
        }
-import typing - -# TODO(yum) we're getting the encoding scheme from here, but I think it should -# be in a different layer. -import osc_ctrl - -SCALE_ANIMATION_TEMPLATE = """ -%YAML 1.1 -%TAG !u! tag:unity3d.com,2011: ---- !u!74 &7400000 -AnimationClip: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: TaSTT_Scale_0 - serializedVersion: 6 - m_Legacy: 0 - m_Compressed: 0 - m_UseHighQualityCurve: 1 - m_RotationCurves: [] - m_CompressedRotationCurves: [] - m_EulerCurves: [] - m_PositionCurves: [] - m_ScaleCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: {x: 5, y: 5, z: 5} - inSlope: {x: 0, y: 0, z: 0} - outSlope: {x: 0, y: 0, z: 0} - tangentMode: 0 - weightedMode: 0 - inWeight: {x: 0, y: 0.33333334, z: 0.33333334} - outWeight: {x: 0, y: 0.33333334, z: 0.33333334} - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - path: World Constraint/Container/TaSTT - m_FloatCurves: [] - m_PPtrCurves: [] - m_SampleRate: 60 - m_WrapMode: 0 - m_Bounds: - m_Center: {x: 0, y: 0, z: 0} - m_Extent: {x: 0, y: 0, z: 0} - m_ClipBindingConstant: - genericBindings: - - serializedVersion: 2 - path: 1272388438 - attribute: 3 - script: {fileID: 0} - typeID: 4 - customType: 0 - isPPtrCurve: 0 - - serializedVersion: 2 - path: 1272388438 - attribute: 1225223716 - script: {fileID: 0} - typeID: 23 - customType: 0 - isPPtrCurve: 0 - pptrCurveMapping: [] - m_AnimationClipSettings: - serializedVersion: 2 - m_AdditiveReferencePoseClip: {fileID: 0} - m_AdditiveReferencePoseTime: 0 - m_StartTime: 0 - m_StopTime: 0.016666668 - m_OrientationOffsetY: 0 - m_Level: 0 - m_CycleOffset: 0 - m_HasAdditiveReferencePose: 0 - m_LoopTime: 1 - m_LoopBlend: 0 - m_LoopBlendOrientation: 0 - m_LoopBlendPositionY: 0 - m_LoopBlendPositionXZ: 0 - m_KeepOriginalOrientation: 0 - m_KeepOriginalPositionY: 1 - m_KeepOriginalPositionXZ: 0 - m_HeightFromFeet: 0 - m_Mirror: 0 - 
m_EditorCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 5 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: m_LocalScale.x - path: World Constraint/Container/TaSTT - classID: 4 - script: {fileID: 0} - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 5 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: m_LocalScale.y - path: World Constraint/Container/TaSTT - classID: 4 - script: {fileID: 0} - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 5 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: m_LocalScale.z - path: World Constraint/Container/TaSTT - classID: 4 - script: {fileID: 0} - m_EulerEditorCurves: [] - m_HasGenericRootTransform: 0 - m_HasMotionFloatCurves: 0 - m_Events: [] -""" - -SOUND_ANIMATION_TEMPLATE = """ -%YAML 1.1 -%TAG !u! 
tag:unity3d.com,2011: ---- !u!74 &7400000 -AnimationClip: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: Sound1_On - serializedVersion: 6 - m_Legacy: 0 - m_Compressed: 0 - m_UseHighQualityCurve: 1 - m_RotationCurves: [] - m_CompressedRotationCurves: [] - m_EulerCurves: [] - m_PositionCurves: [] - m_ScaleCurves: [] - m_FloatCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 1 - inSlope: Infinity - outSlope: Infinity - tangentMode: 103 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: m_IsActive - path: World Constraint/Container/TaSTT/Audio 1 - classID: 1 - script: {fileID: 0} - m_PPtrCurves: [] - m_SampleRate: 60 - m_WrapMode: 0 - m_Bounds: - m_Center: {x: 0, y: 0, z: 0} - m_Extent: {x: 0, y: 0, z: 0} - m_ClipBindingConstant: - genericBindings: - - serializedVersion: 2 - path: 2267216663 - attribute: 2086281974 - script: {fileID: 0} - typeID: 1 - customType: 0 - isPPtrCurve: 0 - pptrCurveMapping: [] - m_AnimationClipSettings: - serializedVersion: 2 - m_AdditiveReferencePoseClip: {fileID: 0} - m_AdditiveReferencePoseTime: 0 - m_StartTime: 0 - m_StopTime: 0 - m_OrientationOffsetY: 0 - m_Level: 0 - m_CycleOffset: 0 - m_HasAdditiveReferencePose: 0 - m_LoopTime: 0 - m_LoopBlend: 0 - m_LoopBlendOrientation: 0 - m_LoopBlendPositionY: 0 - m_LoopBlendPositionXZ: 0 - m_KeepOriginalOrientation: 0 - m_KeepOriginalPositionY: 1 - m_KeepOriginalPositionXZ: 0 - m_HeightFromFeet: 0 - m_Mirror: 0 - m_EditorCurves: [] - m_EulerEditorCurves: [] - m_HasGenericRootTransform: 0 - m_HasMotionFloatCurves: 0 - m_Events: [] -""" - -LETTER_ANIMATION_TEMPLATE = """ -%YAML 1.1 -%TAG !u! 
tag:unity3d.com,2011: ---- !u!74 &7400000 -AnimationClip: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: REPLACEME_ANIMATION_NAME - serializedVersion: 6 - m_Legacy: 0 - m_Compressed: 0 - m_UseHighQualityCurve: 1 - m_RotationCurves: [] - m_CompressedRotationCurves: [] - m_EulerCurves: [] - m_PositionCurves: [] - m_ScaleCurves: [] - m_FloatCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - - serializedVersion: 3 - time: 0.016666668 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: material.REPLACEME_LETTER_PARAM - path: TaSTT - classID: 23 - script: {fileID: 0} - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - - serializedVersion: 3 - time: 0.016666668 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: material.REPLACEME_LETTER_PARAM - path: TaSTT - classID: 137 - script: {fileID: 0} - m_PPtrCurves: [] - m_SampleRate: 60 - m_WrapMode: 0 - m_Bounds: - m_Center: {x: 0, y: 0, z: 0} - m_Extent: {x: 0, y: 0, z: 0} - m_ClipBindingConstant: - genericBindings: - - serializedVersion: 2 - path: 2794480623 - attribute: 2284639795 - script: {fileID: 0} - typeID: 137 - customType: 22 - isPPtrCurve: 0 - pptrCurveMapping: [] - m_AnimationClipSettings: - serializedVersion: 2 - m_AdditiveReferencePoseClip: {fileID: 0} - m_AdditiveReferencePoseTime: 0 - m_StartTime: 0 - m_StopTime: 0 - 
m_OrientationOffsetY: 0 - m_Level: 0 - m_CycleOffset: 0 - m_HasAdditiveReferencePose: 0 - m_LoopTime: 1 - m_LoopBlend: 0 - m_LoopBlendOrientation: 0 - m_LoopBlendPositionY: 0 - m_LoopBlendPositionXZ: 0 - m_KeepOriginalOrientation: 0 - m_KeepOriginalPositionY: 1 - m_KeepOriginalPositionXZ: 0 - m_HeightFromFeet: 0 - m_Mirror: 0 - m_EditorCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - - serializedVersion: 3 - time: 0.016666668 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: material.REPLACEME_LETTER_PARAM - path: TaSTT - classID: 23 - script: {fileID: 0} - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - - serializedVersion: 3 - time: 0.016666668 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: material.REPLACEME_LETTER_PARAM - path: TaSTT - classID: 137 - script: {fileID: 0} - m_EulerEditorCurves: [] - m_HasGenericRootTransform: 0 - m_HasMotionFloatCurves: 0 - m_Events: [] -""" - -ANIMATOR_TEMPLATE = """ ---- !u!91 &9100000 -AnimatorController: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: TaSTT_fx - serializedVersion: 5 - m_AnimatorParameters: [] - m_AnimatorLayers: [] -""" - -# For whatever reason, running unrelated animations s.a. -# facial expressions can have a slight effect on supposedly -# unrelated parameters, causing letter to flip. 
def generateClearAnimation(anim_dir: str, guid_map: typing.Dict[str, str]):
    """Write the animation clip that resets every letter parameter on the board.

    The clip gets one float curve per (byte, row, col) shader parameter,
    each keyed to letter 0 plus UNITY_ANIMATION_FUDGE_MARGIN. The clip and a
    `.meta` file are written into `anim_dir`, and `guid_map` is updated in
    both directions (path -> guid and guid -> path).
    """
    print("Generating board clearing animation", file=sys.stderr)

    template = libunity.UnityParser()
    template.parse(LETTER_ANIMATION_TEMPLATE)

    root = template.nodes[0]
    clip = root.mapping['AnimationClip']
    # Keep the first template curve as a prototype, then start from empty
    # curve lists.
    curve_proto = clip.mapping['m_FloatCurves'].sequence[0]
    clip.mapping['m_FloatCurves'].sequence = []
    clip.mapping['m_EditorCurves'].sequence = []

    blank_letter = 0

    board = generate_utils.config
    for byte in range(0, board.BYTES_PER_CHAR):
        for row in range(0, board.BOARD_ROWS):
            for col in range(0, board.BOARD_COLS):
                cell_curve = curve_proto.copy()
                for key in cell_curve.mapping['curve'].mapping['m_Curve'].sequence:
                    key.mapping['value'] = str(blank_letter +
                            UNITY_ANIMATION_FUDGE_MARGIN)
                shader_param = generate_utils.getShaderParamByRowColByte(row, col, byte)
                cell_curve.mapping['attribute'] = "material.{}".format(shader_param)
                cell_curve.mapping['path'] = "World Constraint/Container/TaSTT"
                # The same curve object backs both the runtime and the
                # editor curve list.
                clip.mapping['m_FloatCurves'].sequence.append(cell_curve)
                clip.mapping['m_EditorCurves'].sequence.append(cell_curve)

    # Serialize the clip.
    clip_path = os.path.join(anim_dir, generate_utils.getClearAnimationName() + ".anim")
    print("Generating clear animation at {}".format(clip_path), file=sys.stderr)
    with open(clip_path, "w", encoding="utf-8") as out:
        out.write(libunity.unityYamlToString([root]))

    # Write the matching .meta file and register the guid both ways.
    meta = libunity.Metadata()
    with open(clip_path + ".meta", "w", encoding="utf-8") as out:
        out.write(str(meta))
    guid_map[clip_path] = meta.guid
    guid_map[meta.guid] = clip_path
# sound_chord: whether to play a, e, i, o, u
# value: 0 or 1
def generateSoundAnimation(sound_chord: typing.Tuple[int,int,int,int,int],
        value: int,
        anim_name: str,
        anim_dir: str, guid_map: typing.Dict[str, str],
        anim_delay_frames = 2):
    """Write an animation clip that switches the five "Audio N" objects.

    Note `i` of the chord is keyed at `i * anim_delay_frames / 60` seconds
    with the value `sound_chord[i]`. Every note except the first also gets a
    keyframe with value 0 at time 0.

    Args:
        sound_chord: One 0/1 entry per note.
        value: Not used by this function; kept for interface compatibility.
        anim_name: File name (without extension) of the generated clip.
        anim_dir: Directory that receives the `.anim` and `.anim.meta` files.
        guid_map: Updated in both directions (path -> guid, guid -> path).
        anim_delay_frames: Frames between consecutive notes.
    """
    print(f"Generating sound animation {sound_chord} / {anim_name}", file=sys.stderr)

    parser = libunity.UnityParser()
    parser.parse(SOUND_ANIMATION_TEMPLATE)

    anim_node = parser.nodes[0]
    anim_clip = anim_node.mapping['AnimationClip']
    curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]
    anim_clip.mapping['m_FloatCurves'].sequence = []
    anim_clip.mapping['m_EditorCurves'].sequence = []

    # Animate all notes.
    for note_i, note_value in enumerate(sound_chord):
        curve = curve_template.copy()
        curve.mapping['path'] = f"World Constraint/Container/TaSTT/Audio {note_i + 1}"

        keyframes = curve.mapping['curve'].mapping['m_Curve']
        keyframe_template = keyframes.sequence[0]
        keyframes.sequence = []

        # First keyframe: zero all but first note. Scalars are stored as
        # strings, like every other value written into the clip.
        if note_i != 0:
            keyframe = keyframe_template.copy()
            keyframe.mapping['time'] = "0"
            keyframe.mapping['value'] = "0"
            keyframes.sequence.append(keyframe)

        # Subsequent keyframes: animate as normal
        keyframe = keyframe_template.copy()
        keyframe.mapping['time'] = str(note_i * anim_delay_frames * 1.0 / 60.0)
        keyframe.mapping['value'] = str(note_value)
        keyframes.sequence.append(keyframe)

        # Add curve to animation
        anim_clip.mapping['m_FloatCurves'].sequence.append(curve)
        anim_clip.mapping['m_EditorCurves'].sequence.append(curve)

    # The clip ends when the last note has been keyed.
    anim_clip.mapping['m_AnimationClipSettings'].mapping['m_StopTime'] = str((len(sound_chord)-1) * anim_delay_frames * 1.0 / 60.0)

    # Serialize animation to file
    anim_path = os.path.join(anim_dir, anim_name + ".anim")
    with open(anim_path, "w", encoding="utf-8") as f:
        f.write(libunity.unityYamlToString([anim_node]))
    # Generate metadata
    meta = libunity.Metadata()
    with open(anim_path + ".meta", "w", encoding="utf-8") as f:
        f.write(str(meta))
    # Add metadata to guid map
    guid_map[anim_path] = meta.guid
    guid_map[meta.guid] = anim_path

# Generate the "_Off" and "_On" toggle animations for a shader parameter.
def generateToggleAnimations(anim_dir, shader_param, guid_map):
    """Write `<shader_param>_Off.anim` and `<shader_param>_On.anim`.

    Each clip holds a single float curve that drives `material.<shader_param>`
    to 0.0 (off) or 1.0 (on), plus UNITY_ANIMATION_FUDGE_MARGIN. A `.meta`
    file is written next to each clip and `guid_map` is updated in both
    directions.
    """
    print("Generating shader toggle animation", file=sys.stderr)

    parser = libunity.UnityParser()
    parser.parse(LETTER_ANIMATION_TEMPLATE)

    anim_node = parser.nodes[0]
    anim_clip = anim_node.mapping['AnimationClip']
    # Take the prototype curve once, from the pristine template, rather than
    # from whatever the previous loop iteration left in the clip.
    curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]

    # 0.0 represents false, 1.0 represents true. Don't forget that we add
    # `UNITY_ANIMATION_FUDGE_MARGIN` to everything.
    for shader_value in range(0, 2):
        anim_clip.mapping['m_FloatCurves'].sequence = []
        anim_clip.mapping['m_EditorCurves'].sequence = []

        curve = curve_template.copy()
        for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
            keyframe.mapping['value'] = str(float(shader_value) +
                    UNITY_ANIMATION_FUDGE_MARGIN)
        curve.mapping['attribute'] = "material.{}".format(shader_param)
        curve.mapping['path'] = "World Constraint/Container/TaSTT"
        # Add curve to animation
        anim_clip.mapping['m_FloatCurves'].sequence.append(curve)
        anim_clip.mapping['m_EditorCurves'].sequence.append(curve)

        # Serialize animation to file
        anim_suffix = "_On" if shader_value == 1 else "_Off"
        anim_path = os.path.join(anim_dir, shader_param + anim_suffix +
                ".anim")
        with open(anim_path, "w", encoding="utf-8") as f:
            f.write(libunity.unityYamlToString([anim_node]))
        # Generate metadata
        meta = libunity.Metadata()
        with open(anim_path + ".meta", "w", encoding="utf-8") as f:
            f.write(str(meta))
        # Add metadata to guid map
        guid_map[anim_path] = meta.guid
        guid_map[meta.guid] = anim_path
# Generate an animation which sets the board's local scale.
def generateScaleAnimation(anim_name: str, anim_dir: str,
        path: str,
        value: float,
        guid_map: typing.Dict[str, str]) -> str:
    """Write a scale animation clip and return the guid of its metadata.

    Every keyframe of the template's scale curves gets x = y = z = `value`,
    and every keyframe of its editor curves gets `value`. The clip and its
    `.meta` file are written into `anim_dir`, and `guid_map` is updated in
    both directions.

    Args:
        anim_name: File name (without extension) of the generated clip.
        anim_dir: Directory that receives the generated files.
        path: Only used in the progress message; the animated object path
            comes from SCALE_ANIMATION_TEMPLATE.
        value: The uniform scale to animate to.
        guid_map: Updated with path -> guid and guid -> path.

    Returns:
        The guid of the generated metadata.
    """
    print("Generating scale animation {}".format(path),
            file=sys.stderr)

    parser = libunity.UnityParser()
    parser.parse(SCALE_ANIMATION_TEMPLATE)

    anim_node = parser.nodes[0]
    anim_clip = anim_node.mapping['AnimationClip']

    # Scalars are written as strings, matching the scale-curve components.
    scale = str(value)
    for curve in anim_clip.mapping['m_ScaleCurves'].sequence:
        for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
            for axis in ('x', 'y', 'z'):
                keyframe.mapping['value'].mapping[axis] = scale
    for curve in anim_clip.mapping['m_EditorCurves'].sequence:
        for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
            keyframe.mapping['value'] = scale

    # Serialize animation to file
    anim_path = os.path.join(anim_dir, anim_name + ".anim")
    with open(anim_path, "w", encoding="utf-8") as f:
        f.write(libunity.unityYamlToString([anim_node]))
    # Generate metadata
    meta = libunity.Metadata()
    with open(anim_path + ".meta", "w", encoding="utf-8") as f:
        f.write(str(meta))
    # Add metadata to guid map
    guid_map[anim_path] = meta.guid
    guid_map[meta.guid] = anim_path

    return meta.guid
def generateAnimations(anim_dir: str, guid_map: typing.Dict[str, str]):
    """Generate the clear, sound-chord and per-cell letter animations.

    Writes into `anim_dir`:
      * the board-clearing clip (via `generateClearAnimation`),
      * one clip for each of the 2**5 on/off combinations of the five
        sounds (via `generateSoundAnimation`),
      * for every (byte, row, col), two letter clips: one for letter 0 and
        one for letter CHARS_PER_CELL - 1.

    `guid_map` is updated in both directions for every generated file.
    """
    generateClearAnimation(anim_dir, guid_map)

    for chord_bits in range(2**5):
        # Bit i of chord_bits says whether sound i is part of the chord.
        chord = [(chord_bits >> i) & 1 for i in range(5)]
        print(f"Generating chord {chord}", file=sys.stderr)
        anim_name = f"Sound_a{chord[0]}_e{chord[1]}_i{chord[2]}_o{chord[3]}_u{chord[4]}"
        generateSoundAnimation(chord, 0, anim_name, anim_dir, guid_map)

    print("Generating letter animations", file=sys.stderr)

    parser = libunity.UnityParser()
    parser.parse(LETTER_ANIMATION_TEMPLATE)

    anim_node = parser.nodes[0]
    anim_clip = anim_node.mapping['AnimationClip']
    curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]
    anim_clip.mapping['m_FloatCurves'].sequence = []
    anim_clip.mapping['m_EditorCurves'].sequence = []

    board = generate_utils.config
    # Each character is spread over BYTES_PER_CHAR byte-valued parameters.
    for byte in range(0, board.BYTES_PER_CHAR):
        for row in range(0, board.BOARD_ROWS):
            # Report the configured byte count rather than a hard-coded 2.
            print("Generating letter animations (row {}/{}) (byte {}/{})".format(row,
                board.BOARD_ROWS, byte, board.BYTES_PER_CHAR), file=sys.stderr)
            for col in range(0, board.BOARD_COLS):
                # Only the two extreme letter values get a clip.
                for letter in (0, board.CHARS_PER_CELL - 1):
                    # Make a deep copy of the templates
                    node = anim_node.copy()
                    curve = curve_template.copy()
                    clip = node.mapping['AnimationClip']
                    # Populate animation name
                    anim_name = generate_utils.getLetterAnimationName(row, col, letter, byte)
                    clip.mapping['m_Name'] = anim_name
                    # Populate letter value
                    for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
                        keyframe.mapping['value'] = str(letter + UNITY_ANIMATION_FUDGE_MARGIN)
                    # Populate path to letter parameter
                    curve.mapping['attribute'] = "material.{}".format(generate_utils.getShaderParamByRowColByte(row, col, byte))
                    curve.mapping['path'] = "World Constraint/Container/TaSTT"
                    # Add curve to animation
                    clip.mapping['m_FloatCurves'].sequence.append(curve)
                    clip.mapping['m_EditorCurves'].sequence.append(curve)
                    # Serialize animation to file
                    anim_path = os.path.join(anim_dir, anim_name + ".anim")
                    with open(anim_path, "w", encoding="utf-8") as f:
                        f.write(libunity.unityYamlToString([node]))
                    # Generate metadata
                    meta = libunity.Metadata()
                    with open(anim_path + ".meta", "w", encoding="utf-8") as f:
                        f.write(str(meta))
                    # Add metadata to guid map
                    guid_map[anim_path] = meta.guid
                    guid_map[meta.guid] = anim_path

def generateFXController(anim: libunity.UnityAnimator) -> typing.Dict[int, typing.Dict[int, libunity.UnityDocument]]:
    """Populate `anim` with the TaSTT parameters and layers.

    Adds the nodes parsed from ANIMATOR_TEMPLATE, the control parameters, a
    zero-weight "=== TaSTT ===" layer, and one blend parameter plus one
    layer for every (byte, character-in-sync-group) pair.

    Returns:
        The created layers, indexed as `layers[byte][i]`.
    """
    parser = libunity.UnityParser()
    parser.parse(ANIMATOR_TEMPLATE)
    anim.addNodes(parser.nodes)

    anim.addParameter(generate_utils.getEnableParam(), bool)
    anim.addParameter(generate_utils.getDummyParam(), bool)
    anim.addParameter(generate_utils.getToggleParam(), bool)
    anim.addParameter(generate_utils.getClearBoardParam(), bool)
    anim.addParameter(generate_utils.getScaleParam(), float)
    anim.addParameter(generate_utils.getEnablePhonemeParam(), bool)

    for i in range(5):
        anim.addParameter(generate_utils.getSoundParam(i+1), bool)

    anim.addLayer("=== TaSTT ===", weight=0.0)

    layers = {}
    for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
        layers[byte] = {}
        for i in range(0, generate_utils.config.CHARS_PER_SYNC):
            anim.addParameter(generate_utils.getBlendParam(i, byte), float)

            layer = anim.addLayer(generate_utils.getLayerName(i, byte))
            layers[byte][i] = layer
    # NOTE(review): assumed to be registered once, after all layers exist --
    # confirm against the original file's indentation.
    anim.addParameter(generate_utils.getSelectParam(), int)

    return layers
{} - for i in range(0, generate_utils.config.numRegions(which_layer)): - dx = i * 200 - dy = 200 - - # Create blend tree for this region. - anim_lo_path = os.path.join(gen_anim_dir, - generate_utils.getAnimationNameByLayerAndIndex( - which_layer, i, 0, byte) + \ - ".anim") - guid_lo = guid_map[anim_lo_path] - anim_hi_path = os.path.join(gen_anim_dir, - generate_utils.getAnimationNameByLayerAndIndex( - which_layer, i, generate_utils.config.CHARS_PER_CELL - 1, byte) + \ - ".anim") - guid_hi = guid_map[anim_hi_path] - - select_states[i] = anim.addAnimatorBlendTree(layer, - generate_utils.getBlendStateName(which_layer, i, byte), - generate_utils.getBlendParam(which_layer, byte), - guid_lo, guid_hi, dx = dx, dy = dy) - state = select_states[i] - - # Create transition to state. - select_state_transition = anim.addTransition(state) - select_param = generate_utils.getSelectParam() - anim.addTransitionIntegerEqualityCondition(active_state, - select_state_transition, select_param, i) - - # Create return-home transition. - home_state_transition = anim.addTransition(default_state) - home_state_transition.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - dummy_param = generate_utils.getDummyParam() - anim.addTransitionBooleanCondition(state, - home_state_transition, dummy_param, False) - - if generate_utils.config.layerNeedsParity(which_layer): - # There may be layers which never write to the text box. In this case, - # when those layers are turned on to write to that last region, they - # simply transition back to the default (idle) state. - home_state_transition = anim.addTransition(default_state) - select_param = generate_utils.getSelectParam() - i = generate_utils.config.numRegions(0) - 1 - anim.addTransitionIntegerEqualityCondition(active_state, - home_state_transition, select_param, i) - -# Generic toggle adding utility. -# Generates the layer and parameter. 
-# Returns a map containing the off and on states, as well as the -# transitions between them. -def generateToggle(layer_name: str, - parameter_name: str, - gen_anim_dir: str, - off_anim_basename: str, - on_anim_basename: str, - anim: libunity.UnityAnimator, - guid_map: typing.Dict[str, str], - duration_s: float = 0.0) -> typing.Dict[str, - libunity.UnityDocument]: - layer = anim.addLayer(layer_name) - - # For simplicity, use the layer name as the parameter name. - anim.addParameter(parameter_name, bool) - - off_state = anim.addAnimatorState(layer, layer_name + "_Off", - is_default_state = True) - on_state = anim.addAnimatorState(layer, layer_name + "_On", dy=100) - - if off_anim_basename: - off_anim_path = os.path.join(gen_anim_dir, off_anim_basename) - off_anim_meta = libunity.Metadata() - off_anim_meta.loadOrCreate(off_anim_path, guid_map) - anim.setAnimatorStateAnimation(off_state, off_anim_meta.guid) - - if on_anim_basename: - on_anim_path = os.path.join(gen_anim_dir, on_anim_basename) - on_anim_meta = libunity.Metadata() - on_anim_meta.loadOrCreate(on_anim_path, guid_map) - anim.setAnimatorStateAnimation(on_state, on_anim_meta.guid) - - off_to_on_trans = anim.addTransition(on_state, duration_s) - anim.addTransitionBooleanCondition(off_state, - off_to_on_trans, parameter_name, True) - - on_to_off_trans = anim.addTransition(off_state, duration_s) - anim.addTransitionBooleanCondition(on_state, - on_to_off_trans, parameter_name, False) - - result = {} - result["off"] = off_state - result["on"] = on_state - result["off_to_on"] = off_to_on_trans - result["on_to_off"] = on_to_off_trans - - return result - -def generateScaleLayer(anim: libunity.UnityAnimator, - gen_anim_dir: str, - guid_map: typing.Dict[str, str]): - - scale_layer = anim.addLayer(generate_utils.getScaleParam()) - - path = "World Constraint/Container/TaSTT" - attribute = "blendShape.Scale" - - guid_lo = generateScaleAnimation("TaSTT_Scale_0", gen_anim_dir, - path, - 5.0, guid_map) - guid_hi = 
generateScaleAnimation("TaSTT_Scale_100", gen_anim_dir, - path, - 100.0, guid_map) - - anim.addAnimatorBlendTree(scale_layer, - generate_utils.getScaleParam(), - generate_utils.getScaleParam(), - guid_lo, guid_hi, - lo_threshold = 0.0, hi_threshold = 1.0); - - pass - -def generateSoundLayer(anim: libunity.UnityAnimator, - gen_anim_dir: str, - guid_map: typing.Dict[str, str], - anim_len_s = 12.0/60.0): - - layer = anim.addLayer("TaSTT_Sound") - - idle_state = anim.addAnimatorState(layer, "Idle", is_default_state=True, dy=-100) - a_state = anim.addAnimatorState(layer, "a") - - trans = anim.addTransition(a_state) - param = generate_utils.getEnablePhonemeParam() - anim.addTransitionBooleanCondition(idle_state, trans, param, True) - - for a_bool in range(2): - dy = 100 - dx = a_bool * 800 - # Create `e` state. - ax_e_state = anim.addAnimatorState(layer, - f"a{a_bool}_e", - dy=dy, dx=dx) - # Create transition based on whether `a` is set. - trans = anim.addTransition(ax_e_state) - param = generate_utils.getSoundParam(1) - anim.addTransitionBooleanCondition(a_state, trans, param, a_bool) - - for e_bool in range(2): - dy = 200 - dx = a_bool * 800 + e_bool * 400 - - # Create `i` state. - ax_ex_i_state = anim.addAnimatorState(layer, - f"a{a_bool}_e{e_bool}_i", - dy=dy, dx=dx) - - # Create transition based on whether `e` is set. - trans = anim.addTransition(ax_ex_i_state) - param = generate_utils.getSoundParam(2) - anim.addTransitionBooleanCondition(ax_e_state, trans, param, e_bool) - - for i_bool in range(2): - dy = 300 - dx = a_bool * 800 + e_bool * 400 + i_bool * 200 - - # Create `o` state. - ax_ex_ix_o_state = anim.addAnimatorState(layer, - f"a{a_bool}_e{e_bool}_i{i_bool}_o", - dy=dy, dx=dx) - # Create transition based on whether `i` is set. 
- trans = anim.addTransition(ax_ex_ix_o_state) - param = generate_utils.getSoundParam(3) - anim.addTransitionBooleanCondition(ax_ex_i_state, trans, param, i_bool) - - for o_bool in range(2): - dy = 400 - dx = a_bool * 800 + e_bool * 400 + i_bool * 200 + o_bool * 100 - - # Create `u` state. - ax_ex_ix_ox_u_state = anim.addAnimatorState(layer, - f"a{a_bool}_e{e_bool}_i{i_bool}_o{o_bool}_u", - dy=dy, dx=dx) - # Create transition based on whether `o` is set. - trans = anim.addTransition(ax_ex_ix_ox_u_state) - param = generate_utils.getSoundParam(4) - anim.addTransitionBooleanCondition(ax_ex_ix_o_state, - trans, param, o_bool) - - for u_bool in range(2): - dy = 500 - dx = a_bool * 800 + e_bool * 400 + i_bool * 200 + o_bool * 100 + u_bool * 50 - if u_bool == 1: - dy = 550 - - # Create `u` state. - ax_ex_ix_ox_ux_state = anim.addAnimatorState(layer, - f"a{a_bool}_e{e_bool}_i{i_bool}_o{o_bool}_u{u_bool}", - dy=dy, dx=dx) - # Create transition based on whether `u` is set. - trans = anim.addTransition(ax_ex_ix_ox_ux_state) - param = generate_utils.getSoundParam(5) - anim.addTransitionBooleanCondition(ax_ex_ix_ox_u_state, - trans, param, u_bool) - - chord = [a_bool, e_bool, i_bool, o_bool, u_bool] - anim_name = f"Sound_a{chord[0]}_e{chord[1]}_i{chord[2]}_o{chord[3]}_u{chord[4]}" - anim_path = os.path.join(gen_anim_dir, anim_name + ".anim") - anim_guid = guid_map[anim_path] - anim.setAnimatorStateAnimation(ax_ex_ix_ox_ux_state, anim_guid) - - # Create return-home transitions. 
- trans = anim.addTransition(idle_state, dur_s = anim_len_s) - trans.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - param = generate_utils.getSoundParam(1) - anim.addTransitionBooleanCondition(ax_ex_ix_ox_ux_state, trans, param, 1 - a_bool) - - trans = anim.addTransition(idle_state, dur_s = anim_len_s) - trans.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - param = generate_utils.getSoundParam(2) - anim.addTransitionBooleanCondition(ax_ex_ix_ox_ux_state, trans, param, 1 - e_bool) - - trans = anim.addTransition(idle_state, dur_s = anim_len_s) - trans.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - param = generate_utils.getSoundParam(3) - anim.addTransitionBooleanCondition(ax_ex_ix_ox_ux_state, trans, param, 1 - i_bool) - - trans = anim.addTransition(idle_state, dur_s = anim_len_s) - trans.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - param = generate_utils.getSoundParam(4) - anim.addTransitionBooleanCondition(ax_ex_ix_ox_ux_state, trans, param, 1 - o_bool) - - trans = anim.addTransition(idle_state, dur_s = anim_len_s) - trans.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - param = generate_utils.getSoundParam(5) - anim.addTransitionBooleanCondition(ax_ex_ix_ox_ux_state, trans, param, 1 - u_bool) - -def generateFX(guid_map, gen_anim_dir): - anim = libunity.UnityAnimator() - - layers = generateFXController(anim) - - # TODO(yum) parallelize - for byte in range(0, generate_utils.config.BYTES_PER_CHAR): - for which_layer, layer in layers[byte].items(): - print("Generating layer {}/{}".format(which_layer, len(layers[byte].items())), file=sys.stderr) - generateFXLayer(which_layer, anim, layer, gen_anim_dir, byte) - - generateToggle(generate_utils.getToggleParam(), - generate_utils.getToggleParam(), - gen_anim_dir, - "TaSTT_Toggle_Off.anim", - "TaSTT_Toggle_On.anim", - anim, guid_map) - generateToggle(generate_utils.getLockWorldParam(), 
- generate_utils.getLockWorldParam(), - gen_anim_dir, - "TaSTT_Lock_World_Disable.anim", - "TaSTT_Lock_World_Enable.anim", - anim, guid_map) - generateToggle(generate_utils.getEllipsisParam(), - generate_utils.getEllipsisParam(), - gen_anim_dir, - "TaSTT_Ellipsis_Off.anim", - "TaSTT_Ellipsis_On.anim", - anim, guid_map) - generateToggle( - generate_utils.getClearBoardParam(), - generate_utils.getClearBoardParam(), - gen_anim_dir, - None, # No animation in the `off` state. - generate_utils.getClearAnimationName() + ".anim", - anim, guid_map) - generateToggle("TaSTT_Expand", - generate_utils.getToggleParam(), - gen_anim_dir, - "TaSTT_Emerge_000.anim", - "TaSTT_Emerge_100.anim", - anim, guid_map, 0.5) - - generateScaleLayer(anim, gen_anim_dir, guid_map) - generateSoundLayer(anim, gen_anim_dir, guid_map) - - return anim - -def parseArgs(): - print("args: {}".format(" ".join(sys.argv))) - - parser = argparse.ArgumentParser() - parser.add_argument("cmd", type=str, help="") - parser.add_argument("--config", type=str, help="The app config.") - parser.add_argument("--gen_dir", type=str, help="The directory under " + - "which all generated assets are placed") - parser.add_argument("--gen_anim_dir", type=str, help="The directory under " + - "which all generated animations are placed.") - parser.add_argument("--guid_map", type=str, help="The path to a file which will store guids") - parser.add_argument("--fx_dest", type=str, help="The path at which to save the generated FX controller") - args = parser.parse_args() - - if not args.gen_dir: - args.gen_dir = "generated/" - - if not args.config: - print("--config required") - sys.exit(1) - - if not args.gen_anim_dir: - args.gen_anim_dir = args.gen_dir + "animations/" - - if not args.guid_map: - args.guid_map = "guid.map" - - if not args.fx_dest: - args.fx_dest = args.gen_dir + "TaSTT_fx.controller" - - return args - -if __name__ == "__main__": - args = parseArgs() - cfg = app_config.getConfig(args.config) - - print(f"chdir to 
{os.path.dirname(os.path.abspath(__file__))}") - os.chdir(os.path.dirname(os.path.abspath(__file__))) - - if args.cmd == "gen_anims": - generate_utils.config.BYTES_PER_CHAR = int(cfg["bytes_per_char"]) - generate_utils.config.CHARS_PER_SYNC = int(cfg["chars_per_sync"]) - generate_utils.config.BOARD_ROWS = int(cfg["rows"]) - generate_utils.config.BOARD_COLS = int(cfg["cols"]) - - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - - os.makedirs(args.gen_anim_dir, exist_ok=True) - generateAnimations(args.gen_anim_dir, guid_map) - - with open(args.guid_map, 'wb') as f: - pickle.dump(guid_map, f) - elif args.cmd == "gen_fx": - generate_utils.config.BYTES_PER_CHAR = int(cfg["bytes_per_char"]) - generate_utils.config.CHARS_PER_SYNC = int(cfg["chars_per_sync"]) - generate_utils.config.BOARD_ROWS = int(cfg["rows"]) - generate_utils.config.BOARD_COLS = int(cfg["cols"]) - - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - os.makedirs(os.path.dirname(args.fx_dest), exist_ok=True) - with open(args.fx_dest, "w", encoding="utf-8") as f: - f.write(str(generateFX(guid_map, args.gen_anim_dir))) - with open(args.guid_map, 'wb') as f: - pickle.dump(guid_map, f) - - # If we don't do this, then VRChat will fail to update the animator - # when users update their avatars. - if os.path.exists(args.fx_dest + ".meta"): - os.remove(args.fx_dest + ".meta") - diff --git a/Scripts/libunity.py b/Scripts/libunity.py deleted file mode 100644 index 77eeb95..0000000 --- a/Scripts/libunity.py +++ /dev/null @@ -1,1432 +0,0 @@ -#!/usr/bin/env python3 - -from functools import partial - -import argparse -import copy -import enum -import math -import os -import pickle -import random -import sys -import typing -# python3 -m pip install pyyaml -# License: MIT. -import yaml - -import multiprocessing as mp - -WRITE_DEFAULTS_ANIM_TEMPLATE = """ -%YAML 1.1 -%TAG !u! 
tag:unity3d.com,2011: ---- !u!74 &7400000 -AnimationClip: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: TaSTT_Reset_Animations - serializedVersion: 6 - m_Legacy: 0 - m_Compressed: 0 - m_UseHighQualityCurve: 1 - m_RotationCurves: [] - m_CompressedRotationCurves: [] - m_EulerCurves: [] - m_PositionCurves: [] - m_ScaleCurves: [] - m_FloatCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 0 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: REPLACEME_ATTRIBUTE - path: REPLACEME_PATH - classID: 137 - script: {fileID: 0} - m_PPtrCurves: [] - m_SampleRate: 60 - m_WrapMode: 0 - m_Bounds: - m_Center: {x: 0, y: 0, z: 0} - m_Extent: {x: 0, y: 0, z: 0} - m_ClipBindingConstant: - genericBindings: - - serializedVersion: 2 - path: 2794480623 - attribute: 2284639795 - script: {fileID: 0} - typeID: 137 - customType: 22 - isPPtrCurve: 0 - pptrCurveMapping: [] - m_AnimationClipSettings: - serializedVersion: 2 - m_AdditiveReferencePoseClip: {fileID: 0} - m_AdditiveReferencePoseTime: 0 - m_StartTime: 0 - m_StopTime: 0 - m_OrientationOffsetY: 0 - m_Level: 0 - m_CycleOffset: 0 - m_HasAdditiveReferencePose: 0 - m_LoopTime: 1 - m_LoopBlend: 0 - m_LoopBlendOrientation: 0 - m_LoopBlendPositionY: 0 - m_LoopBlendPositionXZ: 0 - m_KeepOriginalOrientation: 0 - m_KeepOriginalPositionY: 1 - m_KeepOriginalPositionXZ: 0 - m_HeightFromFeet: 0 - m_Mirror: 0 - m_EditorCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 0 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: REPLACEME_ATTRIBUTE - path: REPLACEME_PATH - classID: 137 - script: {fileID: 0} - m_EulerEditorCurves: [] - 
m_HasGenericRootTransform: 0 - m_HasMotionFloatCurves: 0 - m_Events: [] -"""[1:][:-1] - -METADATA_TEMPLATE = """ -fileFormatVersion: 2 -guid: REPLACEME_GUID -NativeFormatImporter: - externalObjects: {} - mainObjectFileID: 7400000 - userData: - assetBundleName: - assetBundleVariant: -"""[1:][:-1] - -ANIMATION_STATE_TEMPLATE = """ ---- !u!1102 &110200000 -AnimatorState: - serializedVersion: 6 - m_ObjectHideFlags: 1 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: REPLACEME_ANIMATION_NAME - m_Speed: 1 - m_CycleOffset: 0 - m_Transitions: [] - m_StateMachineBehaviours: [] - m_Position: {x: 50, y: 50, z: 0} - m_IKOnFeet: 0 - m_WriteDefaultValues: 0 - m_Mirror: 0 - m_SpeedParameterActive: 0 - m_MirrorParameterActive: 0 - m_CycleOffsetParameterActive: 0 - m_TimeParameterActive: 0 - m_Motion: {} - m_Tag: - m_SpeedParameter: - m_MirrorParameter: - m_CycleOffsetParameter: - m_TimeParameter: -"""[1:][:-1] - -TRANSITION_TEMPLATE = """ ---- !u!1101 &110100000 -AnimatorStateTransition: - m_ObjectHideFlags: 1 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: - m_Conditions: [] - m_DstStateMachine: {fileID: 0} - m_DstState: {fileID: 0} - m_Solo: 0 - m_Mute: 0 - m_IsExit: 0 - serializedVersion: 3 - m_TransitionDuration: 0 - m_TransitionOffset: 0 - m_ExitTime: 0.0 - m_HasExitTime: 0 - m_HasFixedDuration: 1 - m_InterruptionSource: 2 - m_OrderedInterruption: 1 - m_CanTransitionToSelf: 1 -"""[1:][:-1] - -BLEND_TREE_TEMPLATE = """ ---- !u!206 &1071664566462684110 -BlendTree: - m_ObjectHideFlags: 1 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: REPLACEME_BLEND_TREE_NAME - m_Childs: - - serializedVersion: 2 - m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_LO, type: 2} - m_Threshold: -1 - m_Position: {x: 0, y: 0} - m_TimeScale: 1 - m_CycleOffset: 0 - m_DirectBlendParameter: 
REPLACEME_BLEND_PARAMETER - m_Mirror: 0 - - serializedVersion: 2 - m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_HI, type: 2} - m_Threshold: 1 - m_Position: {x: 0, y: 0} - m_TimeScale: 1 - m_CycleOffset: 0 - m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER - m_Mirror: 0 - m_BlendParameter: REPLACEME_BLEND_PARAMETER - m_BlendParameterY: REPLACEME_BLEND_PARAMETER - m_MinThreshold: -1 - m_MaxThreshold: 1 - m_UseAutomaticThresholds: 0 - m_NormalizedBlendValues: 0 - m_BlendType: 0 -"""[1:][:-1] - -class Metadata: - def __init__(self): - self.guid = "%032x" % random.randrange(16 ** 32) - - def load(self, path): - if not path.endswith(".meta"): - path = path + ".meta" - - self.guid = None - with open(path, "r", encoding="utf-8") as f: - for line in f: - if line.startswith("guid"): - self.guid = line.split()[1] - - def loadOrCreate(self, path, guid_map): - if not path.endswith(".meta"): - path = path + ".meta" - - if os.path.exists(path): - self.load(path) - return - - self.persist(path, guid_map) - - def persist(self, path, guid_map): - with open(path, "w", encoding="utf-8") as f: - f.write(str(self)) - - guid_map[self.guid] = path - guid_map[path] = self.guid - - def __str__(self): - return METADATA_TEMPLATE.replace("REPLACEME_GUID", self.guid) - -class Node: - def __init__(self): - # Optional. In Unity, this is the fileID of an object. Not all YAML - # mappings have an anchor. - self.anchor = None - - # Pointer to the Node containing this one. 
- self.parent = None - -class Sequence(Node): - def __init__(self): - super().__init__() - self.sequence = [] - - def copy(self): - new = Sequence() - new.anchor = self.anchor - new.parent = self.parent - - for v in self.sequence: - if hasattr(v, "copy"): - new.sequence.append(v.copy()) - new.sequence[-1].parent = new - else: - new.sequence.append(v) - - return new - - def prettyPrint(self, first_indent=None, leading_newline=None): - depth = 0 - p = self.parent - while p != None: - depth += 1 - p = p.parent - indent = " " * depth - - lines = [] - first = True - for item in self.sequence: - cur_indent = indent - if first: - if first_indent != None: - cur_indent = first_indent - first = False - if hasattr(item, "prettyPrint"): - lines.append("{}- {}".format(cur_indent, item.prettyPrint(first_indent="", leading_newline=False))) - else: - lines.append("{}- {}".format(cur_indent, item)) - - if len(lines) == 0: - return "[]" - - return "\n" + '\n'.join(lines) - - def __str__(self): - return self.prettyPrint() - - def addChildMapping(self, anchor = None, add_to_head = False): - child = Mapping() - child.anchor = anchor - child.parent = self - child.sequence = [] - - if add_to_head: - self.sequence = [child] + self.sequence - else: - self.sequence.append(child) - - return child - - def addChildSequence(self, anchor = None): - child = Sequence() - child.anchor = anchor - child.parent = self - child.sequence = [] - - self.sequence.append(child) - - return child - - def forEach(self, cb): - for k in self.sequence: - cb(k) - -class Mapping(Node): - def __init__(self): - super().__init__() - self.mapping = {} - - def copy(self): - new = Mapping() - new.anchor = self.anchor - new.parent = self.parent - - for k, v in self.mapping.items(): - if hasattr(v, "copy"): - new.mapping[k] = v.copy() - new.mapping[k].parent = new - else: - new.mapping[k] = v - - return new - - def prettyPrint(self, first_indent=None, leading_newline=True): - depth = 0 - p = self.parent - while p != None: - 
depth += 1 - p = p.parent - indent = " " * depth - - lines = [] - first = True - for k, v in self.mapping.items(): - cur_indent = indent - if first: - if first_indent != None: - cur_indent = first_indent - first = False - lines.append("{}{}: {}".format(cur_indent, k, v)) - - result = '\n'.join(lines) - - # Inline 1-item mappings, matching Unity behavior. - if len(self.mapping.keys()) == 1 and len(result.split("\n")) == 1: - if first_indent == None: - return self.prettyPrint(first_indent="") - return "{" + lines[0] + "}" - - # Empty mappings are represented by '{}'. If we don't do this, Unity - # will assume that they are Sequences and get very sad. - if len(self.mapping.keys()) == 0: - return "{}" - - if leading_newline: - result = "\n" + result - - return result - - def __str__(self): - return self.prettyPrint() - - def addChildMapping(self, key, anchor = None): - child = Mapping() - child.anchor = anchor - child.parent = self - child.mapping = {} - - self.mapping[key] = child - - return child - - def addChildSequence(self, key, anchor = None): - child = Sequence() - child.anchor = anchor - child.parent = self - child.mapping = {} - - self.mapping[key] = child - - return child - - def forEach(self, cb): - for k, v in self.mapping.items(): - cb(v) - -class UnityDocument(Mapping): - def __init__(self): - super().__init__() - self.class_id = None - - def __str__(self): - return super().__str__() - - def copy(self): - result = super().copy() - result.class_id = self.class_id - return result - -# Class representing a Unity AnimatorController. Implements manipulations, like -# merging and reanchoring. 
-class UnityAnimator(): - def __init__(self): - self.nodes = [] - self.id_to_node = {} - self.next_id = 1000 * 1000 - - def __str__(self): - return unityYamlToString(self.nodes) - - def addNodes(self, nodes): - for node in nodes: - self.nodes.append(node) - anchor = node.anchor - if anchor == None: - anchor = self.allocateId() - if anchor in self.id_to_node: - raise Exception("Duplicate anchor: {}, node 1: {}, node 2: {}".format(anchor, str(node), str(self.id_to_node[anchor]))) - self.id_to_node[anchor] = node - - if int(anchor) > self.next_id: - self.next_id = int(anchor) + 1 - # I don't know why but this fixes a bug in the `fixWriteDefaults` - # codepath: two documents wind up with the same anchor. - self.next_id += 1 - - def allocateId(self) -> int: - result = self.next_id - self.next_id += 1 - return result - - # Checks if `old_id` is in `self.id_mapping`, and if so, returns the - # already-generated ID. Otherwise this allocates a new ID and - # records it in `self.id_mapping`. - def mapId(self, old_id: str) -> int: - new_id = None - if old_id in self.id_mapping.keys(): - new_id = self.id_mapping[old_id] - else: - new_id = self.allocateId() - self.id_mapping[old_id] = new_id - return new_id - - # Recursively iterate every mapping under `node` and assign new IDs to - # every identifier. Mappings are recorded in `self.id_mapping`. - def mergeIterator(self, node): - if hasattr(node, "mapping"): - # Don't relabel anything that's defined in an external file. - # TODO(yum) do this. - if 'fileID' in node.mapping and not 'guid' in node.mapping: - if node.mapping['fileID'] != '0': - old_id = node.mapping['fileID'] - new_id = self.mapId(old_id) - node.mapping['fileID'] = str(new_id) - if hasattr(node, "forEach"): - node.forEach(self.mergeIterator) - - # Delete any key-value pairs where the value == the value. 
# NOTE(review): the definitions in this span are methods of the animator class
# whose `class` line precedes this chunk (presumably `UnityAnimator`). The
# flattened dump carries no indentation; nest them under that class when
# restoring the file.

def scrubReferencesByValue(self, node, values: typing.Set[str]):
    """Recursively drop every mapping entry of `node` whose value is in `values`."""
    if hasattr(node, "mapping"):
        node.mapping = {k: v for k, v in node.mapping.items() if v not in values}
    if hasattr(node, "forEach"):
        node.forEach(partial(self.scrubReferencesByValue, values=values))


def peekNodeOfClass(self, classId):
    """Return the first node whose `class_id` equals `classId`, or None."""
    return next((node for node in self.nodes if node.class_id == classId), None)


def popNodeOfClass(self, classId):
    """Remove and return the first node whose `class_id` equals `classId`.

    Returns None when no such node exists. The node is also dropped from
    `self.id_to_node`.
    """
    for idx, node in enumerate(self.nodes):
        if node.class_id == classId:
            del self.nodes[idx]
            # Nodes appended directly to `self.nodes` (see addLayer /
            # addAnimatorState) are never registered in `id_to_node`, so a
            # missing key must not be an error here.
            self.id_to_node.pop(node.anchor, None)
            return node
    return None


def pushNode(self, node):
    """Append `node` and index it by its anchor."""
    self.nodes.append(node)
    self.id_to_node[node.anchor] = node


# Merges two animator controllers and returns the result. Any identifiers
# in the animators are reassigned in a new namespace. The mappings from old
# identifiers to new identifiers are recorded in `self.id_mapping0` and
# `self.id_mapping1`.
def mergeAnimatorControllers(self, ctrl0, ctrl1):
    """Return a deep copy of `ctrl0` extended with `ctrl1`'s parameters and layers.

    Neither input is modified. Every merged parameter and layer is re-pointed
    at the merged controller's anchor.
    """
    ctrl0 = copy.deepcopy(ctrl0)
    ctrl1 = copy.deepcopy(ctrl1)

    self.id_mapping0 = {}
    self.id_mapping1 = {}

    body0 = ctrl0.mapping['AnimatorController']
    body1 = ctrl1.mapping['AnimatorController']
    p0 = body0.mapping['m_AnimatorParameters']
    p1 = body1.mapping['m_AnimatorParameters']
    a0 = body0.mapping['m_AnimatorLayers']
    a1 = body1.mapping['m_AnimatorLayers']

    # `mergeIterator` records into whichever dict `self.id_mapping` points
    # at, so switch it before walking each controller.
    self.id_mapping = self.id_mapping0
    p0.forEach(self.mergeIterator)
    a0.forEach(self.mergeIterator)

    self.id_mapping = self.id_mapping1
    p1.forEach(self.mergeIterator)
    a1.forEach(self.mergeIterator)

    p0.sequence += p1.sequence
    a0.sequence += a1.sequence

    for elm in p0.sequence:
        elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor
    for elm in a0.sequence:
        elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor

    return ctrl0


def merge(self, other):
    """Merge the animator `other` into this one, remapping all identifiers.

    Raises:
        ValueError: if either animator has no AnimatorController (class 91).
    """
    # Check before popping so a failed merge leaves both animators intact.
    if self.peekNodeOfClass('91') is None or other.peekNodeOfClass('91') is None:
        raise ValueError("Both animators need an AnimatorController (class 91) to merge")

    ctrl0 = self.popNodeOfClass('91')
    ctrl1 = other.popNodeOfClass('91')
    # Merge animators and populate `self.id_mapping0` and `self.id_mapping1`.
    merged_anim = self.mergeAnimatorControllers(ctrl0, ctrl1)

    # Re-anchor every remaining node using the mapping of its own animator.
    for id_mapping, nodes in ((self.id_mapping0, self.nodes),
                              (self.id_mapping1, other.nodes)):
        self.id_mapping = id_mapping
        for node in nodes:
            node.anchor = str(self.mapId(node.anchor))
            node.forEach(self.mergeIterator)

    own_nodes = self.nodes
    self.nodes = []
    self.id_to_node = {}
    self.pushNode(merged_anim)
    self.addNodes(own_nodes)
    self.addNodes(other.nodes)


# TODO(yum) support overwriting duplicates
def addParameter(self, param_name, param_type):
    """Add an animator parameter unless one named `param_name` already exists.

    Args:
        param_name: name of the parameter.
        param_type: one of the Python types `float`, `int` or `bool`.

    Raises:
        ValueError: if `param_type` is not one of the supported types.
    """
    if param_type == float:
        unity_type = '1'
    elif param_type == int:
        unity_type = '3'
    elif param_type == bool:
        unity_type = '4'
    else:
        # Previously this fell through and wrote `m_Type = None`.
        raise ValueError("Unsupported parameter type: {}".format(param_type))

    anim = self.peekNodeOfClass('91')
    params = anim.mapping['AnimatorController'].mapping['m_AnimatorParameters']

    if any(p.mapping['m_Name'] == param_name for p in params.sequence):
        return

    param = params.addChildMapping()
    param.mapping['m_Name'] = param_name
    param.mapping['m_Type'] = unity_type
    param.mapping['m_DefaultFloat'] = '0'
    param.mapping['m_DefaultInt'] = '0'
    param.mapping['m_DefaultBool'] = '0'
    ctrl = param.addChildMapping('m_Controller')
    ctrl.mapping['fileID'] = anim.anchor


def addLayer(self, layer_name, add_to_head = False, weight: float = 1.0) -> "UnityDocument":
    """Add a layer named `layer_name` and return its new state machine document.

    Two things are created: the layer entry inside the controller, and a
    class-1107 document (AnimatorStateMachine) that the entry references.
    Statement order is kept as-is because it determines key order in the
    mappings.
    """
    # Add layer to controller
    anim = self.peekNodeOfClass('91')
    layers = anim.mapping['AnimatorController'].mapping['m_AnimatorLayers']
    entry = layers.addChildMapping(add_to_head = add_to_head)
    entry.mapping['serializedVersion'] = '5'
    entry.mapping['m_Name'] = layer_name
    new_id = self.allocateId()
    entry.addChildMapping('m_StateMachine').mapping['fileID'] = str(new_id)
    entry.addChildMapping('m_Mask').mapping['fileID'] = '0'
    entry.addChildSequence('m_Motions')
    entry.addChildSequence('m_Behaviours')
    entry.mapping['m_BlendingMode'] = '0'
    entry.mapping['m_SyncedLayerIndex'] = '-1'
    entry.mapping['m_DefaultWeight'] = str(weight)
    entry.mapping['m_IKPass'] = '0'
    entry.mapping['m_SyncedLayerAffectsTiming'] = '0'
    entry.addChildMapping('m_Controller').mapping['fileID'] = anim.anchor

    # Create layer object
    layer = UnityDocument()
    layer.class_id = "1107"
    layer.anchor = str(new_id)
    mach = layer.addChildMapping('AnimatorStateMachine')

    mach.mapping['serializedVersion'] = '6'
    mach.mapping['m_ObjectHideFlags'] = '1'
    for key in ('m_CorrespondingSourceObject', 'm_PrefabInstance', 'm_PrefabAsset'):
        mach.addChildMapping(key).mapping['fileID'] = '0'
    mach.mapping['m_Name'] = layer_name
    for key in ('m_ChildStates', 'm_ChildStateMachines',
                'm_AnyStateTransitions', 'm_EntryTransitions'):
        mach.addChildSequence(key)
    mach.addChildMapping('m_StateMachineTransitions')
    mach.addChildSequence('m_StateMachineBehaviours')
    for key, x, y in (('m_AnyStatePosition', '50', '20'),
                      ('m_EntryPosition', '50', '120'),
                      ('m_ExitPosition', '800', '120'),
                      ('m_ParentStateMachinePosition', '800', '20')):
        pos = mach.addChildMapping(key)
        pos.mapping['x'] = x
        pos.mapping['y'] = y
        pos.mapping['z'] = '0'
    mach.addChildMapping('m_DefaultState')

    # NOTE(review): appended without registering in `id_to_node` (unlike
    # pushNode) -- confirm whether that is intentional.
    self.nodes.append(layer)
    return layer


def addAnimatorState(self, layer, state_name, is_default_state = False,
        dx = 0, dy = 0) -> "UnityDocument":
    """Create an animator state named `state_name` inside `layer` and return it.

    `layer` is a state machine document as returned by `addLayer`. `dx`/`dy`
    offset the state's position from (280, 80).
    """
    # Create animation state
    parser = UnityParser()
    parser.parse(ANIMATION_STATE_TEMPLATE)
    new_anim = UnityAnimator()
    new_anim.addNodes(parser.nodes)
    node = new_anim.nodes[0]

    new_id = self.allocateId()
    node.class_id = "1102"
    node.anchor = str(new_id)
    node.mapping['AnimatorState'].mapping['m_Name'] = state_name
    self.nodes.append(node)

    # Add state to layer
    machine = layer.mapping['AnimatorStateMachine']
    child_state = machine.mapping['m_ChildStates'].addChildMapping()
    child_state.mapping['serializedVersion'] = '1'
    child_state.addChildMapping('m_State').mapping['fileID'] = str(new_id)
    state_pos = child_state.addChildMapping('m_Position')
    state_pos.mapping['x'] = str(280 + dx)
    state_pos.mapping['y'] = str(80 + dy)
    state_pos.mapping['z'] = '0'

    if is_default_state:
        machine.mapping['m_DefaultState'].mapping['fileID'] = str(new_id)

    return node


def setAnimatorStateAnimation(self, anim_state, anim_guid):
    """Point `anim_state`'s motion at the animation asset with GUID `anim_guid`."""
    motion = anim_state.mapping['AnimatorState'].mapping['m_Motion']
    motion.mapping['guid'] = anim_guid
    motion.mapping['fileID'] = '7400000'
    motion.mapping['type'] = '2'


# Adds a blend tree which uses the parameter named `param_name` to blend
# between anim_lo and anim_hi. Also creates the corresponding animation
# state.
# NOTE(review): the definitions from here down to `setNoopAnimations` are
# methods of the animator class whose `class` line precedes this chunk
# (presumably `UnityAnimator`). The flattened dump carries no indentation;
# nest them under that class when restoring the file.

def addAnimatorBlendTree(self, layer, state_name, param_name,
        anim_guid_lo, anim_guid_hi, dx = 0, dy = 0,
        lo_threshold = -1.0, hi_threshold = 1.0,
        is_default_state = False) -> "UnityDocument":
    """Add a two-child blend tree driven by `param_name`, plus its state.

    Blends between the animations `anim_guid_lo` and `anim_guid_hi`. Returns
    the animator state whose motion points at the new blend tree.
    """
    # Create the blend tree.
    parser = UnityParser()
    parser.parse(BLEND_TREE_TEMPLATE)
    new_anim = UnityAnimator()
    new_anim.addNodes(parser.nodes)
    node = new_anim.nodes[0]

    new_id = self.allocateId()
    node.class_id = "206"
    node.anchor = str(new_id)
    tree = node.mapping['BlendTree']
    tree.mapping['m_Name'] = state_name
    children = tree.mapping['m_Childs'].sequence
    # Child 0 is the low animation, child 1 the high animation.
    for child, guid, threshold in ((children[0], anim_guid_lo, lo_threshold),
                                   (children[1], anim_guid_hi, hi_threshold)):
        child.mapping['m_Motion'].mapping['guid'] = guid
        child.mapping['m_DirectBlendParameter'] = param_name
        child.mapping['m_Threshold'] = str(threshold)

    tree.mapping['m_BlendParameter'] = param_name
    tree.mapping['m_BlendParameterY'] = param_name

    self.nodes.append(node)

    # Create the corresponding animation state.
    anim_state = self.addAnimatorState(layer, state_name, is_default_state,
                                       dx = dx, dy = dy)
    anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = node.anchor

    return anim_state


def addTransition(self, dst_state, dur_s = 0.0):
    """Create a transition document targeting `dst_state` and return it.

    The transition is not attached to a source state here; the
    `addTransition*Condition` methods do that.
    """
    parser = UnityParser()
    parser.parse(TRANSITION_TEMPLATE)
    new_transition = UnityAnimator()
    new_transition.addNodes(parser.nodes)
    node = new_transition.nodes[0]

    new_id = self.allocateId()
    node.class_id = "1101"
    node.anchor = str(new_id)
    state = node.mapping['AnimatorStateTransition']
    state.mapping['m_DstState'].mapping['fileID'] = copy.copy(dst_state.anchor)
    # Scalars are stored as strings everywhere else in this class.
    state.mapping['m_TransitionDuration'] = str(dur_s)
    self.nodes.append(node)

    return node


def fixWriteDefaults(self, guid_map, generated_anim_path):
    """Disable write-defaults on every animator state that has it enabled.

    For each affected state whose animation is found in `guid_map`, the
    animated (attribute, path) pairs are collected into a fresh clip built
    from WRITE_DEFAULTS_ANIM_TEMPLATE.

    NOTE(review): `generated_anim_path` is never used and the assembled clip
    is neither written nor returned here -- confirm whether the write-out
    step is missing.
    """
    # TODO(yum) we should have an Animation class which encapsulates all
    # this stuff.
    parser = UnityParser()
    parser.parse(WRITE_DEFAULTS_ANIM_TEMPLATE)
    new_anim = UnityAnimator()
    new_anim.addNodes(parser.nodes)

    new_clip = new_anim.peekNodeOfClass('74').mapping['AnimationClip']
    curve_template = new_clip.mapping['m_FloatCurves'].sequence[0]
    new_clip.mapping['m_FloatCurves'].sequence = []
    new_clip.mapping['m_EditorCurves'].sequence = []

    # Keep track of the (attribute, path) tuples we've already set to avoid
    # animating the same thing twice.
    attributes_set = set()

    animator_state_id = '1102'
    for node in self.nodes:
        if node.class_id != animator_state_id:
            continue

        state = node.mapping['AnimatorState']
        if state.mapping['m_WriteDefaultValues'] != '1':
            continue

        # Disable write defaults.
        state.mapping['m_WriteDefaultValues'] = '0'

        motion = state.mapping['m_Motion']
        # Some animations have write defaults but don't trigger an
        # animation. No idea what that's about. For now, just ignore.
        if 'guid' not in motion.mapping:
            continue
        guid = motion.mapping['guid']

        # Again, not really sure what's going on here, just ignore and
        # revisit if we hit problems.
        if guid not in guid_map:
            continue

        # OK, we found an animation with write defaults, and we know where
        # the animation lives. Crack it open and see what it's writing.
        animation_path = guid_map[guid]
        print("Animation has write defaults: {}".format(animation_path), file=sys.stderr)
        parser = UnityParser()
        parser.parseFile(animation_path)
        anim = UnityAnimator()
        anim.addNodes(parser.nodes)

        clip = anim.peekNodeOfClass('74')

        for curve in clip.mapping['AnimationClip'].mapping['m_FloatCurves'].sequence:
            attr = curve.mapping['attribute']
            path = curve.mapping['path']
            if (attr, path) in attributes_set:
                continue
            attributes_set.add((attr, path))

            new_curve = curve_template.copy()
            new_curve.mapping['attribute'] = attr
            new_curve.mapping['path'] = path

            # The same curve object is shared by both lists.
            new_clip.mapping['m_FloatCurves'].sequence.append(new_curve)
            new_clip.mapping['m_EditorCurves'].sequence.append(new_curve)


def generateOffAnimationForGuid(self, guid_map, generated_anim_dir, guid):
    """Write an all-zero "OFF_" copy of the animation identified by `guid`.

    Nothing is written when `guid` is unknown, the file has no animation
    clip, or every curve value is already '0'. Otherwise
    `OFF_<basename>` and its `.meta` are written into `generated_anim_dir`.
    """
    if guid not in guid_map:
        return

    animation_path = guid_map[guid]
    print("Checking animation at {}".format(animation_path), file=sys.stderr)
    parser = UnityParser()
    parser.parseFile(animation_path)
    anim = UnityAnimator()
    anim.addNodes(parser.nodes)

    clip = anim.peekNodeOfClass('74')
    if clip is None:
        # Not an animation clip file; nothing to zero out.
        return

    has_nonzero = False
    for memb in ("m_FloatCurves", "m_EditorCurves"):
        for curve in clip.mapping['AnimationClip'].mapping[memb].sequence:
            for m_curve in curve.mapping['curve'].mapping['m_Curve'].sequence:
                if m_curve.mapping['value'] != '0':
                    has_nonzero = True
                    m_curve.mapping['value'] = '0'

    if not has_nonzero:
        print("Animation does not set anything nonzero")
        return

    print("Animation sets things nonzero, fixing")

    new_anim_path = "OFF_{}".format(os.path.basename(animation_path))
    new_anim_path = "{}/{}".format(generated_anim_dir, new_anim_path)

    with open(new_anim_path, "w", encoding="utf-8") as f:
        f.write(str(anim))

    meta = Metadata()
    with open(new_anim_path + ".meta", "w", encoding="utf-8") as f:
        f.write(str(meta))


def generateOffAnimationsAnimStates(self, guid_map, generated_anim_dir):
    """Generate "off" animations for every animator state's motion."""
    animator_state_id = '1102'
    for node in self.nodes:
        if node.class_id != animator_state_id:
            continue

        motion = node.mapping['AnimatorState'].mapping['m_Motion']
        if 'guid' not in motion.mapping:
            continue
        self.generateOffAnimationForGuid(guid_map, generated_anim_dir,
                                         motion.mapping['guid'])


def generateOffAnimationsBlendTrees(self, guid_map, generated_anim_dir):
    """Generate "off" animations for every blend tree child's motion."""
    blend_tree_id = '206'
    for node in self.nodes:
        if node.class_id != blend_tree_id:
            continue

        for child in node.mapping['BlendTree'].mapping['m_Childs'].sequence:
            motion = child.mapping['m_Motion']
            if 'guid' not in motion.mapping:
                continue
            self.generateOffAnimationForGuid(guid_map, generated_anim_dir,
                                             motion.mapping['guid'])


def generateOffAnimations(self, guid_map, generated_anim_dir):
    """Generate "off" animations for all animator states and blend trees."""
    self.generateOffAnimationsAnimStates(guid_map, generated_anim_dir)
    self.generateOffAnimationsBlendTrees(guid_map, generated_anim_dir)


def addTransitionBooleanCondition(self, from_state, trans, param, branch):
    """Make `trans` fire when boolean `param` equals `branch`.

    If `from_state` is given, `trans` is also registered as one of its
    outgoing transitions.
    """
    # Populate the transition's condition logic.
    cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping()
    cond.mapping['m_ConditionMode'] = '1' if branch else '2'
    cond.mapping['m_ConditionEvent'] = param
    cond.mapping['m_EventThreshold'] = '0'
    # Register the transition with the `from_state`.
    if from_state:
        from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping()
        from_state_trans.mapping['fileID'] = copy.copy(trans.anchor)


def addTransitionIntegerEqualityCondition(self, from_state, trans, param, param_val):
    """Make `trans` fire when integer `param` equals `param_val`.

    If `from_state` is given, `trans` is also registered as one of its
    outgoing transitions.
    """
    # Populate the transition's condition logic.
    cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping()
    cond.mapping['m_ConditionMode'] = '6'
    cond.mapping['m_ConditionEvent'] = param
    # Curiously, the typo ("treshold" only has 1 'h') is needed for this to
    # work, but not for boolean conditions to work.
    cond.mapping['m_EventTreshold'] = str(param_val)
    # Register the transition with the `from_state`.
    if from_state:
        from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping()
        from_state_trans.mapping['fileID'] = trans.anchor


def addTransitionIntegerGreaterCondition(self, from_state, trans, param, param_val):
    """Add a condition with mode '3' on `param` and threshold `param_val`.

    Going by the method name this is the "greater than" comparison. If
    `from_state` is given, `trans` is also registered as one of its outgoing
    transitions.
    """
    # Populate the transition's condition logic.
    cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping()
    cond.mapping['m_ConditionMode'] = '3'
    cond.mapping['m_ConditionEvent'] = param
    cond.mapping['m_EventThreshold'] = str(param_val)
    # Register the transition with the `from_state`.
    if from_state:
        from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping()
        from_state_trans.mapping['fileID'] = trans.anchor


# TODO(yum) this should be factored out into generate_fx.py
def addTasttToggle(self, off_anim_path, on_anim_path, toggle_param,
        guid_map):
    """Add a two-state 'TaSTT_Toggle' layer switched by boolean `toggle_param`."""
    self.addParameter(toggle_param, bool)

    off_anim_meta = Metadata()
    off_anim_meta.loadOrCreate(off_anim_path, guid_map)

    on_anim_meta = Metadata()
    on_anim_meta.loadOrCreate(on_anim_path, guid_map)

    layer = self.addLayer('TaSTT_Toggle')
    off_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_Off', is_default_state = True)
    self.setAnimatorStateAnimation(off_anim, off_anim_meta.guid)
    on_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_On')
    self.setAnimatorStateAnimation(on_anim, on_anim_meta.guid)

    # TODO(yum) make a Transition class with methods for adding boolean
    # conditions
    off_to_on = self.addTransition(on_anim)
    self.addTransitionBooleanCondition(off_anim, off_to_on, toggle_param, True)

    on_to_off = self.addTransition(off_anim)
    self.addTransitionBooleanCondition(on_anim, on_to_off, toggle_param, False)


def setNoopAnimations(self, guid_map, noop_anim_path):
    """Point every idle animator state at the no-op animation.

    A state counts as idle when its motion references no animation known to
    `guid_map`.
    """
    noop_anim_meta = Metadata()
    noop_anim_meta.loadOrCreate(noop_anim_path, guid_map)

    for node in self.nodes:
        if node.class_id != "1102":
            continue
        fields = node.mapping['AnimatorState'].mapping['m_Motion'].mapping

        has_file_id = "fileID" in fields
        has_guid = "guid" in fields
        known_guid = has_guid and fields["guid"] in guid_map

        # As of 8 May 2023, idle states look like this:
        #   m_Motion: {fileID: 7400000, guid: e5881c5b0c09be854b0fd6fd8144333f, type: 2}
        # Before that, they looked like this:
        #   m_Motion: {fileID: 0}
        new_style_idle = has_file_id and has_guid and not known_guid
        old_style_idle = (not (has_file_id and fields["fileID"] != "0")
                          and not known_guid)
        if not (new_style_idle or old_style_idle):
            continue

        fields["fileID"] = "7400000"
        print(f"Set noop animation to guid {noop_anim_meta.guid} in state {node.anchor}")
        fields["guid"] = noop_anim_meta.guid
        fields["type"] = "2"


# ---- End of the animator-class methods; module-level definitions follow. ----

def unityYamlToString(nodes):
    """Serialize `nodes` to Unity-flavoured YAML text.

    The `%YAML`/`%TAG` preamble and the `--- !u!<class> &<anchor>` document
    headers are emitted unless `nodes` is a single node without an anchor.
    All blank lines are removed from the result.
    """
    preamble = "%YAML 1.1\n%TAG !u! tag:unity3d.com,2011:"
    emit_headers = len(nodes) > 1 or (len(nodes) == 1 and bool(nodes[0].anchor))

    lines = []
    if emit_headers:
        lines.append(preamble)
    for doc in nodes:
        if emit_headers:
            lines.append("--- !u!" + doc.class_id + " &" + doc.anchor)
        lines.append(str(doc))
    result = '\n'.join(lines)

    # Repeat until stable; a fixed number of passes leaves blank lines behind
    # when a run of newlines is long enough.
    while "\n\n" in result:
        result = result.replace("\n\n", "\n")

    return result


class UnityParser:
    """Event-driven parser turning Unity YAML text into `UnityDocument` nodes."""

    STREAM_START = 100
    STREAM_END = 199

    DOCUMENT_START = 200
    DOCUMENT_END = 299

    MAPPING_START = 300
    MAPPING_KEY = 301

    SEQUENCE_VALUE = 400

    def __init__(self):
        self.state = self.STREAM_START
        self.cur_scalar = None
        self.cur_node = None

        # Simple list of parsed documents. Populated by parse().
        self.nodes = []
        self.prev_states = []

    def __str__(self):
        return unityYamlToString(self.nodes)

    def pushState(self, state):
        """Enter `state`, remembering the current one."""
        self.prev_states.append(self.state)
        self.state = state

    def popState(self):
        """Return to the previously pushed state and return it."""
        self.state = self.prev_states.pop()
        return self.state

    def cleanYaml(self, yaml_str):
        """Rewrite Unity YAML so a standard YAML parser accepts it.

        The `!u!<class>` tag is removed from each `--- !u!<class> &<anchor>`
        header and an end-of-document marker (`...`) is added after every
        document. Headers without both a tag and an anchor are kept as they
        are.
        """
        lines = []
        first_document = True
        got_document = False
        for line in yaml_str.split("\n"):
            if not line.startswith("---"):
                lines.append(line)
                continue

            # Add end-of-document indicators.
            got_document = True
            if not first_document:
                lines.append("...\n")
            first_document = False

            # Remove class ID tag from each block.
            parts = line.split()
            if len(parts) >= 3:
                lines.append(parts[0] + " " + parts[2] + "\n")
            else:
                lines.append(line)

        if got_document:
            lines.append("...\n")
        return '\n'.join(lines)

    def getClassIds(self, yaml_str):
        """Map each document anchor to its class ID, read from the `---` headers."""
        anchor_to_class_id = {}
        for line in yaml_str.split("\n"):
            if not line.startswith("---"):
                continue

            parts = line.split()
            if len(parts) < 3:
                # Header without tag and anchor: nothing to record.
                continue
            class_id = parts[1][3:]  # strip the leading "!u!"
            anchor = parts[2][1:]    # strip the leading "&"
            anchor_to_class_id[anchor] = class_id

        return anchor_to_class_id

    def parseFile(self, yaml_file):
        """Parse the UTF-8 file at `yaml_file`; see `parse`."""
        with open(yaml_file, "r", encoding="utf-8") as f:
            yaml_str = f.read()
        return self.parse(yaml_str)

    def parse(self, yaml_str):
        """Parse `yaml_str`, appending one node per document to `self.nodes`.

        Raises:
            Exception: when YAML events arrive in an unexpected order.
        """
        anchor_to_class_id = self.getClassIds(yaml_str)
        yaml_str = self.cleanYaml(yaml_str)

        for event in yaml.parse(yaml_str):
            if isinstance(event, yaml.StreamStartEvent):
                if len(self.prev_states) > 0:
                    raise Exception("Multiple StreamStartEvents received")
                self.pushState(self.STREAM_START)

            elif isinstance(event, yaml.StreamEndEvent):
                if self.state != self.STREAM_START:
                    raise Exception("Stream end received after state {}".format(self.state))
                self.popState()
                if len(self.prev_states) > 0:
                    raise Exception("Extra states at stream end")

            elif isinstance(event, yaml.DocumentStartEvent):
                if self.state != self.STREAM_START and self.state != self.DOCUMENT_END:
                    raise Exception("Document start received after state {}".format(self.state))
                self.pushState(self.DOCUMENT_START)

            elif isinstance(event, yaml.DocumentEndEvent):
                if self.state != self.DOCUMENT_START:
                    raise Exception("Document end received after state {}".format(self.state))
                self.popState()
                self.nodes.append(self.cur_node)
                self.cur_node = None

            elif isinstance(event, yaml.MappingStartEvent):
                if self.cur_node is None:
                    # Top-level mapping: this starts a new document node.
                    self.cur_node = UnityDocument()
                    self.cur_node.anchor = event.anchor
                    if event.anchor:
                        self.cur_node.class_id = anchor_to_class_id[event.anchor]
                else:
                    self.cur_node = self.cur_node.addChildMapping(self.cur_scalar)
                self.pushState(self.MAPPING_START)

            elif isinstance(event, yaml.MappingEndEvent):
                if self.state != self.MAPPING_START:
                    raise Exception("Mapping end received after state {}".format(self.state))
                self.popState()
                # This mapping was the value of a key; leave the key state too.
                if self.state == self.MAPPING_KEY:
                    self.popState()
                if self.cur_node.parent is not None:
                    self.cur_node = self.cur_node.parent

            elif isinstance(event, yaml.SequenceStartEvent):
                self.cur_node = self.cur_node.addChildSequence(self.cur_scalar)
                self.pushState(self.SEQUENCE_VALUE)

            elif isinstance(event, yaml.SequenceEndEvent):
                if self.state != self.SEQUENCE_VALUE:
                    raise Exception("Sequence end received after state {}".format(self.state))
                self.popState()
                if self.state == self.MAPPING_KEY:
                    self.popState()
                self.cur_node = self.cur_node.parent

            elif isinstance(event, yaml.ScalarEvent):
                if self.state == self.MAPPING_START:
                    # Scalar in key position.
                    self.cur_scalar = event.value
                    self.pushState(self.MAPPING_KEY)
                elif self.state == self.MAPPING_KEY:
                    # Scalar in value position.
                    self.cur_node.mapping[self.cur_scalar] = event.value
                    self.popState()
                elif self.state == self.SEQUENCE_VALUE:
                    self.cur_node.sequence.append(event.value)
                else:
                    raise Exception("Scalar event received after state {}".format(self.state))
            else:
                raise Exception("Unhandled event {}".format(event))


class MulticoreUnityParser:
    """Parses a Unity YAML file with one worker process per batch of documents."""

    def parseFile(self, yaml_file):
        """Parse the UTF-8 file at `yaml_file`; see `parse`."""
        with open(yaml_file, "r", encoding="utf-8") as f:
            yaml_str = f.read()
        return self.parse(yaml_str)

    def parse(self, yaml_str):
        """Parse `yaml_str` in parallel and return a `UnityAnimator` of all nodes."""
        # Split the text at every "---" header. Anything before the first
        # header stays with the first document.
        lines = []
        documents = []
        first = True
        n_lines = 0
        for line in yaml_str.split("\n"):
            n_lines += 1
            if line.startswith("---"):
                if not first:
                    documents.append("\n".join(lines))
                    lines = []
                first = False
            lines.append(line)
        if lines:
            documents.append("\n".join(lines))
        print("Got {} documents out of {} lines".format(len(documents), n_lines), file=sys.stderr)

        # Divide the work evenly among the # of CPUs we have available.
        # os.cpu_count() may return None, hence the fallback.
        n_threads = os.cpu_count() or 1
        window_size = int(math.ceil(len(documents) / n_threads))
        documents = ["\n".join(documents[i:i + window_size])
                     for i in range(0, len(documents), window_size)]

        # The context manager shuts the manager process down afterwards.
        with mp.Manager() as mgr:
            print("Spawning {} threads".format(len(documents)), file=sys.stderr)
            threads = []
            for document in documents:
                res = mgr.dict()
                thread = mp.Process(target = self.parseOneSerial, args = (document, res,))
                threads.append((thread, res))
                thread.start()

            print("Joining threads", file=sys.stderr)
            nodes = []
            for thread, res in threads:
                thread.join()
                nodes += res['nodes']

        print("Creating animator", file=sys.stderr)
        result = UnityAnimator()
        result.addNodes(nodes)

        return result

    def parseOneSerial(self, document, res):
        """Worker entry point: parse `document` and store its nodes in `res['nodes']`."""
        parser = UnityParser()
        parser.parse(document)
        res['nodes'] = parser.nodes


def getGuidMap(d):
    """Recursively map each GUID found in `*.meta` files under `d` to its asset path.

    The asset path is the meta file's path without the ".meta" suffix.
    """
    suffix = ".meta"
    result = {}
    with os.scandir(d) as entries:
        for entry in entries:
            path = entry.path
            if entry.is_dir():
                result.update(getGuidMap(path))
            if not entry.is_file():
                continue
            if not path.endswith(suffix):
                continue
            with open(path, "r", encoding="utf-8") as meta_file:
                for line in meta_file:
                    if not line.startswith("guid"):
                        continue
                    parts = line.split()
                    if len(parts) > 1:
                        result[parts[1]] = path[:-len(suffix)]
    return result


def _parseBoolArg(value: str) -> bool:
    """Parse a command-line boolean; `type=bool` would treat "False" as true."""
    return value.strip().lower() not in ("", "0", "false", "no", "off")


def _loadGuidMap(path):
    """Load a pickled GUID map from `path`.

    NOTE: pickle can execute arbitrary code; only load maps this tool wrote.
    """
    with open(path, 'rb') as f:
        return pickle.load(f)


if __name__ == "__main__":
    # Relative paths below are resolved against the script's own directory.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    parser = argparse.ArgumentParser()
    parser.add_argument("cmd", type=str, help="One of merge, guid_map, fix_write_defaults")
    parser.add_argument("--fx0", type=str, help="The first animator to merge")
    parser.add_argument("--fx1", type=str, help="The second animator to merge")
    parser.add_argument("--fx_dest", type=str, help="The path at which to " +
            "save the generated/merged animator")
    parser.add_argument("--project_root", type=str, help="The path to the " +
            "Unity project Assets folder")
    parser.add_argument("--save_to", type=str, help="The path to save the " +
            "result of the computation")
    parser.add_argument("--guid_map", type=str, help="Path to guid.map, " +
            "generated by a previous call to `guid_map`")
    parser.add_argument("--guid_map_append", type=_parseBoolArg, help="If set, " +
            "append to GUID map instead of overwriting.")
    parser.add_argument("--gen_anim_dir", type=str, help="The folder under which generated animations are stored")
    args = parser.parse_args()

    if args.cmd == "merge":
        if not args.fx0 or not args.fx1 or not args.fx_dest:
            print("--fx0, --fx1, and --fx_dest required", file=sys.stderr)
            parser.print_help()
            parser.exit(1)

        print("Parsing {}".format(args.fx0), file=sys.stderr)
        parser0 = MulticoreUnityParser()
        anim0 = parser0.parseFile(args.fx0)

        print("Parsing {}".format(args.fx1), file=sys.stderr)
        parser1 = MulticoreUnityParser()
        anim1 = parser1.parseFile(args.fx1)

        print("Merging animators", file=sys.stderr)
        anim0.merge(anim1)

        print("Serializing to {}".format(args.fx_dest), file=sys.stderr)
        with open(args.fx_dest, "w", encoding="utf-8") as f:
            f.write(unityYamlToString(anim0.nodes))

    elif args.cmd == "guid_map":
        if not args.project_root or not args.save_to:
            print("--project_root and --save_to required")
            parser.print_help()
            parser.exit(1)

        print("Looking up GUIDs under {}".format(args.project_root),
                file=sys.stderr)
        guid_map = getGuidMap(args.project_root)

        # dirname is '' for a bare filename, and os.makedirs('') raises.
        save_to_dir = os.path.dirname(args.save_to)
        if save_to_dir:
            os.makedirs(save_to_dir, exist_ok=True)

        if args.guid_map_append and os.path.exists(args.save_to):
            tmp_map = _loadGuidMap(args.save_to)
            # combine guid_map and tmp_map; saved entries win on conflict
            guid_map = {**guid_map, **tmp_map}
        print("Saving to {}".format(args.save_to), file=sys.stderr)
        with open(args.save_to, 'wb') as f:
            pickle.dump(guid_map, f)

    elif args.cmd == "fix_write_defaults":
        if not args.fx0 or not args.guid_map:
            print("--fx0 and --guid_map required")
            parser.print_help()
            parser.exit(1)

        guid_map = _loadGuidMap(args.guid_map)

        print("Parsing {}".format(args.fx0), file=sys.stderr)
        parser0 = MulticoreUnityParser()
        anim = parser0.parseFile(args.fx0)

        print("Fixing write defaults", file=sys.stderr)
        anim_dir = "generated/animations/"
        os.makedirs(anim_dir, exist_ok=True)
        anim.fixWriteDefaults(guid_map, anim_dir + "TaSTT_Reset_Animation.anim")
        print(str(anim))

    elif args.cmd == "gen_off_anims":
        if not args.fx0 or not args.guid_map:
            print("--fx0 and --guid_map required")
            parser.print_help()
            parser.exit(1)

        guid_map = _loadGuidMap(args.guid_map)

        print("Parsing {}".format(args.fx0), file=sys.stderr)
        parser0 = MulticoreUnityParser()
        anim = parser0.parseFile(args.fx0)

        print("Generating off animations", file=sys.stderr)
        anim_dir = "generated/animations/"
        os.makedirs(anim_dir, exist_ok=True)
        anim.generateOffAnimations(guid_map, "generated/animations")

    elif args.cmd == "add_toggle":
        if not args.fx0 or not args.fx_dest or not args.gen_anim_dir or not args.guid_map:
            print("--fx0, --fx_dest, --gen_anim_dir and --guid_map required")
            parser.print_help()
            parser.exit(1)

        guid_map = _loadGuidMap(args.guid_map)

        print("Parsing {}".format(args.fx0), file=sys.stderr)
        parser0 = MulticoreUnityParser()
        anim = parser0.parseFile(args.fx0)

        print("Adding toggle", file=sys.stderr)
        anim.addTasttToggle(args.gen_anim_dir + "/TaSTT_Toggle_Off.anim",
                args.gen_anim_dir + "/TaSTT_Toggle_On.anim", "TaSTT_Toggle",
                guid_map)

        print("Serializing to {}".format(args.fx_dest), file=sys.stderr)
        with open(args.fx_dest, "w", encoding="utf-8") as f:
            f.write(str(anim))

        # addTasttToggle may have registered new GUIDs; persist them.
        with open(args.guid_map, 'wb') as f:
            pickle.dump(guid_map, f)

    elif args.cmd == "fast_parse_test":
        if not args.fx0:
            print("--fx0 required")
            parser.print_help()
            parser.exit(1)

        print("Parsing {}".format(args.fx0), file=sys.stderr)
        parser0 = MulticoreUnityParser()
        anim = parser0.parseFile(args.fx0)
        print(str(anim))

    elif args.cmd == "set_noop_anim":
        if not args.fx0 or not args.fx_dest or not args.gen_anim_dir or not args.guid_map:
            print("--fx0, --fx_dest, --gen_anim_dir and --guid_map required")
            parser.print_help()
            parser.exit(1)

        guid_map = _loadGuidMap(args.guid_map)

        print("Parsing {}".format(args.fx0), file=sys.stderr)
        parser0 = MulticoreUnityParser()
        anim = parser0.parseFile(args.fx0)

        anim.setNoopAnimations(guid_map, args.gen_anim_dir + "/TaSTT_Do_Nothing.anim")

        with open(args.fx_dest, "w", encoding="utf-8") as f:
            f.write(str(anim))

    else:
        print("Unrecognized command: {}".format(args.cmd))

# diff --git a/Scripts/obfuscate.py b/Scripts/obfuscate.py
# deleted file mode 100644 index 8d01e10..0000000 --- a/Scripts/obfuscate.py +++ /dev/null @@ -1,92 +0,0 @@
#!/usr/bin/env python3

# This module is used to implement obfuscation of TaSTT network
# speech data. At a high level, TaSTT is simply streaming N bits of
# arbitrary data to a shader via VRChat's parameter sync mechanism.
#
# It would be trivial to mine this data for speech information, since
# we're sending unicode (or ASCII) characters to peers.
#
# To raise the cost for the casual data collector, we can obfuscate
# this data using a one-time pad in cipher-block chaining mode.
#
# Making things interesting, encrypted data will arrive at the Unity
# animator, which processes them in 8 bit chunks. They are written
# into contiguous blocks of the animator. Thus the shader can decrypt
# the board by decrypting each block. This is thus stronger than
# applying a one-time pad to each byte of the plaintext, since the
# statistical distribution of individual letters is destroyed.
# Obviously due to the lack of an initialization vector, the
# distribution of phrases (blocks) is preserved.

import math
import os

def genKey(n_bits = 128) -> bytes:
    """Return a random key of `n_bits // 8` bytes."""
    return os.urandom(n_bits // 8)

def saveKey(filename: str, key: bytes):
    """Write `key` to `filename` as raw bytes."""
    with open(filename, "wb") as f:
        f.write(key)

def loadKey(filename: str) -> bytes:
    """Read a key previously written by `saveKey`."""
    with open(filename, "rb") as f:
        return f.read()

def _xorChain(data, key, decrypting: bool) -> bytearray:
    """XOR `data` with `key`, one key-sized block at a time, with chaining.

    Each block is additionally XORed with the previous cyphertext block. The
    first block is chained against all zeroes: a true IV would be randomized,
    but we can't do that since the shader doesn't have access to it.

    Raises:
        ValueError: if `key` is empty.
    """
    block_len = len(key)
    if block_len == 0:
        raise ValueError("key must not be empty")

    prev_cypher = bytes(block_len)
    result = bytearray()
    for begin in range(0, len(data), block_len):
        block_in = bytes(data[begin:begin + block_len])
        # zip() stops at the shortest input, which handles a short last block.
        block_out = bytearray(b ^ c ^ k
                              for b, c, k in zip(block_in, prev_cypher, key))
        result += block_out
        # The chain always follows the cyphertext: that is the output when
        # obfuscating and the input when deobfuscating.
        prev_cypher = block_in if decrypting else block_out
    return result

# Apply a symmetric cypher to `data` using cypher-block chaining.
def obfuscate(data: bytearray, key: bytearray) -> bytearray:
    """Return `data` obfuscated with `key`; the output has the same length.

    `data` and `key` may be `bytes` or `bytearray`.

    Raises:
        ValueError: if `key` is empty.
    """
    return _xorChain(data, key, decrypting = False)

def deobfuscate(data: bytearray, key: bytearray) -> bytearray:
    """Invert `obfuscate`: recover the plaintext from `data` using `key`.

    Raises:
        ValueError: if `key` is empty.
    """
    return _xorChain(data, key, decrypting = True)

def test():
    """Self-test: key save/load round trip and obfuscate/deobfuscate round trip."""
    import tempfile

    key = genKey()
    # Use a scratch directory so a failing assertion cannot leave a key file
    # behind in the working directory.
    with tempfile.TemporaryDirectory() as scratch:
        key_path = os.path.join(scratch, "test.key")
        saveKey(key_path, key)
        new_key = loadKey(key_path)
    assert key == new_key

    plaintext_original = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
    plaintext_bytes = bytearray(plaintext_original, "utf-8")
    cyphertext = obfuscate(plaintext_bytes, key)
    assert len(plaintext_bytes) == len(cyphertext)
    plaintext_recovered = deobfuscate(cyphertext, key).decode("utf-8")
    assert plaintext_original == plaintext_recovered
    assert plaintext_bytes != cyphertext

if __name__ == "__main__":
    test()

# diff --git a/Scripts/osc_ctrl.py b/Scripts/osc_ctrl.py deleted file mode 100644 index c077b2b..0000000 --- a/Scripts/osc_ctrl.py +++ /dev/null @@ -1,185 +0,0 @@
#!/usr/bin/env python3

from emotes_v2 import EmotesState
from generate_utils import config
from math import ceil
from paging import MultiLinePager
from pythonosc import udp_client

import argparse
import generate_utils
import random
import time

# 5 Hz usually works, but 3 Hz is more reliable in busy lobbies. Feel free to
# dial this up if you want faster paging, but know that it might break for
# remote users.
SYNC_FREQ_HZ = 3.0
SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ

def getClient(ip = "127.0.0.1", port = 9000):
    """Return an OSC UDP client that sends to `ip`:`port`."""
    return udp_client.SimpleUDPClient(ip, port)

# The characters in the TaSTT are all numbered from top left to bottom right.
# This function provides a mapping from letter ('a') to index (26).
def generateEncoding():
    """Build the character -> (low byte, high byte) lookup table.

    Every code point in the Basic Multilingual Plane (U+0000..U+FFFF,
    inclusive) is mapped to its little-endian 16-bit representation.

    Returns:
        dict mapping a single-character string to a `(lo, hi)` tuple where
        `lo == ord(c) % 256` and `hi == ord(c) // 256`.
    """
    # 0x10000 (not 65535) so that U+FFFF is included as well; integer
    # division keeps the arithmetic exact without a float round trip.
    return {chr(i): (i % 256, i // 256) for i in range(0x10000)}

class OscState:
    """Bundles the OSC client, the pager and the character encoding table."""

    def __init__(self, chars_per_sync: int, rows: int, cols: int,
            bytes_per_char: int,
            ip = "127.0.0.1", port = 9000):
        """Create the OSC client and a pager for a `rows` x `cols` board.

        Args:
            chars_per_sync: slice length handed to the pager.
            rows: number of rows handed to the pager.
            cols: number of columns handed to the pager.
            bytes_per_char: how many bytes of each encoded character are
                sent by `updateRegion`.
            ip: OSC server IP.
            port: OSC server port.
        """
        self.client = getClient(ip, port)
        self.pager = MultiLinePager(chars_per_sync, rows, cols)
        self.encoding = generateEncoding()
        self.bytes_per_char = bytes_per_char
        # `updateRegion` only receives the client, so the value is mirrored
        # onto the client object for it to read.
        self.client.bytes_per_char = bytes_per_char
        self.builtin_msg = ""  # The last message sent to the built-in chatbox

    def reset(self):
        """Reset the pager state."""
        self.pager.reset()

def encodeMessage(encoding, msg):
    """Translate each character of `msg` through `encoding`.

    Args:
        encoding: table as produced by `generateEncoding`.
        msg: text to encode.

    Returns:
        list with one `encoding` entry per character of `msg`.

    Raises:
        KeyError: if `msg` contains a character missing from `encoding`.
    """
    return [encoding[char] for char in msg]

def lockWorld(client, lock: bool):
    """Send `lock` to the lock-world avatar parameter."""
    addr = "/avatar/parameters/" + generate_utils.getLockWorldParam()
    client.send_message(addr, lock)

def toggleBoard(client, show: bool):
    """Send `show` to the board toggle avatar parameter."""
    addr = "/avatar/parameters/" + generate_utils.getToggleParam()
    client.send_message(addr, show)

def enable(client):
    """Send True to the enable avatar parameter."""
    addr = "/avatar/parameters/" + generate_utils.getEnableParam()
    client.send_message(addr, True)

def disable(client):
    """Send False to the enable avatar parameter."""
    addr = "/avatar/parameters/" + generate_utils.getEnableParam()
    client.send_message(addr, False)

def ellipsis(client, enable: bool):
    """Send `enable` to the ellipsis avatar parameter."""
    addr = "/avatar/parameters/" + generate_utils.getEllipsisParam()
    client.send_message(addr, enable)

def clear(osc_state: OscState):
    """Pulse the clear-board parameter and reset the pager.

    The parameter is held True for one sync period (`SYNC_DELAY_S`) before
    being set back to False.
    """
    disable(osc_state.client)

    addr = "/avatar/parameters/" + generate_utils.getClearBoardParam()
    osc_state.client.send_message(addr, True)
    time.sleep(SYNC_DELAY_S)
    osc_state.client.send_message(addr, False)

    osc_state.reset()

# Note: `nth_audio` is 1-indexed
def playAudio(osc_state: OscState, nth_audio: int, value: bool):
    """Send `value` to the `nth_audio`-th sound avatar parameter."""
    addr = "/avatar/parameters/" + generate_utils.getSoundParam(nth_audio)
    osc_state.client.send_message(addr, value)

def updateRegion(client, region_idx, letter_encoded):
    """Send one encoded character to the blend parameters of `region_idx`.

    Args:
        client: OSC client carrying a `bytes_per_char` attribute.
        region_idx: index passed to `generate_utils.getBlendParam`.
        letter_encoded: sequence of byte values; the first
            `client.bytes_per_char` entries are sent.
    """
    for byte in range(0, client.bytes_per_char):
        addr = "/avatar/parameters/" + generate_utils.getBlendParam(region_idx, byte)
        # Remap a byte value 0..255 linearly onto -1.0..1.0.
        letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5
        client.send_message(addr, letter_remapped)

# Sends one slice of `msg` to the board then returns. Slices are sent
# in FIFO order; e.g., the most recently spoken words are sent last.
# Returns True if done paging, False otherwise.
def pageMessage(cfg, osc_state: OscState, msg: str, estate: "EmotesState") -> bool:
    """Send the next pending slice of `msg` to the board.

    Args:
        cfg: mapping providing "rows", "cols" and "chars_per_sync".
        osc_state: client, pager and encoding to use.
        msg: the full message to page.
        estate: object whose `encode_emotes` is applied to `msg` first.

    Returns:
        True when the pager reports nothing left to send, False otherwise.
    """
    msg = estate.encode_emotes(msg)

    msg_slice, slice_idx = osc_state.pager.getNextSlice(msg)
    if slice_idx == -1:
        # Nothing left to send: silence every sound.
        for i in range(5):
            playAudio(osc_state, i+1, False)
        return True

    # Sound N (1-indexed) corresponds to the N-th vowel of "aeiou".
    lowered = msg_slice.lower()
    sounds_to_make = set()
    for letter_i, letter in enumerate(["a", "e", "i", "o", "u"], start=1):
        if letter in lowered:
            sounds_to_make.add(letter_i)
    if len(sounds_to_make) > 0:
        for i in range(5):
            # Each candidate sound is randomly skipped one time in three.
            if i+1 in sounds_to_make and random.randint(1,3) != 1:
                playAudio(osc_state, i+1, True)
            else:
                playAudio(osc_state, i+1, False)

    # Really long messages just wrap back around.
    num_cells = cfg["rows"] * cfg["cols"]
    num_regions = ceil(num_cells / cfg["chars_per_sync"])
    which_region = slice_idx % num_regions
    if which_region == num_regions - 1:
        # The last region may hold fewer cells than a full sync; truncate the
        # slice so that only cells that exist are written.
        layers_in_last_region = num_cells % cfg["chars_per_sync"]
        if layers_in_last_region == 0:
            layers_in_last_region = cfg["chars_per_sync"]
        msg_slice = msg_slice[0:layers_in_last_region]

    enable(osc_state.client)

    # Seek to the current region.
    addr = "/avatar/parameters/" + generate_utils.getSelectParam()
    osc_state.client.send_message(addr, which_region)

    # Update each letter.
    encoded = encodeMessage(osc_state.encoding, msg_slice)
    for i, letter_encoded in enumerate(encoded):
        updateRegion(osc_state.client, i, letter_encoded)

    ellipsis(osc_state.client, False)

    # More slices may remain; say so explicitly instead of returning None.
    return False

# Like `pageMessage` but uses the built-in chatbox. The built-in chatbox
# truncates data at about 150 chars, so just send the suffix of the message for
# now.
-def pageMessageBuiltin(cfg, osc_state: OscState, msg: str) -> bool: - if len(msg) == 0 or msg.isspace(): - return False # Not paging - - msg_begin = max(len(msg) - 140, 0) - msg_suffix = msg[msg_begin:len(msg)] - - if osc_state.builtin_msg != msg: - addr="/chatbox/typing" - osc_state.client.send_message(addr, False) - - addr="/chatbox/input" - osc_state.client.send_message(addr, (msg_suffix, True)) - osc_state.builtin_msg = msg - - return False # Not paging - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("-i", default="127.0.0.1", help="OSC server IP") - parser.add_argument("-p", type=int, default=9000, help="OSC server port") - args = parser.parse_args() - diff --git a/Scripts/paging.py b/Scripts/paging.py deleted file mode 100644 index c8ba8c3..0000000 --- a/Scripts/paging.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env python3 - -from math import ceil -from text_wrapping import TextWrapper - -def getSlice(msg: str, idx: int, slice_len: int) -> str: - begin = idx * slice_len - end = (idx + 1) * slice_len - msg_len = len(msg) - if msg_len >= end: - return msg[begin:end] - if msg_len > begin: - return msg[begin:end] + (" " * (end - msg_len)) - return None - -def setSlice(msg: str, idx: int, slice_len: int, msg_slice: str, - include_suffix: bool = True) -> str: - begin = idx * slice_len - end = (idx + 1) * slice_len - prefix = msg[0:begin] - prefix += " " * (begin - len(prefix)) - suffix = msg[end:] - msg = prefix + msg_slice - if include_suffix: - msg += suffix - return msg - -class SingleLinePager: - def __init__(self, slice_len: int): - self.msg = "" - self.slice_len = slice_len - - def reset(self): - self.msg = "" - - def getNextSlice(self, msg) -> tuple[str, int]: - for i in range(0, ceil(len(msg) / self.slice_len)): - old_slice = getSlice(self.msg, i, self.slice_len) - new_slice = getSlice(msg, i, self.slice_len) - if old_slice != new_slice: - self.msg = setSlice(self.msg, i, self.slice_len, new_slice) - return 
new_slice, i - return "", -1 - -class MultiLinePager: - def __init__(self, slice_len: int, rows: int, cols: int): - # This is a list of lists of SingleLinePagers. - # It represents a list of pages, each containing a list of lines. - self.pages = [] - self.slice_len = slice_len - self.rows = rows - self.cols = cols - - def reset(self): - self.pages = [] - - def getNextSlice(self, msg) -> tuple[str, int]: - pages = TextWrapper(self.rows, self.cols).wrap(msg) - - # Wrapping split the input message along line boundaries and along page - # boundaries. However, we're going to treat each page like a single - # line, so that `slice_idx` can be used as a region index. Therefore, - # we need exactly one SingleLinePager per page. - for pi in range(len(self.pages), len(pages)): - self.pages.append(SingleLinePager(self.slice_len)) - - for pi in range(0, len(pages)): - line = "".join(pages[pi]) - pager = self.pages[pi] - msg_slice, slice_idx = pager.getNextSlice(line) - if slice_idx != -1: - # Reset every page after this. This guarantees that any text - # written in this operation will eventually be redrawn. 
- for pj in range(pi + 1, len(pages)): - self.pages[pj].reset() - return msg_slice, slice_idx - return "", -1 - -if __name__ == "__main__": - assert(getSlice("abcdefghij", 0, 1) == "a") - assert(getSlice("abcdefghij", 9, 1) == "j") - assert(getSlice("abcdefghij", 0, 2) == "ab") - assert(getSlice("abcdefghij", 1, 2) == "cd") - assert(getSlice("abcdefghij", 3, 3) == "j ") - assert(getSlice("abcdefghij", 10, 1) == None) - assert(getSlice("abcdefghij", 11, 1) == None) - - assert(setSlice("abcdefghij", 1, 2, "kl") == "abklefghij") - assert(setSlice("abc", 1, 2, "de") == "abde") - assert(setSlice("abc", 0, 2, "de") == "dec") - - slice_len = 2 - p = SingleLinePager(slice_len) - p.msg = "test" - assert(p.getNextSlice("test")[0] == "") - assert(p.getNextSlice("tast")[0] == "ta") - assert(p.getNextSlice("tast")[0] == "") - - p.msg = "" - assert(p.getNextSlice("test")[0] == "te") - assert(p.msg == "te") - assert(p.getNextSlice("test")[0] == "st") - assert(p.msg == "test") - assert(p.getNextSlice("test")[0] == "") - assert(p.msg == "test") - assert(p.getNextSlice("tests")[0] == "s ") - - slice_len = 2 - rows = 2 - cols = 4 - p = MultiLinePager(slice_len, rows, cols) - assert(p.getNextSlice("")[0] == "") - assert(p.getNextSlice("yo")[0] == "yo") - assert(p.getNextSlice("yogi")[0] == "gi") - assert(p.getNextSlice("yugi")[0] == "yu") - assert(p.getNextSlice("yugi is a")[0] == "is") - assert(p.getNextSlice("yugi is a")[0] == " a") - assert(p.getNextSlice("yugi is a pussy")[0] == "pu") - assert(p.getNextSlice("yugi is a pussy")[0] == "s-") - assert(p.getNextSlice("yugi is a pussy")[0] == "sy") - - p = MultiLinePager(slice_len, rows, cols) - assert(p.getNextSlice("yo")[0] == "yo") - assert(p.getNextSlice("yo")[0] == " ") - assert(p.getNextSlice("yo")[0] == " ") - assert(p.getNextSlice("yo")[0] == " ") - assert(p.getNextSlice("yo")[0] == "") - diff --git a/Scripts/profanity_filter.py b/Scripts/profanity_filter.py deleted file mode 100644 index b8c84ed..0000000 --- 
a/Scripts/profanity_filter.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python3 - -class ProfanityFilter: - def __init__(self, en_path: str): - self.en_path = en_path - self.en_profanity = set() - - def load(self): - with open(self.en_path, 'r') as f: - for line in f: - self.en_profanity.add(line.strip()) - - def filter(self, line: str, language_code: str = "en") -> str: - filtered = "" - - if language_code not in {"en"}: - raise ValueError(f"Language code \"{language_code}\" is " + - "unsupported by the profanity filter") - - # Translation table converting vowels to asterisks. - vowel_to_asterisk = str.maketrans('aeiouAEIOU', '**********') - - result = [] - for word in line.split(): - word_clean = word.lower() - # Filter out non-alphabet characters from the word. - word_clean = ''.join([char for char in word_clean if char.isalpha()]) - if word_clean in self.en_profanity: - result.append(word.translate(vowel_to_asterisk)) - else: - result.append(word) - - return " ".join(result) - -if __name__ == "__main__": - en_path = "/mnt/d/vrc/TaSTT/GUI/Profanity/Profanity/en" - p = ProfanityFilter(en_path) - p.load() - assert(p.filter("fuck") == "f*ck") - assert(p.filter("fuck!") == "f*ck!") - assert(p.filter("fuck shit") == "f*ck sh*t") - assert(p.filter("fuck shit this should not be filtered") == "f*ck sh*t this should not be filtered") - assert(p.filter("ASS") == "*SS") diff --git a/Scripts/remove_audio_sources.py b/Scripts/remove_audio_sources.py deleted file mode 100644 index 0486169..0000000 --- a/Scripts/remove_audio_sources.py +++ /dev/null @@ -1,25 +0,0 @@ -import argparse -import libunity -import sys - -def removeAudioSources(path: str): - parser = libunity.MulticoreUnityParser() - anim = parser.parseFile(path) - anchors = set() - node = anim.popNodeOfClass("82") - while node: - print("Killed audio source") - anchors.add(node.anchor) - node = anim.popNodeOfClass("82") - for node in anim.nodes: - anim.scrubReferencesByValue(node, values=anchors) - with 
open(path, "w", encoding="utf-8") as f: - f.write(libunity.unityYamlToString(anim.nodes)) - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--prefab", type=str, help="Path to .prefab file.") - args = parser.parse_args() - - removeAudioSources(args.prefab) - diff --git a/Scripts/requirements.txt b/Scripts/requirements.txt deleted file mode 100644 index 41c581c..0000000 --- a/Scripts/requirements.txt +++ /dev/null @@ -1,18 +0,0 @@ -ctranslate2==4.5.0 -editdistance -faster-whisper@https://github.com/guillaumekln/faster-whisper/archive/53bbe5401683c9a7549db62642e3d4535956b95c.tar.gz -future==0.18.2 -huggingface_hub==0.16.4 -keyboard -langcodes -language-data -openvr -pillow -pyaudio -pydub -python-osc -pyyaml -sentence_splitter -transformers>=4.21.0 -wget - diff --git a/Scripts/requirements_frozen.txt b/Scripts/requirements_frozen.txt deleted file mode 100644 index 9e6a6ab..0000000 --- a/Scripts/requirements_frozen.txt +++ /dev/null @@ -1,42 +0,0 @@ -av==13.1.0 -certifi==2024.8.30 -charset-normalizer==3.4.0 -colorama==0.4.6 -coloredlogs==15.0.1 -ctranslate2==4.5.0 -editdistance==0.8.1 -faster-whisper @ https://github.com/guillaumekln/faster-whisper/archive/53bbe5401683c9a7549db62642e3d4535956b95c.tar.gz#sha256=17b49d15a58e18d78b4639af59bd35da12bc0bf3bb73c9af4ad48891dd6793f7 -filelock==3.16.1 -flatbuffers==24.3.25 -fsspec==2024.10.0 -future==0.18.2 -huggingface-hub==0.16.4 -humanfriendly==10.0 -idna==3.10 -keyboard==0.13.5 -langcodes==3.4.1 -language_data==1.2.0 -marisa-trie==1.2.1 -mpmath==1.3.0 -numpy==2.1.3 -onnxruntime==1.20.0 -openvr==2.5.101 -packaging==24.2 -pillow==11.0.0 -protobuf==5.28.3 -PyAudio==0.2.14 -pydub==0.25.1 -pyreadline3==3.5.4 -python-osc==1.9.0 -PyYAML==6.0.2 -regex==2024.11.6 -requests==2.32.3 -safetensors==0.4.5 -sentence-splitter==1.4 -sympy==1.13.3 -tokenizers==0.15.2 -tqdm==4.67.0 -transformers==4.35.2 -typing_extensions==4.12.2 -urllib3==2.2.3 -wget==3.2 diff --git a/Scripts/set_texture_sz.py 
b/Scripts/set_texture_sz.py deleted file mode 100644 index f6fbb45..0000000 --- a/Scripts/set_texture_sz.py +++ /dev/null @@ -1,24 +0,0 @@ -import argparse -import libunity -import sys - -def setTextureSize(path: str, size: int): - parser = libunity.MulticoreUnityParser() - anim = parser.parseFile(path) - - node = anim.nodes[0] - node.mapping['TextureImporter'].mapping['maxTextureSize'] = size - for plat in node.mapping['TextureImporter'].mapping['platformSettings'].sequence: - plat.mapping['maxTextureSize'] = size - - with open(path, "w", encoding="utf-8") as f: - f.write(libunity.unityYamlToString(anim.nodes)) - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--meta", type=str, help="Path to texture .meta file.") - parser.add_argument("--size", type=int, help="Texture size.") - args = parser.parse_args() - - setTextureSize(args.meta, args.size) - diff --git a/Scripts/steamvr.py b/Scripts/steamvr.py deleted file mode 100644 index 3e6c6c9..0000000 --- a/Scripts/steamvr.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env python3 - -# python3 -m pip install openvr -# License: BSD-3.0 (requires showing notice in binary distributions) -import openvr as vr -import sys -import time - -EVENT_NONE = 0 -EVENT_RISING_EDGE = 1 -EVENT_FALLING_EDGE = 2 - -class InputEvent: - def __init__(self, - opcode: int): - self.opcode = opcode - -# Checks if the given button on the given controller is pressed. 
-def pollButtonPress( - hand: str = "right", - button: str = "b", - ctrl = None # ThreadControl object - ) -> int: - hands = {} - hands["left"] = vr.TrackedControllerRole_LeftHand - hands["right"] = vr.TrackedControllerRole_RightHand - - buttons = {} - buttons["a"] = vr.k_EButton_IndexController_A - buttons["b"] = vr.k_EButton_IndexController_B - buttons["thumbstick"] = vr.k_EButton_Axis0 - - system = None - first = True - while ctrl.run_app and not system: - try: - system = vr.init(vr.VRApplication_Background) - except Exception as e: - if first: - print(f"Failed to start steamVR input thread: {repr(e)}", file=sys.stderr) - first = False - time.sleep(1) - last_packet = 0 - event_high = False - - while ctrl.run_app: - time.sleep(0.01) - - lh_idx = system.getTrackedDeviceIndexForControllerRole(hands[hand]) - #print("left hand device idx: {}".format(lh_idx)) - - got_state, state = system.getControllerState(lh_idx) - if not got_state: - continue - - if state.unPacketNum == last_packet: - continue - - # Clicking joysticks and moving joysticks fire the same events. To - # differentiate movement from clicking, we create a dead zone: if the event - # fires while the stick isn't moved far from center, we assume it's a - # click, not movement. 
- dead_zone_radius = 0.7 - - button_mask = (1 << buttons[button]) - ret = EVENT_NONE - if (state.ulButtonPressed & button_mask) != 0 and\ - (state.rAxis[0].x**2 + state.rAxis[0].y**2 < dead_zone_radius**2): - #print("button pressed: %016x" % state.ulButtonPressed) - #for i in range(0, 5): - # print("axis {} x: {} y: {}".format(i, state.rAxis[i].x, state.rAxis[i].y)) - if not event_high: - yield InputEvent(EVENT_RISING_EDGE) - event_high = True - elif event_high: - event_high = False - yield InputEvent(EVENT_FALLING_EDGE) - -if __name__ == "__main__": - gen = pollButtonPress() - while True: - time.sleep(0.1) - - event = pollButtonPress(session_state) - if event == EVENT_RISING_EDGE: - print("rising edge") - elif event == EVENT_FALLING_EDGE: - print("falling edge") - diff --git a/Scripts/text_to_text_demo.py b/Scripts/text_to_text_demo.py deleted file mode 100644 index 4810361..0000000 --- a/Scripts/text_to_text_demo.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python3 -# python3 -m pip install python-osc pillow - -from math import ceil -from paging import MultiLinePager -from pythonosc import udp_client - -import generate_utils -import osc_ctrl -import time - -class AppConfig: - def __init__(self, - rows: int = 4, - cols: int = 40, - chars_per_sync: int = 10, - osc_sync_rate_hz: int = 3): - self.rows = rows - self.cols = cols - self.chars_per_sync = chars_per_sync - self.osc_sync_rate_hz = osc_sync_rate_hz - self.client = osc_ctrl.getClient() - -def encodeMessage(msg): - encoded = [] - for char in msg: - encoded.append(ord(char)) - return encoded - -class OSCSyncHelper: - def __init__(self, - config: AppConfig): - self.sync_delay_s = 1.0 / config.osc_sync_rate_hz - self.last = time.time() - self.sync_delay_s - - def waitForSync(self) -> None: - # sleep() can sleep for too short a time, so use a loop to ensure that - # we sleep at least a full sync window's worth of time. 
- while time.time() - self.last < self.sync_delay_s: - time.sleep(0.01) - self.last = time.time() - -def sendMessage(msg: str, cfg: AppConfig, osc: OSCSyncHelper) -> None: - num_cells = cfg.rows * cfg.cols - num_regions = ceil(num_cells / cfg.chars_per_sync) - - pager = MultiLinePager(cfg.chars_per_sync, cfg.rows, cfg.cols) - - # Show the chatbox - osc.waitForSync() - osc_ctrl.toggleBoard(cfg.client, True) - osc_ctrl.ellipsis(cfg.client, False) - osc_ctrl.disable(cfg.client) - - # Ensure that the chatbox is cleared. - addr="/avatar/parameters/" + generate_utils.getClearBoardParam() - cfg.client.send_message(addr, True) - osc.waitForSync() - cfg.client.send_message(addr, False) - - slice_idx = 0 - while slice_idx != -1: - - msg_slice, slice_idx = pager.getNextSlice(msg) - which_region = slice_idx % num_regions - - print(f"Sending slice '{msg_slice}' to region {which_region}") - - # Wait until OSC has had enough time to sync the previous window of - # data. - osc.waitForSync() - - # Enable chatbox animations. - osc_ctrl.enable(cfg.client) - - # Seek to the current region. - addr="/avatar/parameters/" + generate_utils.getSelectParam() - cfg.client.send_message(addr, which_region) - - # Send all characters in the current region. - encoded = encodeMessage(msg_slice) - for i in range(0, len(msg_slice)): - print(f"Sending char {msg_slice[i]} / {encoded[i]}") - addr="/avatar/parameters/" + generate_utils.getBlendParam(i, 0) - letter_remapped = (-127.5 + encoded[i]) / 127.5 - cfg.client.send_message(addr, letter_remapped) - - # Disable chatbox animations to ensure stability. - osc.waitForSync() - osc_ctrl.disable(cfg.client) - -if __name__ == "__main__": - cfg = AppConfig() - osc = OSCSyncHelper(cfg) - sendMessage("Hello, world! 
aiueo aiueo aiueo aiueo aiueo eeeeeeeeeeeeeeeeeeeeeeee", cfg, osc) - diff --git a/Scripts/text_wrapping.py b/Scripts/text_wrapping.py deleted file mode 100644 index 7576b78..0000000 --- a/Scripts/text_wrapping.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python3 - -class TextWrapper: - def __init__(self, rows, cols): - self.rows = rows - self.cols = cols - - # Split `msg` along line boundaries. Long words tend to just go onto new - # lines. Words that are too long to fit on any line are hyphenated and - # split. - # Lines are padded with space (" ") characters so they're all `self.cols` - # characters long. Pages are padded with lines full of space characters so - # they're all `self.rows` lines long. - def wrap(self, msg: str) -> list[list[str]]: - pages = [] - lines = [] - line = "" - for word in msg.split(): - if len(line) + 1 + len(word) <= self.cols: - if len(line): - line += " " - line += word - continue - # Word won't fit onto this line. End the line. - if len(line): - line += " " * (self.cols - len(line)) - lines.append(line) - line = "" - while len(word) > self.cols: - prefix = word[0:self.cols-1] + "-" - lines.append(prefix) - suffix = word[self.cols-1:] - word = suffix - if len(word): - line = word - if len(line): - line += " " * (self.cols - len(line)) - lines.append(line) - while len(lines): - pages.append(lines[0:self.rows]) - lines = lines[self.rows:] - if len(pages): - num_extra_lines = (self.rows - (len(pages[-1]) % self.rows)) % self.rows - pages[-1] += [" " * self.cols] * num_extra_lines - return pages - -if __name__ == "__main__": - w = TextWrapper(2, 5) - - assert(w.wrap("foo") == [["foo ", " "]]) - assert(w.wrap("foo bar") == [["foo ", "bar "]]) - assert(w.wrap("bagel") == [["bagel", " "]]) - assert(w.wrap("bagels") == [["bage-", "ls "]]) - assert(w.wrap("hot bagels") == [["hot ", "bage-"], ["ls ", " "]]) - diff --git a/Scripts/transcribe_pipeline.py b/Scripts/transcribe_pipeline.py deleted file mode 100644 index 5914afc..0000000 --- 
a/Scripts/transcribe_pipeline.py +++ /dev/null @@ -1,35 +0,0 @@ -import time - - -class TranscriptCommit: - def __init__(self, - delta: str, - preview: str, - latency_s: float = None, - thresh_at_commit: int = None, - audio: bytes = None, - duration_s: float = None, - start_ts: float = None): - self.delta = delta - self.preview = preview - self.latency_s = latency_s - self.thresh_at_commit = thresh_at_commit - self.audio = audio - # Time at which the commit is generated - self.ts = time.time() - # Time corresponding to the start of the segment - self.start_ts = start_ts - # The duration of the audio segment, in seconds. - self.duration_s = duration_s - - -class StreamingPlugin: - def __init__(self): - pass - - def transform(self, commit: TranscriptCommit) -> TranscriptCommit: - return commit - - def stop(self): - pass - diff --git a/Scripts/transcribe_v2.py b/Scripts/transcribe_v2.py deleted file mode 100644 index e024bae..0000000 --- a/Scripts/transcribe_v2.py +++ /dev/null @@ -1,1172 +0,0 @@ -from browser_src import BrowserSource -from datetime import datetime -from emotes_v2 import EmotesState -from faster_whisper import WhisperModel -from functools import partial -from huggingface_hub import hf_hub_download -from profanity_filter import ProfanityFilter -from pydub import AudioSegment -from sentence_splitter import split_text_into_sentences -from transcribe_pipeline import StreamingPlugin, TranscriptCommit - -import app_config -import argparse -import ctranslate2 -import editdistance -import glob -import keybind_event_machine -import keyboard -import langcodes -import lang_compat -import math -import numpy as np -import os -import osc_ctrl -import pyaudio -import steamvr -import subprocess -import sys -import threading -import time -import transformers -import typing -import vad -import wave -import winsound - -class ThreadControl: - def __init__(self, cfg): - self.cfg = cfg - self.run_app = True - -class AudioStream(): - FORMAT = pyaudio.paInt16 - # Size of 
each frame (audio sample), in bytes. If you change FORMAT, make - # sure this stays up to date! - FRAME_SZ = 2 - # Frames per second. - FPS = 16000 - CHANNELS = 1 - def __init__(self): - pass - - def getSamples(self) -> bytes: - raise NotImplementedError("getSamples is not implemented!") - -class DiskStream(AudioStream): - def __init__(self, path: str): - fmt = None - if path.endswith(".mp3"): - fmt = "mp3" - elif path.endswith(".wav"): - fmt = "wav" - else: - raise NotImplementedError(f"Requested file type {path} " + \ - "is not supported") - print(f"Loading audio data", file=sys.stderr) - audio = AudioSegment.from_file(path, format=fmt) - audio = audio.set_channels(1) - # TODO(yum) replace manual decimation code with this! - audio = audio.set_frame_rate(16000) - frames = np.array(audio.get_array_of_samples()) - frames = np.int16(frames).tobytes() - - self.frames = frames - - print(f"Loaded data", file=sys.stderr) - - def getSamples(self) -> bytes: - # Give out samples at a fixed rate to minimize - # noise. - give_s = 0.2 - nframes = int(give_s * AudioStream.FPS) - frames = self.frames[0:nframes * AudioStream.FRAME_SZ]; - self.frames = self.frames[nframes * AudioStream.FRAME_SZ:] - - if len(frames) < nframes: - frames += np.zeros(nframes - len(frames), dtype=np.int16).tobytes() - - return frames - -class MicStream(AudioStream): - CHUNK_SZ = 1024 - - def __init__(self, which_mic: str): - self.p = pyaudio.PyAudio() - self.stream = None - self.sample_rate = None - # Each time pyaudio gives us audio data, it's in the form of a chunk of - # samples. We keep these in a list to keep the audio callback as light - # as possible. Whenever downstream layers want data, we collapse the - # list into a single array of data (a bytes object). - self.chunks = [] - # If set, incoming frames are simply discarded. 
- self.paused = False - - print(f"Finding mic {which_mic}", file=sys.stderr) - self.dumpMicDevices() - - got_match = False - device_index = -1 - if which_mic == "index": - target_str = "Digital Audio Interface" - elif which_mic == "focusrite": - target_str = "Focusrite" - elif which_mic == "motu": - target_str = "In 1-2 (MOTU M Series)" - elif which_mic == "beyond": - target_str = "Microphone (Beyond)" - else: - print(f"Mic {which_mic} requested, treating it as a numerical " + - "device ID", file=sys.stderr) - device_index = int(which_mic) - got_match = True - if not got_match: - info = self.p.get_host_api_info_by_index(0) - numdevices = info.get('deviceCount') - for i in range(0, numdevices): - if (self.p.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0: - device_name = self.p.get_device_info_by_host_api_device_index(0, i).get('name') - if target_str in device_name: - print(f"Got matching mic: {device_name}", - file=sys.stderr) - device_index = i - got_match = True - break - if not got_match: - raise KeyError(f"Mic {which_mic} not found") - - info = self.p.get_device_info_by_host_api_device_index(0, device_index) - print(f"Found mic {which_mic}: {info['name']}", file=sys.stderr) - self.sample_rate = int(info['defaultSampleRate']) - print(f"Mic sample rate: {self.sample_rate}", file=sys.stderr) - - self.stream = self.p.open( - rate=self.sample_rate, - channels=AudioStream.CHANNELS, - format=AudioStream.FORMAT, - input=True, - frames_per_buffer=MicStream.CHUNK_SZ, - input_device_index=device_index, - stream_callback=self.onAudioFramesAvailable) - - self.stream.start_stream() - - AudioStream.__init__(self) - - def pause(self, state: bool = True): - self.paused = state - - def dumpMicDevices(self): - info = self.p.get_host_api_info_by_index(0) - numdevices = info.get('deviceCount') - - for i in range(0, numdevices): - if (self.p.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0: - device_name = 
self.p.get_device_info_by_host_api_device_index(0, i).get('name') - print("Input Device id ", i, " - ", device_name) - - def onAudioFramesAvailable(self, - frames, - frame_count, - time_info, - status_flags): - if self.paused: - # Don't literally pause, just start returning silence. This allows - # the `min_segment_age_s` check to work while paused. - n_frames = int(frame_count * AudioStream.FPS / - float(self.sample_rate)) - self.chunks.append(np.zeros(n_frames, - dtype=np.int16).tobytes()) - return (frames, pyaudio.paContinue) - - decimated = b'' - # In pyaudio, a `frame` is a single sample of audio data. - frame_len = AudioStream.FRAME_SZ - next_frame = 0.0 - # The mic probably has a higher sample rate than Whisper wants, so - # decrease the sample rate by dropping samples. Note that this - # algorithm only works if the mic's rate is higher than whisper's - # expected rate. - keep_every = float(self.sample_rate) / AudioStream.FPS - for i in range(frame_count): - if i >= next_frame: - decimated += frames[i*frame_len:(i+1)*frame_len] - next_frame += keep_every - self.chunks.append(decimated) - - return (frames, pyaudio.paContinue) - - # Get audio data and the corresponding timestamp. - def getSamples(self) -> bytes: - chunks = self.chunks - self.chunks = [] - result = b''.join(chunks) - return result - -class AudioCollector: - def __init__(self, stream: AudioStream): - self.stream = stream - self.frames = b'' - # Note: by design, this is the only spot where we anchor our timestamps - # against the real world. This is done to make it possible to profile - # test cases which read from disk (at much faster than real speed) in - # the same way that we profile real-time data. 
- self.wall_ts = time.time() - - def getAudio(self) -> bytes: - frames = self.stream.getSamples() - if frames: - self.frames += frames - return self.frames - - def dropAudioPrefix(self, dur_s: float) -> bytes: - n_bytes = int(dur_s * AudioStream.FPS) * self.stream.FRAME_SZ - n_bytes = min(n_bytes, len(self.frames)) - cut_portion = self.frames[:n_bytes] - self.frames = self.frames[n_bytes:] - self.wall_ts += float(n_bytes / self.stream.FRAME_SZ) / self.stream.FPS - return cut_portion - - def dropAudioPrefixByFrames(self, dur_frames: int) -> bytes: - n_bytes = dur_frames * self.stream.FRAME_SZ - n_bytes = min(n_bytes, len(self.frames)) - cut_portion = self.frames[:n_bytes] - self.frames = self.frames[n_bytes:] - self.wall_ts += float(n_bytes / self.stream.FRAME_SZ) / self.stream.FPS - return cut_portion - - def keepLast(self, dur_s: float) -> bytes: - drop_len = max(0, self.duration() - dur_s) - return self.dropAudioPrefix(drop_len) - - def dropAudio(self): - self.wall_ts += self.duration() - cut_portion = self.frames - self.frames = b'' - return cut_portion - - def duration(self): - return len(self.frames) / (AudioStream.FPS * self.stream.FRAME_SZ) - - def begin(self): - return self.wall_ts - - def now(self): - return self.begin() + self.duration() - -class AudioCollectorFilter: - def __init__(self, parent: AudioCollector): - self.parent = parent - - def getAudio(self) -> bytes: - return self.parent.getAudio() - def dropAudioPrefix(self, dur_s: float): - return self.parent.dropAudioPrefix(dur_s) - def dropAudioPrefixByFrames(self, dur_frames: int): - return self.parent.dropAudioPrefixByFrames(dur_frames) - def keepLast(self, dur_s): - return self.parent.keepLast(dur_s) - def dropAudio(self): - return self.parent.dropAudio() - def duration(self): - return self.parent.duration() - def begin(self): - return self.parent.begin() - def now(self): - return self.parent.now() - -# Audio collector that enforces a minimum length on its audio data. 
# Wraps the `vad` module: turns raw PCM bytes into speech segments and decides
# how much of the buffered audio is safe to commit.
class AudioSegmenter:
    def __init__(self, min_silence_ms=250, max_speech_s=5):
        # Only these two knobs are overridden here; every other VAD option
        # keeps whatever default vad.VadOptions declares.
        self.vad_options = vad.VadOptions(
                min_silence_duration_ms=min_silence_ms,
                max_speech_duration_s=max_speech_s)

    # Interprets `audio` as int16 samples and returns the result of
    # vad.get_speech_timestamps() for it.
    def segmentAudio(self, audio: bytes):
        pcm = np.frombuffer(audio, dtype=np.int16).flatten()
        # Rescale int16 [-32768, 32767] to float32 before handing it to VAD.
        samples = pcm.astype(np.float32) / 32768.0
        return vad.get_speech_timestamps(samples,
                vad_options=self.vad_options)

    # Returns the stable cutoff (if any) and whether there are any segments.
    # The cutoff is None when no sufficiently long gap was found; otherwise it
    # is an offset in the same units as the VAD segment 'start'/'end' values.
    def getStableCutoff(self, audio: bytes) -> typing.Tuple[int, bool]:
        silence_ms = self.vad_options.min_silence_duration_ms
        min_gap = int((silence_ms * AudioStream.FPS) / 1000.0)
        segments = self.segmentAudio(audio)

        cutoff = None
        reference_end = None
        for seg in segments:
            if not reference_end:
                # NOTE(review): the reference end is only recorded while it is
                # unset, so later gaps are measured against this segment, not
                # the immediately preceding one. Confirm that is intended.
                reference_end = seg['end']
                continue
            if seg['start'] - reference_end > min_gap:
                cutoff = seg['start']

        if len(segments) > 0:
            # Trailing silence after the last segment also yields a cutoff,
            # placed half a gap before the end of the buffer.
            now = int(len(audio) / AudioStream.FRAME_SZ)
            if now - segments[-1]['end'] > min_gap:
                cutoff = now - int(min_gap / 2)

        return (cutoff, len(segments) > 0)
# Owns the WhisperModel instance and turns raw audio from a collector into a
# list of `Segment`s, dropping segments that look like hallucinations.
class Whisper:
    # collector: source of audio when transcribe() is called without frames;
    #            its begin() is used as the wall-clock anchor for segments.
    # cfg:       app config; reads "model", "use_cpu", "gpu_idx",
    #            "compute_type", "language" and "enable_debug_mode".
    def __init__(self,
            collector: AudioCollector,
            cfg: typing.Dict):
        self.collector = collector
        self.model = None
        self.cfg = cfg

        # Models live in <parent of this script's directory>/Models/<model>.
        abspath = os.path.abspath(__file__)
        my_dir = os.path.dirname(abspath)
        parent_dir = os.path.dirname(my_dir)

        model_str = cfg["model"]
        model_root = os.path.join(parent_dir, "Models",
                os.path.normpath(model_str))
        print(f"Model {cfg['model']} will be saved to {model_root}",
                file=sys.stderr)

        model_device = "cpu" if cfg["use_cpu"] else "cuda"

        # If the directory already exists, restrict the loader to local files.
        already_downloaded = os.path.exists(model_root)

        self.model = WhisperModel(model_str,
                device = model_device,
                device_index = cfg["gpu_idx"],
                compute_type = cfg["compute_type"],
                download_root = model_root,
                local_files_only = already_downloaded)

    # Prints a "dropped hallucination" diagnostic to stderr in debug mode.
    def _logDrop(self, case: int, s):
        if self.cfg["enable_debug_mode"]:
            print(f"Drop probable hallucination (case {case}) " +
                    f"(text='{s.text}', " +
                    f"no_speech_prob={s.no_speech_prob}, " +
                    f"avg_logprob={s.avg_logprob})", file=sys.stderr)

    # Transcribes `frames` (int16 PCM bytes). When `frames` is None the audio
    # is pulled from the collector. Returns the surviving segments in order.
    def transcribe(self, frames: bytes = None) -> typing.List[Segment]:
        if frames is None:
            frames = self.collector.getAudio()
        # Convert from signed 16-bit int [-32768, 32767] to signed 32-bit
        # float on [-1, 1].
        audio = np.frombuffer(frames,
                dtype=np.int16).flatten().astype(np.float32) / 32768.0

        # Use the instance's config. The original read a bare `cfg`, which
        # only resolved when this file was executed as `__main__`.
        debug = self.cfg["enable_debug_mode"]

        t0 = time.time()
        segments, _info = self.model.transcribe(
                audio,
                language = langcodes.find(self.cfg["language"]).language,
                vad_filter = True,
                temperature=0.0,
                without_timestamps = False)
        res = []
        for s in segments:
            # Manual touchup. I see a decent number of hallucinations sneaking
            # in with high `no_speech_prob` and modest `avg_logprob`.
            if s.no_speech_prob > 0.6 and s.avg_logprob < -0.5:
                self._logDrop(1, s)
                continue
            # Another touchup targeted at the vexatious "thanks for watching!"
            # hallucination. This triggers a lot when listening to
            # instrumental/electronic music.
            if s.no_speech_prob > 0.15 and s.avg_logprob < -0.7:
                self._logDrop(2, s)
                continue
            if debug:
                print(f"s get: {s}")
            # Hard floors on confidence and repetitiveness.
            if s.avg_logprob < -1.0:
                continue
            if s.compression_ratio > 2.4:
                continue
            res.append(Segment(s.text, s.start, s.end,
                self.collector.begin(),
                s.avg_logprob, s.no_speech_prob, s.compression_ratio))
        t1 = time.time()
        if debug:
            print(f"Transcription latency (s): {t1 - t0}")
        return res
# Streaming plugin that translates committed text and previews into the
# configured target language. Translation is only active when
# cfg["language_target"] has the form "<label> | <code>"; otherwise, or when
# setup fails, transform() passes commits through unchanged.
class TranslationPlugin(StreamingPlugin):
    def __init__(self, cfg):
        lang_bits = cfg["language_target"].split(" | ")
        self.cfg = cfg
        self.language_target = None
        self.translator = None
        self.tokenizer = None
        if len(lang_bits) != 2:
            return
        self.language_target = lang_bits[1]

        print("Translation requested", file=sys.stderr)
        # The ctranslate2 model converter needs torch. Grr.
        if not install_in_venv(["torch==2.2.2"]):
            return

        model_name = cfg["model_translation"]
        output_dir = "Resources/" + model_name
        # Provided by ctranslate2 Python package. Built as an argument list so
        # no string splitting is involved.
        cmd = ["ct2-transformers-converter.exe",
                "--model", "facebook/" + model_name,
                "--output_dir", output_dir]

        print(f"Fetching translation algorithm ({model_name})",
                file=sys.stderr)
        if not os.path.exists(output_dir):
            ct2_proc = subprocess.run(cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)
            print(ct2_proc.stdout.decode("utf-8"), file=sys.stderr)
            print(ct2_proc.stderr.decode("utf-8"), file=sys.stderr)
            if ct2_proc.returncode != 0:
                # The exit code is now interpolated; the original's second
                # literal lacked the f-prefix and printed the placeholder.
                print("Failed to get NLLB model: ct2 process exited with "
                        f"{ct2_proc.returncode}", file=sys.stderr)
                # Same policy as the torch install failure above: leave the
                # plugin disabled rather than loading a missing model.
                return
        print(f"Using model at {output_dir}", file=sys.stderr)

        model_device = "cpu" if cfg["use_cpu"] else "cuda"

        self.translator = ctranslate2.Translator(output_dir,
                device = model_device,
                device_index = cfg["gpu_idx"],
                compute_type = cfg["compute_type"])

        whisper_lang = cfg["language"]
        nllb_lang = lang_compat.whisper_to_nllb[whisper_lang]

        self.tokenizer = transformers.AutoTokenizer.from_pretrained(
                "facebook/" + model_name,
                src_lang=nllb_lang)

        print("Translation ready to go", file=sys.stderr)

    # Translates one string sentence by sentence. When
    # cfg["enable_orig_lang"] is set and there was at least one sentence, the
    # original text is appended in parentheses.
    def _translateText(self, text: str) -> str:
        whisper_lang = self.cfg["language"]
        nllb_lang = lang_compat.whisper_to_nllb[whisper_lang]
        ss_lang = lang_compat.nllb_to_ss[nllb_lang]
        sentences = split_text_into_sentences(text, language=ss_lang)

        translated_sentences = []
        for sentence in sentences:
            source = self.tokenizer.convert_ids_to_tokens(
                    self.tokenizer.encode(sentence))
            results = self.translator.translate_batch(
                    [source], target_prefix=[[self.language_target]])
            # The first hypothesis token is skipped, as in the original
            # (presumably the echoed target-language prefix; confirm).
            target = results[0].hypotheses[0][1:]
            translated_sentences.append(self.tokenizer.decode(
                self.tokenizer.convert_tokens_to_ids(target)))

        translated = " ".join(translated_sentences)
        # Read the plugin's own config; the original used a bare global `cfg`.
        if self.cfg["enable_orig_lang"] and len(sentences) > 0:
            translated += f" ({text})"
        return translated

    def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
        # Pass through when translation was not requested or setup bailed out
        # before the translator/tokenizer were created.
        if (not self.language_target or self.translator is None
                or self.tokenizer is None):
            return commit

        commit.delta = self._translateText(commit.delta)
        commit.preview = self._translateText(commit.preview)
        return commit
# Base class for filters applied to the (transcript, preview) pair right
# before display. The default implementation is a no-op.
class PresentationFilter:
    def __init__(self):
        pass

    def transform(self, transcript: str, preview: str) -> typing.Tuple[str, str]:
        return transcript, preview

    def stop(self):
        pass

# Removes a single trailing '.' from the text shown last: from the transcript
# when there is no preview, otherwise from the preview. An ellipsis ("...")
# is left alone. Only active when cfg["remove_trailing_period"] is truthy.
class TrailingPeriodFilter(PresentationFilter):
    def __init__(self, cfg):
        self.cfg = cfg

    @staticmethod
    def _removeTrailingPeriod(s: str) -> str:
        if s.endswith(".") and not s.endswith("..."):
            return s[:-1]
        return s

    def transform(self, transcript: str, preview: str) -> typing.Tuple[str, str]:
        if not self.cfg["remove_trailing_period"]:
            return transcript, preview
        # No debug prints here: the original emitted "here"/"there" on stdout
        # for every call.
        if len(preview) == 0:
            transcript = self._removeTrailingPeriod(transcript)
        else:
            preview = self._removeTrailingPeriod(preview)
        return transcript, preview
- if ctrl.cfg["reset_after_silence_s"] > 0: - silence_duration = 0 - if last_stable_commit: - last_commit_end_ts = \ - last_stable_commit.start_ts + \ - last_stable_commit.duration_s - silence_duration = commit.start_ts - last_commit_end_ts - if silence_duration > ctrl.cfg["reset_after_silence_s"]: - print(f"Resetting transcript after {silence_duration}-second " - "silence", file=sys.stderr) - ctrl.transcript = "" - ctrl.preview = "" - if commit.delta: - last_stable_commit = commit - - # Hard-cap displayed transcript length at 4k characters to prevent - # runaway memory use in UI. Keep the full transcript to avoid - # breaking OSC pager. - transcript = ctrl.transcript[-4096:] + commit.delta - preview = commit.preview - - for filt in ctrl.filters: - transcript, preview = filt.transform(transcript, preview) - - try: - print(f"Transcript: {transcript}") - except UnicodeEncodeError: - print("Failed to encode transcript - discarding delta", - file=sys.stderr) - continue - try: - print(f"Preview: {preview}") - except UnicodeEncodeError: - print("Failed to encode preview - discarding", file=sys.stderr) - - if cfg["enable_debug_mode"]: - print(f"commit latency: {commit.latency_s}", file=sys.stderr) - print(f"commit thresh: {commit.thresh_at_commit}", - file=sys.stderr) - - if len(ctrl.transcript) > 0 and \ - (not ctrl.transcript.endswith(' ')) and \ - (not commit.delta.startswith(' ')): - commit.delta = ' ' + commit.delta - if len(commit.delta) > 0 and \ - (not commit.delta.endswith(' ')) and \ - (not commit.preview.startswith(' ')): - commit.preview = ' ' + commit.preview - - ctrl.transcript += commit.delta - ctrl.preview = ctrl.transcript + commit.preview - for plugin in ctrl.plugins: - plugin.stop() - for filt in ctrl.filters: - filt.stop() - -def vrInputThread(ctrl: ThreadControl): - RECORD_STATE = 0 - PAUSE_STATE = 1 - state = PAUSE_STATE - - hand_id = ctrl.cfg["button"].split()[0] - button_id = ctrl.cfg["button"].split()[1] - - # Rough description of state machine: - 
# Single short press: toggle transcription - # Medium press: dismiss custom chatbox - # Long press: update chatbox in place - # Medium press + long press: type transcription - - last_rising = time.time() - last_medium_press_end = 0 - - waveform0 = os.path.abspath("Resources/Sounds/Noise_On_Quiet.wav") - waveform1 = os.path.abspath("Resources/Sounds/Noise_Off_Quiet.wav") - waveform2 = os.path.abspath("Resources/Sounds/Dismiss_Noise_Quiet.wav") - waveform3 = os.path.abspath("Resources/Sounds/KB_Noise_Off_Quiet.wav") - - button_generator = steamvr.pollButtonPress(hand=hand_id, button=button_id, - ctrl=ctrl) - while ctrl.run_app: - time.sleep(0.01) - try: - event = next(button_generator) - except StopIteration: - break - - if event.opcode == steamvr.EVENT_RISING_EDGE: - last_rising = time.time() - - if state == PAUSE_STATE: - ctrl.stream.pause(False) - ctrl.stream.getSamples() - - elif event.opcode == steamvr.EVENT_FALLING_EDGE: - now = time.time() - if now - last_rising > 1.5: - # Long press: treat as the end of transcription. - state = PAUSE_STATE - - ctrl.stream.pause(True) - - if last_rising - last_medium_press_end < 1.0: - # Type transcription - if ctrl.cfg["enable_local_beep"]: - winsound.PlaySound(waveform3, winsound.SND_FILENAME | winsound.SND_ASYNC) - pass - # TODO(yum) this is broken! Audio is not being collected - # while paused anymore. - #keyboard.write(ctrl.preview) - else: - if ctrl.cfg["enable_local_beep"]: - winsound.PlaySound(waveform1, winsound.SND_FILENAME | winsound.SND_ASYNC) - pass - - elif now - last_rising > 0.5: - # Medium press - print("CLEARING", file=sys.stderr) - last_medium_press_end = now - state = PAUSE_STATE - - if ctrl.cfg["enable_local_beep"]: - winsound.PlaySound(waveform2, winsound.SND_FILENAME | winsound.SND_ASYNC) - pass - - if not ctrl.cfg["use_builtin"]: - ctrl.pager.getSyncWindow() - ctrl.pager.toggleBoard(False) - - # Flush the *entire* pipeline. 
- ctrl.stream.pause(True) - ctrl.stream.getSamples() - ctrl.collector.dropAudio() - ctrl.pager.clear() - if ctrl.cfg["enable_lock_at_spawn"]: - # Give the board 0.5 seconds to disappear before unlocking from - # world space. - time.sleep(0.5) - ctrl.pager.lockWorld(False) - else: - # Short hold - if state == RECORD_STATE: - print("PAUSED", file=sys.stderr) - state = PAUSE_STATE - if not ctrl.cfg["use_builtin"] and not ctrl.cfg["enable_lock_at_spawn"]: - ctrl.pager.getSyncWindow() - ctrl.pager.lockWorld(True) - - ctrl.stream.pause(True) - - if ctrl.cfg["enable_local_beep"]: - winsound.PlaySound(waveform1, winsound.SND_FILENAME | winsound.SND_ASYNC) - pass - elif state == PAUSE_STATE: - print("RECORDING", file=sys.stderr) - state = RECORD_STATE - if not ctrl.cfg["use_builtin"]: - ctrl.pager.getSyncWindow() - ctrl.pager.toggleBoard(True) - ctrl.pager.lockWorld(ctrl.cfg["enable_lock_at_spawn"]) - ctrl.pager.ellipsis(True) - if ctrl.cfg["reset_on_toggle"]: - if ctrl.cfg["enable_debug_mode"]: - print("Toggle detected, dropping transcript (3)", - file=sys.stderr) - ctrl.transcript = "" - ctrl.preview = "" - #audio_state.drop_transcription = True - else: - if ctrl.cfg["enable_debug_mode"]: - print("Toggle detected, committing preview text (3)", file=sys.stderr) - #audio_state.text += audio_state.preview_text - - ctrl.stream.pause(False) - ctrl.pager.clear() - - if ctrl.cfg["enable_local_beep"]: - winsound.PlaySound(waveform0, winsound.SND_FILENAME | winsound.SND_ASYNC) - pass - -def kbInputThread(ctrl: ThreadControl): - machine = keybind_event_machine.KeybindEventMachine(ctrl.cfg["keybind"]) - last_press_time = 0 - - # double pressing the keybind - double_press_timeout = 0.5 - - RECORD_STATE = 0 - PAUSE_STATE = 1 - state = PAUSE_STATE - - waveform0 = os.path.abspath("Resources/Sounds/Noise_On_Quiet.wav") - waveform1 = os.path.abspath("Resources/Sounds/Noise_Off_Quiet.wav") - waveform2 = os.path.abspath("Resources/Sounds/Dismiss_Noise_Quiet.wav") - waveform3 = 
os.path.abspath("Resources/Sounds/KB_Noise_Off_Quiet.wav") - - while ctrl.run_app: - time.sleep(0.01) - - cur_press_time = machine.getNextPressTime() - if cur_press_time == 0: - continue - - EVENT_SINGLE_PRESS = 0 - EVENT_DOUBLE_PRESS = 1 - if last_press_time == 0: - event = EVENT_SINGLE_PRESS - elif cur_press_time - last_press_time < double_press_timeout: - event = EVENT_DOUBLE_PRESS - else: - event = EVENT_SINGLE_PRESS - last_press_time = cur_press_time - - if event == EVENT_DOUBLE_PRESS: - print("CLEARING", file=sys.stderr) - state = PAUSE_STATE - - if ctrl.cfg["enable_local_beep"]: - winsound.PlaySound(waveform2, winsound.SND_FILENAME | winsound.SND_ASYNC) - pass - - if not ctrl.cfg["use_builtin"]: - ctrl.pager.getSyncWindow() - ctrl.pager.toggleBoard(False) - - # Flush the *entire* pipeline. - ctrl.stream.pause(True) - ctrl.stream.getSamples() - ctrl.collector.dropAudio() - ctrl.pager.clear() - if ctrl.cfg["enable_lock_at_spawn"]: - # Give the board 0.5 seconds to disappear before unlocking from - # world space. 
# Wires up the whole pipeline (microphone stream, audio collectors, Whisper,
# VAD committer, plugins, presentation filters, OSC pager), starts the four
# worker threads, then blocks reading stdin until a line containing "exit" or
# "quit" arrives (or stdin ends). Afterwards it clears ctrl.run_app and joins
# the workers in start order.
def run(cfg):
    stream = MicStream(cfg["microphone"])

    # Collector filters wrap each other, innermost first. A
    # LengthEnforcingAudioCollector(collector, 5.0) stage is deliberately not
    # part of the chain.
    collector = CompressingAudioCollector(
            NormalizingAudioCollector(
                AudioCollector(stream)))

    whisper = Whisper(collector, cfg)
    segmenter = AudioSegmenter(
            min_silence_ms=cfg["min_silence_duration_ms"],
            max_speech_s=cfg["max_speech_duration_s"])
    committer = VadCommitter(cfg, collector, whisper, segmenter)
    pager = OscPager(cfg)

    ctrl = ThreadControl(cfg)
    ctrl.stream = stream
    ctrl.collector = collector
    ctrl.whisper = whisper
    ctrl.committer = committer

    # Plugins run in list order on every commit.
    ctrl.plugins = [
        TranslationPlugin(cfg),
        UppercasePlugin(cfg),
        LowercasePlugin(cfg),
        ProfanityPlugin(cfg),
        UwuPlugin(cfg),
        BrowserSource(cfg),
    ]
    ctrl.filters = [TrailingPeriodFilter(cfg)]

    ctrl.pager = pager
    ctrl.transcript = ""
    ctrl.preview = ""

    # (join message, thread entry point), in start order.
    workers = [
        ("Join transcription thread", transcriptionThread),
        ("Join vr input thread", vrInputThread),
        ("Join kb input thread", kbInputThread),
        ("Join osc thread", oscThread),
    ]
    started = []
    for join_msg, entry in workers:
        thd = threading.Thread(target=entry, args=[ctrl], daemon=True)
        thd.start()
        started.append((join_msg, thd))

    for line in sys.stdin:
        if any(word in line for word in ("exit", "quit")):
            print("Exit requested", file=sys.stderr)
            break

    # Worker loops poll this flag and fall out on their next iteration.
    ctrl.run_app = False
    for join_msg, thd in started:
        print(join_msg, file=sys.stderr)
        thd.join()
    print("Done", file=sys.stderr)
{audio} :: {control}", file=sys.stderr) - sum += evaluate(cfg, audio, control) - print(f"Total score: {sum}", file=sys.stderr) - else: - #optimize(cfg, experiments) - run(cfg) - diff --git a/Scripts/vad.py b/Scripts/vad.py deleted file mode 100644 index 25f0ad0..0000000 --- a/Scripts/vad.py +++ /dev/null @@ -1,315 +0,0 @@ -# MIT License -# -# Copyright (c) 2023 Guillaume Klein -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import bisect -import functools -import os -import warnings - -from typing import List, NamedTuple, Optional - -import numpy as np - - -# The code below is adapted from https://github.com/snakers4/silero-vad. -class VadOptions(NamedTuple): - """VAD options. - - Attributes: - threshold: Speech threshold. Silero VAD outputs speech probabilities for each audio chunk, - probabilities ABOVE this value are considered as SPEECH. It is better to tune this - parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets. 
- min_speech_duration_ms: Final speech chunks shorter min_speech_duration_ms are thrown out. - max_speech_duration_s: Maximum duration of speech chunks in seconds. Chunks longer - than max_speech_duration_s will be split at the timestamp of the last silence that - lasts more than 100ms (if any), to prevent aggressive cutting. Otherwise, they will be - split aggressively just before max_speech_duration_s. - min_silence_duration_ms: In the end of each speech chunk wait for min_silence_duration_ms - before separating it - window_size_samples: Audio chunks of window_size_samples size are fed to the silero VAD model. - WARNING! Silero VAD models were trained using 512, 1024, 1536 samples for 16000 sample rate. - Values other than these may affect model performance!! - speech_pad_ms: Final speech chunks are padded by speech_pad_ms each side - """ - - threshold: float = 0.5 - min_speech_duration_ms: int = 250 - max_speech_duration_s: float = float("inf") - min_silence_duration_ms: int = 2000 - window_size_samples: int = 1024 - speech_pad_ms: int = 400 - - -def get_speech_timestamps( - audio: np.ndarray, - vad_options: Optional[VadOptions] = None, - **kwargs, -) -> List[dict]: - """This method is used for splitting long audios into speech chunks using silero VAD. - - Args: - audio: One dimensional float array. - vad_options: Options for VAD processing. - kwargs: VAD options passed as keyword arguments for backward compatibility. - - Returns: - List of dicts containing begin and end samples of each speech chunk. 
- """ - if vad_options is None: - vad_options = VadOptions(**kwargs) - - threshold = vad_options.threshold - min_speech_duration_ms = vad_options.min_speech_duration_ms - max_speech_duration_s = vad_options.max_speech_duration_s - min_silence_duration_ms = vad_options.min_silence_duration_ms - window_size_samples = vad_options.window_size_samples - speech_pad_ms = vad_options.speech_pad_ms - - if window_size_samples not in [512, 1024, 1536]: - warnings.warn( - "Unusual window_size_samples! Supported window_size_samples:\n" - " - [512, 1024, 1536] for 16000 sampling_rate" - ) - - sampling_rate = 16000 - min_speech_samples = sampling_rate * min_speech_duration_ms / 1000 - speech_pad_samples = sampling_rate * speech_pad_ms / 1000 - max_speech_samples = ( - sampling_rate * max_speech_duration_s - - window_size_samples - - 2 * speech_pad_samples - ) - min_silence_samples = sampling_rate * min_silence_duration_ms / 1000 - min_silence_samples_at_max_speech = sampling_rate * 98 / 1000 - - audio_length_samples = len(audio) - - model = get_vad_model() - state = model.get_initial_state(batch_size=1) - - speech_probs = [] - for current_start_sample in range(0, audio_length_samples, window_size_samples): - chunk = audio[current_start_sample : current_start_sample + window_size_samples] - if len(chunk) < window_size_samples: - chunk = np.pad(chunk, (0, int(window_size_samples - len(chunk)))) - speech_prob, state = model(chunk, state, sampling_rate) - speech_probs.append(speech_prob) - - triggered = False - speeches = [] - current_speech = {} - neg_threshold = threshold - 0.15 - - # to save potential segment end (and tolerate some silence) - temp_end = 0 - # to save potential segment limits in case of maximum segment size reached - prev_end = next_start = 0 - - for i, speech_prob in enumerate(speech_probs): - if (speech_prob >= threshold) and temp_end: - temp_end = 0 - if next_start < prev_end: - next_start = window_size_samples * i - - if (speech_prob >= threshold) and not 
triggered: - triggered = True - current_speech["start"] = window_size_samples * i - continue - - if ( - triggered - and (window_size_samples * i) - current_speech["start"] > max_speech_samples - ): - if prev_end: - current_speech["end"] = prev_end - speeches.append(current_speech) - current_speech = {} - # previously reached silence (< neg_thres) and is still not speech (< thres) - if next_start < prev_end: - triggered = False - else: - current_speech["start"] = next_start - prev_end = next_start = temp_end = 0 - else: - current_speech["end"] = window_size_samples * i - speeches.append(current_speech) - current_speech = {} - prev_end = next_start = temp_end = 0 - triggered = False - continue - - if (speech_prob < neg_threshold) and triggered: - if not temp_end: - temp_end = window_size_samples * i - # condition to avoid cutting in very short silence - if (window_size_samples * i) - temp_end > min_silence_samples_at_max_speech: - prev_end = temp_end - if (window_size_samples * i) - temp_end < min_silence_samples: - continue - else: - current_speech["end"] = temp_end - if ( - current_speech["end"] - current_speech["start"] - ) > min_speech_samples: - speeches.append(current_speech) - current_speech = {} - prev_end = next_start = temp_end = 0 - triggered = False - continue - - if ( - current_speech - and (audio_length_samples - current_speech["start"]) > min_speech_samples - ): - current_speech["end"] = audio_length_samples - speeches.append(current_speech) - - for i, speech in enumerate(speeches): - if i == 0: - speech["start"] = int(max(0, speech["start"] - speech_pad_samples)) - if i != len(speeches) - 1: - silence_duration = speeches[i + 1]["start"] - speech["end"] - if silence_duration < 2 * speech_pad_samples: - speech["end"] += int(silence_duration // 2) - speeches[i + 1]["start"] = int( - max(0, speeches[i + 1]["start"] - silence_duration // 2) - ) - else: - speech["end"] = int( - min(audio_length_samples, speech["end"] + speech_pad_samples) - ) - speeches[i + 
1]["start"] = int( - max(0, speeches[i + 1]["start"] - speech_pad_samples) - ) - else: - speech["end"] = int( - min(audio_length_samples, speech["end"] + speech_pad_samples) - ) - - return speeches - - -def collect_chunks(audio: np.ndarray, chunks: List[dict]) -> np.ndarray: - """Collects and concatenates audio chunks.""" - if not chunks: - return np.array([], dtype=np.float32) - - return np.concatenate([audio[chunk["start"] : chunk["end"]] for chunk in chunks]) - - -class SpeechTimestampsMap: - """Helper class to restore original speech timestamps.""" - - def __init__(self, chunks: List[dict], sampling_rate: int, time_precision: int = 2): - self.sampling_rate = sampling_rate - self.time_precision = time_precision - self.chunk_end_sample = [] - self.total_silence_before = [] - - previous_end = 0 - silent_samples = 0 - - for chunk in chunks: - silent_samples += chunk["start"] - previous_end - previous_end = chunk["end"] - - self.chunk_end_sample.append(chunk["end"] - silent_samples) - self.total_silence_before.append(silent_samples / sampling_rate) - - def get_original_time( - self, - time: float, - chunk_index: Optional[int] = None, - ) -> float: - if chunk_index is None: - chunk_index = self.get_chunk_index(time) - - total_silence_before = self.total_silence_before[chunk_index] - return round(total_silence_before + time, self.time_precision) - - def get_chunk_index(self, time: float) -> int: - sample = int(time * self.sampling_rate) - return min( - bisect.bisect(self.chunk_end_sample, sample), - len(self.chunk_end_sample) - 1, - ) - - -@functools.lru_cache -def get_vad_model(): - """Returns the VAD model instance.""" - abspath = os.path.abspath(__file__) - my_dir = os.path.dirname(abspath) - parent_dir = os.path.dirname(my_dir) - - path = os.path.join(parent_dir, "Models/silero_vad.onnx") - return SileroVADModel(path) - - -class SileroVADModel: - def __init__(self, path): - try: - import onnxruntime - except ImportError as e: - raise RuntimeError( - "Applying the 
VAD filter requires the onnxruntime package" - ) from e - - opts = onnxruntime.SessionOptions() - opts.inter_op_num_threads = 1 - opts.intra_op_num_threads = 1 - opts.log_severity_level = 4 - - self.session = onnxruntime.InferenceSession( - path, - providers=["CPUExecutionProvider"], - sess_options=opts, - ) - - def get_initial_state(self, batch_size: int): - h = np.zeros((2, batch_size, 64), dtype=np.float32) - c = np.zeros((2, batch_size, 64), dtype=np.float32) - return h, c - - def __call__(self, x, state, sr: int): - if len(x.shape) == 1: - x = np.expand_dims(x, 0) - if len(x.shape) > 2: - raise ValueError( - f"Too many dimensions for input audio chunk {len(x.shape)}" - ) - if sr / x.shape[1] > 31.25: - raise ValueError("Input audio chunk is too short") - - h, c = state - - ort_inputs = { - "input": x, - "h": h, - "c": c, - "sr": np.array(sr, dtype="int64"), - } - - out, h, c = self.session.run(None, ort_inputs) - state = (h, c) - - return out, state |
