summaryrefslogtreecommitdiffstats
path: root/Scripts
diff options
context:
space:
mode:
Diffstat (limited to 'Scripts')
-rw-r--r--Scripts/.gitignore2
-rw-r--r--Scripts/app_config.py39
-rw-r--r--Scripts/browser_src.py138
-rw-r--r--Scripts/cpp_transcribe.py197
-rw-r--r--Scripts/dump_mic_devices.py8
-rw-r--r--Scripts/emotes.py143
-rw-r--r--Scripts/emotes_v2.py149
-rw-r--r--Scripts/generate_fonts.py184
-rw-r--r--Scripts/generate_menu.py41
-rw-r--r--Scripts/generate_params.py131
-rw-r--r--Scripts/generate_shader.py161
-rw-r--r--Scripts/generate_utils.py134
-rw-r--r--Scripts/keybind_event_machine.py21
-rw-r--r--Scripts/lang_compat.py58
-rw-r--r--Scripts/libtastt.py1085
-rw-r--r--Scripts/libunity.py1432
-rw-r--r--Scripts/obfuscate.py92
-rw-r--r--Scripts/osc_ctrl.py185
-rw-r--r--Scripts/paging.py128
-rw-r--r--Scripts/profanity_filter.py43
-rw-r--r--Scripts/remove_audio_sources.py25
-rw-r--r--Scripts/requirements.txt18
-rw-r--r--Scripts/requirements_frozen.txt42
-rw-r--r--Scripts/set_texture_sz.py24
-rw-r--r--Scripts/steamvr.py89
-rw-r--r--Scripts/text_to_text_demo.py96
-rw-r--r--Scripts/text_wrapping.py55
-rw-r--r--Scripts/transcribe_pipeline.py35
-rw-r--r--Scripts/transcribe_v2.py1172
-rw-r--r--Scripts/vad.py315
30 files changed, 0 insertions, 6242 deletions
diff --git a/Scripts/.gitignore b/Scripts/.gitignore
deleted file mode 100644
index 3f2744f..0000000
--- a/Scripts/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-# Python generated files
-__pycache__
diff --git a/Scripts/app_config.py b/Scripts/app_config.py
deleted file mode 100644
index f911456..0000000
--- a/Scripts/app_config.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import os
-import sys
-import typing
-
-def getConfig(path: str) -> typing.Dict[str, typing.Union[str, float, int, bool]]:
- # Helper functions to detect and convert the type
- def is_int(value: str) -> bool:
- try:
- int(value)
- return True
- except ValueError:
- return False
-
- def is_float(value: str) -> bool:
- try:
- float(value)
- return True
- except ValueError:
- return False
-
- def convert_value(key: str, value: str):
- if key.startswith(("enable_", "remove_", "use_", "clear_")):
- return bool(int(value))
- elif is_int(value):
- return int(value)
- elif is_float(value):
- return float(value)
- else:
- return value
-
- config = {}
- with open(path, 'r') as file:
- for line in file:
- key_value = line.strip().split(": ", maxsplit=1)
- key = key_value[0]
- value = key_value[1] if len(key_value) > 1 else ""
- config[key] = convert_value(key, value.strip())
- return config
-
diff --git a/Scripts/browser_src.py b/Scripts/browser_src.py
deleted file mode 100644
index 4ed3407..0000000
--- a/Scripts/browser_src.py
+++ /dev/null
@@ -1,138 +0,0 @@
-from transcribe_pipeline import StreamingPlugin, TranscriptCommit
-from urllib.parse import urlparse
-
-import copy
-import json
-import http.server
-import os
-import socketserver
-import threading
-import time
-import transcribe_pipeline
-import typing
-
-class HTTPServer:
- def __init__(self, port: int):
- self.port = port
- self.route_map = {}
- self.httpd = None
-
- def register_file_handler(self, http_method: str, path: str, file_path: str):
- print(f"File handler registered at {os.getcwd()}")
- def handler():
- if os.path.exists(file_path):
- with open(file_path, 'r', encoding='utf-8') as f:
- return 200, f.read().replace('%PORT%', str(self.port)), 'text/html'
- else:
- return 404, {'error': 'file not found'}, 'application/json'
- self.route_map[(http_method, path)] = handler
-
- def register_json_handler(self, http_method: str, path: str, handler):
- self.route_map[(http_method, path)] = handler
-
- def run(self):
- def handler(*args, **kwargs):
- MyHandler(http_server_instance=self, *args, **kwargs)
-
- with socketserver.TCPServer(("", self.port), handler) as httpd:
- self.httpd = httpd
- print(f"Webserver running at port {self.port}")
- httpd.serve_forever()
- print(f"Webserver exiting")
- self.httpd = None
-
- def stop(self):
- if self.httpd:
- self.httpd.shutdown()
-
-
-class MyHandler(http.server.BaseHTTPRequestHandler):
- def __init__(self, *args, http_server_instance=None, **kwargs):
- self.http_server_instance = http_server_instance
- super().__init__(*args, **kwargs)
-
- def log_message(self, format, *args):
- # TODO log if cfg["debug_mode_enabled"] is set
- return
-
- def do_GET(self):
- self.handle_request('GET')
-
- def handle_request(self, method: str):
- parsed_path = urlparse(self.path)
- if (method, parsed_path.path) in self.http_server_instance.route_map:
- status_code, response_content, content_type = \
- self.http_server_instance.route_map[(method, parsed_path.path)]()
- self.send_response(status_code)
- self.send_header('Content-Type', content_type)
- self.end_headers()
- if content_type == 'application/json':
- self.wfile.write(json.dumps(response_content).encode('utf-8'))
- else:
- self.wfile.write(response_content.encode('utf-8'))
- else:
- self.send_response(404)
- self.send_header('Content-Type', 'application/json')
- self.end_headers()
- self.wfile.write(json.dumps({'error': 'not found'}).encode('utf-8'))
-
-
-class BrowserSource(StreamingPlugin):
- def __init__(self, cfg: typing.Dict):
- port = cfg["browser_src_port"]
- print(f"Browser source running on port {port}")
- self.commits = []
- self.preview_commit = None
- self.http_server = HTTPServer(port)
- self.http_server.register_json_handler('GET', '/api/v0/transcript', self.get_transcript_json)
-
- index_html_path = os.path.join("Resources", "BrowserSource", "index.html")
- self.http_server.register_file_handler('GET', '/', index_html_path)
- self.http_server.register_file_handler('GET', '/index.html', index_html_path)
-
- # Start the HTTP server in a new thread
- self.server_thread = threading.Thread(target=self.run)
- self.server_thread.start()
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- original_commit = commit
- commit = copy.deepcopy(original_commit)
- del commit.audio
- if commit.delta:
- self.commits.append(commit)
- # Limit commits to last N.
- now = time.time()
- self.commits = [commit for commit in self.commits]
- max_commits = 10
- if len(self.commits) > max_commits:
- self.commits = self.commits[-int(max_commits/2):]
- self.preview_commit = commit
- return original_commit
-
- # return (http_code, body, content_type)
- def get_transcript_json(self) -> typing.Tuple[int, str, str]:
- processed_commits = [vars(commit) for commit in self.commits]
- transcript_data = {
- 'commits': processed_commits,
- 'preview': vars(self.preview_commit) if self.preview_commit else None,
- 'ts': time.time()
- }
- return 200, json.dumps(transcript_data), 'text/json'
-
- def run(self):
- self.http_server.run()
-
- def stop(self):
- self.http_server.stop()
- self.server_thread.join()
-
-
-# Example usage
-def my_callback() -> typing.Tuple[int, typing.Dict[str, str]]:
- return 200, {'message': 'Hello, world!'}, 'text/json'
-
-if __name__ == '__main__':
- server = HTTPServer(port=8080)
- server.register_json_handler('GET', '/api/v0/transcript', my_callback)
- server.run()
-
diff --git a/Scripts/cpp_transcribe.py b/Scripts/cpp_transcribe.py
deleted file mode 100644
index c499769..0000000
--- a/Scripts/cpp_transcribe.py
+++ /dev/null
@@ -1,197 +0,0 @@
-#!/usr/bin/env python3
-
-# The app loop does 2 things:
-# 1. Read lines from stdin and send them into the game via OSC.
-# 2. Write control info to stdout.
-# The app exits when stdin closes.
-
-from playsound import playsound
-
-import argparse
-import dataclasses
-import generate_utils
-import os
-import osc_ctrl
-import steamvr
-import sys
-import threading
-import time
-
-@dataclasses.dataclass
-class AudioState:
- text: str
- osc_state: osc_ctrl.OscState
- enable_local_beep: int
- use_builtin: int
- button: str
-
- send_transcript: bool
- run_app: bool
-
-def writeControlMessage(run: bool):
- msg = ""
- if run:
- msg += "1"
- else:
- msg += "0"
- print(f"{msg}")
-
-def readControllerInput(audio_state: AudioState):
- session = None
- first = True
- while session == None and audio_state.run_app == True:
- try:
- session = steamvr.SessionState()
- except:
- print("steamvr is off, no controller input", file=sys.stderr)
- session = None
- time.sleep(5)
-
- RECORD_STATE = 0
- PAUSE_STATE = 1
- state = PAUSE_STATE
- osc_ctrl.indicateSpeech(audio_state.osc_state.client, False)
- osc_ctrl.indicatePaging(audio_state.osc_state.client, False)
-
- hand_id = steamvr.hands[audio_state.button.split()[0]]
- button_id = steamvr.buttons[audio_state.button.split()[1]]
-
- last_rising = time.time()
- while audio_state.run_app == True:
- time.sleep(0.05)
-
- event = steamvr.pollButtonPress(session, hand_id=hand_id,
- button_id=button_id)
-
- if event == steamvr.EVENT_RISING_EDGE:
- last_rising = time.time()
- elif event == steamvr.EVENT_FALLING_EDGE:
- now = time.time()
- if now - last_rising > 0.3:
- # Long hold
- state = PAUSE_STATE
- if not audio_state.use_builtin:
- osc_ctrl.indicateSpeech(audio_state.osc_state.client, False)
- osc_ctrl.toggleBoard(audio_state.osc_state.client, False)
-
- osc_ctrl.send_transcript = False
- osc_ctrl.clear(audio_state.osc_state)
- else:
- # Short hold
- if state == RECORD_STATE:
- state = PAUSE_STATE
- if not audio_state.use_builtin:
- osc_ctrl.indicateSpeech(audio_state.osc_state.client, False)
- osc_ctrl.lockWorld(audio_state.osc_state.client, True)
-
- osc_ctrl.send_transcript = False
-
- if audio_state.enable_local_beep == 1:
- playsound(os.path.abspath("Resources/Sounds/Noise_Off_Quiet.wav"))
- elif state == PAUSE_STATE:
- state = RECORD_STATE
- if not audio_state.use_builtin:
- osc_ctrl.indicateSpeech(audio_state.osc_state.client, True)
- osc_ctrl.toggleBoard(audio_state.osc_state.client, True)
- osc_ctrl.lockWorld(audio_state.osc_state.client, False)
-
- osc_ctrl.send_transcript = True
- osc_ctrl.clear(audio_state.osc_state)
-
- audio_state.drop_transcription = True
- audio_state.audio_paused = False
-
- if audio_state.enable_local_beep == 1:
- playsound(os.path.abspath("Resources/Sounds/Noise_On_Quiet.wav"))
-
-def drainStdin(audio_state: AudioState):
- while True:
- try:
- line = input()
- except EOFError:
- # Invoking process closes the write end of their stdin to signal us
- # to exit.
- # TODO(yum) merge all threads
- audio_state.run_app = False
- return
- if len(line) > 0:
- print(f"stdin get: {line}", file=sys.stderr)
-
-def mainLoop(audio_state: AudioState):
- steamvr_input_thd = threading.Thread(target = readControllerInput,
- args = [audio_state])
- steamvr_input_thd.daemon = True
- steamvr_input_thd.start()
-
- drain_stdin_thd = threading.Thread(target = drainStdin,
- args = [audio_state])
- drain_stdin_thd.daemon = True
- drain_stdin_thd.start()
-
- writeControlMessage(False)
-
- while audio_state.run_app:
- time.sleep(0.01)
- writeControlMessage(audio_state.send_transcript)
-
-if __name__ == "__main__":
- print("args: {}".format(" ".join(sys.argv)), file=sys.stderr)
-
- # Set cwd to TaSTT/
- abspath = os.path.abspath(__file__)
- dname = os.path.dirname(abspath)
- dname = os.path.dirname(dname)
- dname = os.path.dirname(dname)
- os.chdir(dname)
- print(f"Set cwd to {os.getcwd()}", file=sys.stderr)
-
- parser = argparse.ArgumentParser()
- parser.add_argument("--bytes_per_char", type=str, help="The number of bytes to use to represent each character")
- parser.add_argument("--chars_per_sync", type=str, help="The number of characters to send on each sync event")
- parser.add_argument("--rows", type=int, help="The number of rows on the board")
- parser.add_argument("--cols", type=int, help="The number of columns on the board")
- parser.add_argument("--enable_local_beep", type=int,
- help=("Whether to play a local auditory indicator when "
- "transcription starts/stops."))
- parser.add_argument("--use_builtin", type=int,
- help=("If set to 1, use the text box built into the game."))
- parser.add_argument("--button", type=str,
- help=("The controller button used to start/stop transcription. "
- "E.g. \"left joystick\""))
- args = parser.parse_args()
-
- if args.bytes_per_char is None or args.chars_per_sync is None:
- print("--bytes_per_char and --chars_per_sync required", file=sys.stderr)
- sys.exit(1)
- if args.rows is None or args.cols is None:
- print("--rows and --cols required", file=sys.stderr)
- sys.exit(1)
- if args.button is None:
- print("--button required", file=sys.stderr)
- sys.exit(1)
- if args.enable_local_beep is None:
- print("--enable_local_beep required", file=sys.stderr)
- sys.exit(1)
- if args.use_builtin is None:
- print("--use_builtin required", file=sys.stderr)
- sys.exit(1)
-
- generate_utils.config.BYTES_PER_CHAR = int(args.bytes_per_char)
- generate_utils.config.CHARS_PER_SYNC = int(args.chars_per_sync)
- generate_utils.config.BOARD_ROWS = int(args.rows)
- generate_utils.config.BOARD_COLS = int(args.cols)
-
- audio_state = AudioState(
- text = "",
- osc_state = osc_ctrl.OscState(
- generate_utils.config.CHARS_PER_SYNC,
- generate_utils.config.BOARD_ROWS,
- generate_utils.config.BOARD_COLS),
- button = args.button,
- enable_local_beep = args.enable_local_beep,
- use_builtin = args.use_builtin,
- send_transcript = False,
- run_app = True)
-
- mainLoop(audio_state)
-
diff --git a/Scripts/dump_mic_devices.py b/Scripts/dump_mic_devices.py
deleted file mode 100644
index 874445c..0000000
--- a/Scripts/dump_mic_devices.py
+++ /dev/null
@@ -1,8 +0,0 @@
-#!/usr/bin/env python3
-
-from transcribe_v2 import MicStream
-
-if __name__ == "__main__":
- # This implicitly prints mic devices.
- s = MicStream(0)
-
diff --git a/Scripts/emotes.py b/Scripts/emotes.py
deleted file mode 100644
index 6ae0930..0000000
--- a/Scripts/emotes.py
+++ /dev/null
@@ -1,143 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-from math import floor
-import os
-# python3 -m pip install pillow
-from PIL import Image
-import sys
-
-# (row, col)
-TEX_SZ = (2048, 2048)
-
-IMG_SZ_PX = 256
-IMG_PER_ROW = int(TEX_SZ[0] / IMG_SZ_PX)
-IMG_PER_COL = int(TEX_SZ[1] / IMG_SZ_PX)
-
-# TODO(yum) this should live in a config file.
-# Note: the name of the emote must be no longer than 6 characters.
-IMG_TEX_DATA = []
-IMG_TEX_DATA.append(("Images/Emotes/xdd.png", "xdd"))
-IMG_TEX_DATA.append(("Images/Emotes/pog.png", "pog"))
-IMG_TEX_DATA.append(("Images/Emotes/lulw.png", "laugh"))
-IMG_TEX_DATA.append(("Images/Emotes/bighardo.png", "hard"))
-IMG_TEX_DATA.append(("Images/Emotes/peepoHappy.png", "happy"))
-IMG_TEX_DATA.append(("Images/Emotes/peepoSad.png", "sad"))
-IMG_TEX_DATA.append(("Images/Emotes/bedge.png", "bed"))
-IMG_TEX_DATA.append(("Images/Emotes/reallymad.png", "mad"))
-IMG_TEX_DATA.append(("Images/Emotes/clueless.png", "surely"))
-IMG_TEX_DATA.append(("Images/Emotes/what.png", "what"))
-IMG_TEX_DATA.append(("Images/Emotes/based.png", "based"))
-IMG_TEX_DATA.append(("Images/Emotes/chad.png", "chad"))
-IMG_TEX_DATA.append(("Images/Emotes/aware.png", "aware"))
-IMG_TEX_DATA.append(("Images/Emotes/girl.png", "girl"))
-IMG_TEX_DATA = []
-
-IMG_TEX_KEYWORD_TO_COORD = {}
-for i in range(0, len(IMG_TEX_DATA)):
- IMG_TEX_KEYWORD_TO_COORD[IMG_TEX_DATA[i][1]] = i
-
-# We treat images like words. To keep things simple, they're the same height as
-# a word, and they're a fixed width.
-IMG_SZ_LETTER_ROWS = 1
-IMG_SZ_LETTER_COLS = 6
-
-def lookup(word: str):
- word = word.lower()
- word = ''.join(c for c in word.lower() if c.isalpha())
- if word in IMG_TEX_KEYWORD_TO_COORD.keys():
- return word, IMG_TEX_KEYWORD_TO_COORD[word]
- return None, None
-
-def openTexture(tex_path: str):
- if not os.path.exists(args.texture_path):
- return Image.new("RGB", TEX_SZ)
- tex = Image.open(args.texture_path)
- if tex.size != TEX_SZ:
- print("Texture at {} has mismatching size {}, creating new texture".format(
- tex_path, tex.size), file=sys.stderr)
- return Image.new("RGB", TEX_SZ)
- return tex
-
-# Add an image to the texture at the coordinates (x, y). x and y should be in
-# the range [0, IMG_PER_COL) and [0, IMG_PER_ROW) respectively.
-def addImageToTexture(tex: Image, img_path: str, x: int, y:int):
- # Transparent images will be composited on top of a black background.
- img = Image.open(img_path).convert('RGBA')
- img_bg = Image.new('RGBA', img.size, (0, 0, 0))
- img = Image.alpha_composite(img_bg, img).convert('RGB')
-
- max_px = IMG_SZ_PX
-
- # Scale the image up so it uses as much space as is given to it.
- # I originally planned to support multiple scales, but this proved to be
- # too much work - getting line wrapping to work with this would be a pain.
- # So for now, all images are the same height as words.
- scale = 1
- img_x, img_y = img.size
- max_dim = max(img_x, img_y)
- img_scale = (max_px / max_dim) * scale
- new_sz = (int(floor(img.size[0] * img_scale)),
- int(floor(img.size[1] * img_scale)))
- print("Add image {}".format(img_path))
- print(" Original size: {}".format(img.size))
- print(" Scaled size: {}".format(new_sz))
- img = img.resize(new_sz)
-
- # Center the image within its new coordinate space.
- padded_img_sz = (IMG_SZ_PX * scale, IMG_SZ_PX * scale)
- padded_img = Image.new("RGB", padded_img_sz)
- centered_x = int(floor((padded_img_sz[0] - new_sz[0]) / 2))
- centered_y = int(floor((padded_img_sz[1] - new_sz[1]) / 2))
- padded_img.paste(img, box=(centered_x, centered_y))
- img = padded_img
-
- # Break the image into tiles and write them into the texture.
- for slot in range(0, scale * scale):
- tile_x = slot % scale
- tile_y = int(floor(slot / scale))
- tile_bbox = (tile_x * IMG_SZ_PX, tile_y * IMG_SZ_PX, (tile_x + 1) * IMG_SZ_PX, (tile_y + 1) * IMG_SZ_PX)
- tile = img.crop(tile_bbox)
- print(" tile {},{} (bbox={})".format(tile_x, tile_y, tile_bbox))
-
- slot_x = x + slot % IMG_PER_ROW
- slot_y = y + int(floor(slot / IMG_PER_ROW))
- slot_x_px = slot_x * IMG_SZ_PX
- slot_y_px = slot_y * IMG_SZ_PX
- print(" Add img at {},{} (px {},{})".format(slot_x, slot_y, slot_x_px, slot_y_px))
-
- tex.paste(tile, box=(slot_x_px, slot_y_px))
-
-def parseArgs():
- parser = argparse.ArgumentParser()
- parser.add_argument("--texture_path", type=str, help="Path to save the generated texture.")
- parser.add_argument("--rows", type=str, help="The number of rows on the board")
- parser.add_argument("--cols", type=str, help="The number of columns on the board")
- args = parser.parse_args()
-
- if not args.texture_path or not args.rows or not args.cols:
- print("--texture_path, --rows, --cols required", file=sys.stderr)
- sys.exit(1)
-
- return args
-
-if __name__ == "__main__":
- args = parseArgs()
-
- rows = int(args.rows)
- cols = int(args.cols)
- # board is this much wider than tall
- board_aspect_ratio = 2
- # each cell a square divided into `rows`x`cols` is this much wider than tall
- cell_aspect_ratio = rows / cols
- # each cell is this much wider than tall
- board_cell_aspect_ratio = board_aspect_ratio * cell_aspect_ratio
-
- tex = openTexture(args.texture_path)
- for i in range(0, len(IMG_TEX_DATA)):
- filename = IMG_TEX_DATA[i][0]
- x = i % IMG_PER_ROW
- y = int(floor(i / IMG_PER_ROW))
- addImageToTexture(tex, filename, x, y)
- tex.save(args.texture_path)
-
diff --git a/Scripts/emotes_v2.py b/Scripts/emotes_v2.py
deleted file mode 100644
index a9c037f..0000000
--- a/Scripts/emotes_v2.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-import os
-import pickle
-import sys
-
-from math import floor
-from PIL import Image
-from typing import Any, Dict, List, Tuple
-
-# The character range [0x0000, 0xDFFF] is reserved for text.
-# The range [0xE000, infinity) is left over for emotes.
-EMOTES_LETTER_OFFSET = 0xE000
-EMOTES_HEIGHT = 512
-EMOTES_TEX_SZ = 4096
-
-def superimpose_image(base_img: Image, overlay_img: Image, position: Tuple[int, int]) -> Image:
- base_img.paste(overlay_img, position, overlay_img)
- return base_img
-
-def i_to_pos(i, sm_wd, sm_ht, big_wd, big_ht) -> Tuple[int, int]:
- x = i * sm_wd % big_wd
- row = floor((i * sm_wd) / big_wd)
- y = row * sm_ht
- return int(x), int(y)
-
-def get_images_from_directory(directory_path: str) -> List[Tuple[Any, str]]:
- images = []
- for filename in os.listdir(directory_path):
- file_path = os.path.join(directory_path, filename)
- if os.path.isfile(file_path) and file_path.endswith(".png"):
- image = Image.open(file_path).convert("RGBA")
- name = os.path.basename(filename).split('.')[0]
- images.append((image, name))
- return images
-
-def split_resized_image(img, wd: int, ht: int) -> List[Any]:
- aspect_ratio = img.width / img.height
- width = int(ht * aspect_ratio)
- img = img.resize((width, ht))
-
- split_images = []
- for i in range(0, img.width, wd):
- split_image = img.crop((i, 0, i + wd, ht))
- split_images.append(split_image)
-
- return split_images
-
-def resize_image_with_aspect_ratio(img: Image, aspect_ratio: float) -> Image:
- original_width, original_height = img.size
- new_width = int(original_height * aspect_ratio)
- new_height = original_height
- return img.resize((new_width, new_height))
-
-def resize_image_to_height(img: Image, height: int) -> Image:
- aspect_ratio = img.width / img.height
- new_width = int(height * aspect_ratio)
- return img.resize((new_width, height))
-
-class EmotesState:
- def __init__(self):
- self.bits = {}
-
- def load(self, pickle_path):
- try:
- with open(pickle_path, 'rb') as f:
- self.bits = pickle.load(f)
- except FileNotFoundError:
- print(f"Emotes map does not exist at {pickle_path}",
- file=sys.stderr)
-
- # This is quite slow since we do a search and replace (O(n))
- # for each keyword O(m) times each variant of said keyword (O(k)).
- # Thus total complexity is O(m*n*k). All three of these numbers are
- # typically small: m and k typically < 10, n typically < 200.
- #
- # Naively one might split the input into words, but this only works for
- # English-like languages. Eastern Asian languages like Japanese don't
- # really divide into words AFAIK so this wouldn't work for them.
- #
- # Unless the performance becomes a user-reported problem, stick with this
- # inefficient but reliable method.
- def encode_emotes(self, msg: str):
- for keyword, bits in self.bits.items():
- bits_str = ""
- for bit in bits:
- bits_str += chr(bit)
- # ALL CAPS
- tmp = keyword.upper()
- msg = msg.replace(tmp, bits_str)
- # lowercase
- tmp = keyword.lower()
- msg = msg.replace(tmp, bits_str)
- # Capitalized
- tmp = keyword.lower().capitalize()
- msg = msg.replace(tmp, bits_str)
- # dashes inserted
- tmp = '-'.join(keyword.upper())
- msg = msg.replace(tmp, bits_str)
- # uppercase, spaces inserted
- tmp = ' '.join(keyword.upper())
- msg = msg.replace(tmp, bits_str)
- # lowercase, spaces inserted
- tmp = ' '.join(keyword.lower())
- msg = msg.replace(tmp, bits_str)
- # uppercase, commas and spaces inserted
- tmp = ', '.join(keyword.upper())
- msg = msg.replace(tmp, bits_str)
- return msg
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("dir", type=str, help="directory to get images from")
- parser.add_argument("board_aspect_ratio", help="aspect ratio of a cell in the board")
- parser.add_argument("texture_aspect_ratio", help="aspect ratio of a cell in the texture")
- parser.add_argument("tex_path", type=str, help="path to save the texture to")
- parser.add_argument("pickle_path", type=str, help="path to save the texture index to")
- args = parser.parse_args()
-
- directory_path = args.dir
- board_aspect_ratio = int(args.board_aspect_ratio)
- texture_aspect_ratio = int(args.texture_aspect_ratio)
-
- base_img = Image.new("RGBA", (EMOTES_TEX_SZ, EMOTES_TEX_SZ), (0, 0, 0, 0))
- images_and_filenames = get_images_from_directory(directory_path)
- i = 0
- bits = {} # Dict[str, List[int]]
- for img, filename in images_and_filenames:
- print(f"Adding {filename}")
- img = resize_image_with_aspect_ratio(img, board_aspect_ratio)
- img = resize_image_to_height(img, EMOTES_HEIGHT)
- img_fragments = split_resized_image(img, int(EMOTES_HEIGHT / texture_aspect_ratio), EMOTES_HEIGHT)
- img_bits = [] # List[int]
- for img_fragment in img_fragments:
- i = i + 1
- img_pos = i_to_pos(i,
- EMOTES_HEIGHT / texture_aspect_ratio, EMOTES_HEIGHT,
- EMOTES_TEX_SZ, EMOTES_TEX_SZ)
- print(f"{img_pos}")
- superimpose_image(base_img, img_fragment, img_pos)
- img_bits.append(EMOTES_LETTER_OFFSET + i)
- emote_name = os.path.basename(filename).split('.')[0]
- print(f"{emote_name} -> {img_bits}")
- bits[emote_name] = img_bits
- base_img.save(args.tex_path)
- with open(args.pickle_path, 'wb') as f:
- pickle.dump(bits, f)
-
diff --git a/Scripts/generate_fonts.py b/Scripts/generate_fonts.py
deleted file mode 100644
index 8dc8a89..0000000
--- a/Scripts/generate_fonts.py
+++ /dev/null
@@ -1,184 +0,0 @@
-#!/usr/bin/env python3
-
-# python3 -m pip install pillow
-# License: HPND license.
-from PIL import Image, ImageFont, ImageDraw
-
-import math
-
-# Use a power of 2 pixels per character so we can evenly divide the plane.
-font_pixels = 128
-full_ratio = 0.75
-half_ratio = 0.5
-full_sz = int(font_pixels * full_ratio)
-half_sz = int(font_pixels * half_ratio)
-layout_engine = ImageFont.Layout.BASIC
-
-unifont = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", full_sz, layout_engine=layout_engine)
-unifont_half = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", half_sz, layout_engine=layout_engine)
-
-noto_sans_mono = ImageFont.truetype(
- "Fonts/Noto_Sans_Mono/static/NotoSansMono/NotoSansMono-Bold.ttf",
- full_sz, index=0, layout_engine=layout_engine)
-
-noto_sans_sc_half = ImageFont.truetype("Fonts/Noto_Sans_Simplified_Chinese/NotoSansSC-Regular.otf", half_sz, layout_engine=layout_engine)
-
-noto_sans_kr_half = ImageFont.truetype("Fonts/Noto_Sans_Korean/NotoSansKR-Regular.otf", half_sz, layout_engine=layout_engine)
-
-n_rows = 64
-n_cols = 128
-
-class FontInfo:
- def __init__(self, font, dy):
- self.font = font
- self.dy = dy
-
-def allow_range(allowlist, lo_hi, font = None, dy = 0):
- for i in range(lo_hi[0], lo_hi[1] + 1):
- allowlist[i] = FontInfo(font, dy)
-def ban_range(allowlist, lo, hi):
- for i in range(lo, hi + 1):
- del allowlist[i]
-allowlist = {}
-# ASCII
-basic_latin = (32, 126)
-allow_range(allowlist, basic_latin, font=noto_sans_mono, dy = -20)
-# Latin-1 supplement
-latin_1_supplement = (0x00A1, 0x00ff)
-allow_range(allowlist, latin_1_supplement, font = noto_sans_mono)
-# Latin extended-A
-latin_extended_a = (0x0100, 0x017f)
-allow_range(allowlist, latin_extended_a, font = noto_sans_mono)
-# Latin extended-B
-latin_extended_b = (0x0180, 0x024f)
-allow_range(allowlist, latin_extended_b, font = noto_sans_mono)
-# Spacing modifier letters
-ipa_extensions = (0x0250, 0x02af)
-allow_range(allowlist, ipa_extensions, font = unifont)
-# Greek and Coptic
-greek = (0x0370, 0x03ff)
-allow_range(allowlist, greek, font = noto_sans_mono)
-ban_range(allowlist, 0x0378, 0x03a2)
-# Cyrillic
-cyrillic = (0x0400, 0x04ff)
-allow_range(allowlist, cyrillic, font = unifont)
-# Currency symbols
-currency_symbols = (0x20a0, 0x20c0)
-allow_range(allowlist, currency_symbols, font = noto_sans_mono)
-
-# CJK
-#
-hangul_jamo = (0x1100, 0x11FF)
-allow_range(allowlist, hangul_jamo, font = noto_sans_kr_half)
-#
-general_punctuation = (0x2000, 0x206f)
-allow_range(allowlist, general_punctuation, font = noto_sans_mono)
-#
-kangxi_radicals = (0x2f00, 0x2fdf)
-allow_range(allowlist, kangxi_radicals, font = noto_sans_sc_half)
-#
-cjk_symbols_and_punctuation = (0x3000, 0x303f)
-allow_range(allowlist, cjk_symbols_and_punctuation, font = noto_sans_sc_half)
-#
-hiragana = (0x3041, 0x309f)
-allow_range(allowlist, hiragana, font = noto_sans_sc_half)
-ban_range(allowlist, 0x3097, 0x3098)
-#
-katakana = (0x30a0, 0x30ff)
-allow_range(allowlist, katakana, font = noto_sans_sc_half)
-#
-hangul_compatibility_jamo = (0x3130, 0x318f)
-allow_range(allowlist, hangul_compatibility_jamo, font = noto_sans_sc_half)
-#
-enclosed_cjk_letters_and_months = (0x3200, 0x32FF)
-allow_range(allowlist, enclosed_cjk_letters_and_months, font = noto_sans_sc_half)
-#
-cjk_compatibility = (0x3300, 0x33ff)
-allow_range(allowlist, cjk_compatibility, font = noto_sans_sc_half)
-#
-cjk_unified_extension_a = (0x3400, 0x4dbf)
-allow_range(allowlist, cjk_unified_extension_a, font = noto_sans_sc_half)
-#
-cjk_ideographs = (0x4e00, 0x9fff)
-allow_range(allowlist, cjk_ideographs, font = noto_sans_sc_half)
-#
-hangul_syllables = (0xAC00, 0xD7A3)
-allow_range(allowlist, hangul_syllables, font = noto_sans_kr_half)
-#
-halfwidth_and_fullwidth = (0xff00, 0xffef)
-allow_range(allowlist, halfwidth_and_fullwidth, font = noto_sans_sc_half)
-
-def in_range(x, range_pair) -> bool:
- return x >= range_pair[0] and x <= range_pair[1]
-
-max_char = max(allowlist)
-print("max char: {}".format(max_char))
-print("num chars: {}".format(len(allowlist)))
-
-def genUnicode():
- total_rows = math.ceil(max_char / n_cols)
- print("total rows {}".format(total_rows))
- total_textures = math.ceil(total_rows / n_rows)
- print("total textures {}".format(total_textures))
-
- for nth_texture in range(0, total_textures):
- # Create an 8K grayscale ("L") or black and white ("1") image
- # Unity will re-encode b&w to grayscale, so using b&w just helps keep
- # the package size low (we vendor these, we don't generate them
- # client-side).
- image = Image.new(mode="1", size=(8192,8192), color=0)
- draw = ImageDraw.Draw(image)
-
- row_begin = nth_texture * n_rows
-
- for row in range(row_begin, row_begin + n_rows):
- line = ""
- for col in range(0, n_cols):
- # Generate the unicode character for this spot.
- n = row * n_cols + col
- char = None
- font_info = None
- if n in allowlist.keys():
- char = chr(n)
- font_info = allowlist[n]
- else:
- char = " "
- font_info = FontInfo(unifont, 0)
- # Hack: Chinese, Japanese, and Korean characters are all double
- # width and are all on textures [1,6]. To fit them in the same
- # grid, we use a half-size font.
- draw.text((col * font_pixels / 2, (row - row_begin) * font_pixels +
- font_info.dy), char, font=font_info.font, fill=255)
-
- image.save("Fonts/Bitmaps/font-%01d.png" % nth_texture)
-
-def genASCII():
- # Create an 8k grayscale image. 16 glyphs wide, 8 glyphs tall.
- # Only characters on the range [0, 128).
- image = Image.new(mode="RGBA", size=(8192,8192), color=0)
- draw = ImageDraw.Draw(image)
- n_rows = 8
- n_cols = 16
-
- font = ImageFont.truetype(
- "Fonts/Noto_Sans_Mono/static/NotoSansMono/NotoSansMono-Bold.ttf",
- int((8192 / 8) * 0.75), index=0, layout_engine=layout_engine)
-
- for row in range(0, n_rows):
- for col in range(0, n_cols):
- n = row * n_cols + col
- char = None
- font_info = None
- if n in allowlist.keys():
- char = chr(n)
- else:
- char = " "
- draw.text((col * font_pixels * 8 / 2, row * font_pixels * 8 - 20),
- char, font=font, fill=(255,255,255))
- image.save("Fonts/Bitmaps/font-ascii.png")
-
-if __name__ == "__main__":
- print("Generating unicode fonts")
- #genUnicode()
- print("Generating ASCII fonts")
- genASCII()
diff --git a/Scripts/generate_menu.py b/Scripts/generate_menu.py
deleted file mode 100644
index 2da50b2..0000000
--- a/Scripts/generate_menu.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-import sys
-
-MENU_SUFFIX = """
- - name: TaSTT
- icon: {fileID: 0}
- type: 103
- parameter:
- name:
- value: 1
- style: 0
- subMenu: {fileID: 11400000, guid: 111d8d5f909f534429bfe46268723200, type: 2}
- subParameters: []
- labels: []
-"""[1:]
-
-def append(old_path, new_path):
- merged = ""
- with open(old_path, "r") as f:
- merged = f.read()
- merged += MENU_SUFFIX
- with open(new_path, "w") as f:
- f.write(merged)
-
-if __name__ == "__main__":
-
- parser = argparse.ArgumentParser()
- parser.add_argument("--old_menu", type=str, help="The menu to append to")
- parser.add_argument("--new_menu", type=str, help="The menu to create")
- args = parser.parse_args()
-
- if not args.old_menu or not args.new_menu:
- print("--old_menu and --new_menu are both required",
- file=sys.stderr)
- parser.print_help()
- parser.exit(1)
-
- append(args.old_menu, args.new_menu)
-
diff --git a/Scripts/generate_params.py b/Scripts/generate_params.py
deleted file mode 100644
index 0d47fde..0000000
--- a/Scripts/generate_params.py
+++ /dev/null
@@ -1,131 +0,0 @@
-#!/usr/bin/env python3
-
-import app_config
-import argparse
-import generate_utils
-import sys
-
-# Opening of a parameters asset: YAML directives followed by a MonoBehaviour
-# named TaSTT_params whose `parameters:` list is left open.
-# NOTE(review): nothing in this script references PARAM_HEADER; generate()
-# only emits list entries and append() adds them to an existing file.
-# Each template below ends with [1:] to drop the newline that follows the
-# opening triple quote.
-PARAM_HEADER = """
-%YAML 1.1
-%TAG !u! tag:unity3d.com,2011:
---- !u!114 &11400000
-MonoBehaviour:
- m_ObjectHideFlags: 0
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_GameObject: {fileID: 0}
- m_Enabled: 1
- m_EditorHideFlags: 0
- m_Script: {fileID: -1506855854, guid: 67cc4cb7839cd3741b63733d5adf0442, type: 3}
- m_Name: TaSTT_params
- m_EditorClassIdentifier:
- parameters:
-"""[1:]
-
-# One list entry with valueType 0. Macros: %PARAM_NAME%, %SYNCED%.
-INT_PARAM = """
- - name: %PARAM_NAME%
- valueType: 0
- saved: 0
- defaultValue: 0
- networkSynced: %SYNCED%
-"""[1:]
-
-# One list entry with valueType 2. Macros: %PARAM_NAME%, %SAVED%, %SYNCED%.
-BOOL_PARAM = """
- - name: %PARAM_NAME%
- valueType: 2
- saved: %SAVED%
- defaultValue: 0
- networkSynced: %SYNCED%
-"""[1:]
-
-# One list entry with valueType 1. Macros: %PARAM_NAME%, %DEFAULT_FLOAT%,
-# %SYNCED%.
-FLOAT_PARAM = """
- - name: %PARAM_NAME%
- valueType: 1
- saved: 0
- defaultValue: %DEFAULT_FLOAT%
- networkSynced: %SYNCED%
-"""[1:]
-
-def generate(cfg):
- """Return the YAML list entries for every TaSTT parameter as one string.
-
- `cfg` is the app config mapping; only cfg["enable_phonemes"] is read here.
- The number of blend parameters comes from generate_utils.config
- (BYTES_PER_CHAR x CHARS_PER_SYNC), which the caller is expected to have
- set up beforehand.
- """
- result = ""
-
- # NOTE(review): the figures in the comment below (84 characters, 8 per
- # layer) do not match the generate_utils.Config defaults (4x48 board,
- # 10 characters per sync) -- the comment looks stale.
- # We're working with an 84-character board, and each FX layer is responsible
- # for 8 of those characters.
- # `params` is the macro table handed to replaceMacros(); entries are
- # overwritten in place as each parameter is emitted.
- params = {}
- params["SAVED"] = "0"
- params["DEFAULT_FLOAT"] = "0"
- params["SYNCED"] = "1"
-
- params["PARAM_NAME"] = generate_utils.getDummyParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
-
- params["PARAM_NAME"] = generate_utils.getEnableParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
-
- params["PARAM_NAME"] = generate_utils.getEllipsisParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
-
- # Sound parameters 1..5 plus the phoneme enable flag. With phonemes
- # disabled they are written with networkSynced 0; SYNCED is restored to
- # "1" afterwards for the remaining parameters.
- if not cfg["enable_phonemes"]:
- params["SYNCED"] = "0"
- for i in range(5):
- params["PARAM_NAME"] = generate_utils.getSoundParam(i+1)
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
- params["PARAM_NAME"] = generate_utils.getEnablePhonemeParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
- params["SYNCED"] = "1"
-
- # The scale parameter is the only float with a non-zero default; the
- # default is reset right after so later floats start at 0.
- params["PARAM_NAME"] = generate_utils.getScaleParam()
- params["DEFAULT_FLOAT"] = "0.05"
- result += generate_utils.replaceMacros(FLOAT_PARAM, params)
- params["DEFAULT_FLOAT"] = "0"
-
- params["PARAM_NAME"] = generate_utils.getToggleParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
-
- params["PARAM_NAME"] = generate_utils.getLockWorldParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
-
- params["PARAM_NAME"] = generate_utils.getClearBoardParam()
- result += generate_utils.replaceMacros(BOOL_PARAM, params)
-
- params["PARAM_NAME"] = generate_utils.getSelectParam()
- result += generate_utils.replaceMacros(INT_PARAM, params)
-
- # One float blend parameter per (byte, layer) pair.
- for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
- for i in range(0, generate_utils.config.CHARS_PER_SYNC):
- params["PARAM_NAME"] = generate_utils.getBlendParam(i, byte)
- result += generate_utils.replaceMacros(FLOAT_PARAM, params)
-
- return result
-
-def append(old_path, params, new_path):
-    """Write the contents of `old_path`, followed by `params`, to `new_path`.
-
-    `params` is the already-rendered text to add (see generate()).
-
-    Both files are handled as UTF-8 text, matching how the other generator
-    scripts open their inputs and outputs, instead of whatever encoding the
-    current locale happens to default to.
-
-    The input is read completely and closed before the output is opened, so
-    `old_path` and `new_path` may name the same file.
-    """
-    with open(old_path, "r", encoding="utf-8") as f:
-        merged = f.read() + params
-    with open(new_path, "w", encoding="utf-8") as f:
-        f.write(merged)
-
-if __name__ == "__main__":
-    # Command-line entry point: render the TaSTT parameters and append them
-    # to a copy of --old_params written at --new_params.
-    cli = argparse.ArgumentParser()
-    cli.add_argument("--old_params", type=str, help="The parameters to append to")
-    cli.add_argument("--new_params", type=str, help="The parameters to create")
-    cli.add_argument("--config", type=str, help="The path to the app config.")
-    cli.add_argument("--chars_per_sync", type=str, help="The number of characters to send on each sync event")
-    args = cli.parse_args()
-
-    if not (args.old_params and args.new_params):
-        print("--old_params and --new_params are both required",
-              file=sys.stderr)
-        cli.print_help()
-        cli.exit(1)
-
-    cfg = app_config.getConfig(args.config)
-
-    # NOTE(review): --chars_per_sync is accepted above but never read; the
-    # value used comes from the config file. --config itself is not checked
-    # for presence before being passed to getConfig().
-    shared = generate_utils.config
-    shared.BYTES_PER_CHAR = int(cfg["bytes_per_char"])
-    shared.CHARS_PER_SYNC = int(cfg["chars_per_sync"])
-
-    append(args.old_params, generate(cfg), args.new_params)
-
diff --git a/Scripts/generate_shader.py b/Scripts/generate_shader.py
deleted file mode 100644
index 80f6704..0000000
--- a/Scripts/generate_shader.py
+++ /dev/null
@@ -1,161 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-import generate_utils
-import os
-import sys
-
-# One generated line per (byte, row, col), for example:
-#   _Letter_Row00_Col00_Byte0("_Letter_Row00_Col00_Byte0", float) = 0
-def generateUnityParams(nbytes: int, nrows: int, ncols: int, prefix: str = "") -> str:
-    """Return the property declarations for every board cell and byte.
-
-    Lines are ordered byte-major, then row, then column, wrapped in
-    BEGIN/END marker comments and joined with newlines. `prefix` is put in
-    front of every line, markers included.
-    """
-    declarations = []
-    for byte in range(nbytes):
-        for row in range(nrows):
-            for col in range(ncols):
-                name = generate_utils.getShaderParamByRowColByte(row, col, byte)
-                declarations.append(prefix + '{0}("{0}", float) = 0'.format(name))
-    return '\n'.join(
-        [prefix + "// BEGIN GENERATED CODE BLOCK"]
-        + declarations
-        + [prefix + "// END GENERATED CODE BLOCK"])
-
-# One generated line per (byte, row, col), for example:
-#   float _Letter_Row00_Col00_Byte0;
-def generateCgParams(nbytes: int, nrows: int, ncols: int, prefix: str = "") -> str:
-    """Return the `float <name>;` declarations for every board cell and byte.
-
-    Lines are ordered byte-major, then row, then column, wrapped in
-    BEGIN/END marker comments and joined with newlines. `prefix` is put in
-    front of every line, markers included.
-    """
-    declarations = []
-    for byte in range(nbytes):
-        for row in range(nrows):
-            for col in range(ncols):
-                name = generate_utils.getShaderParamByRowColByte(row, col, byte)
-                declarations.append(prefix + "float {};".format(name))
-    return '\n'.join(
-        [prefix + "// BEGIN GENERATED CODE BLOCK"]
-        + declarations
-        + [prefix + "// END GENERATED CODE BLOCK"])
-
-# Emit five preprocessor constants:
-#   #define BYTES_PER_CHAR $nbytes
-#   #define BOARD_NROWS $board_nrows
-#   #define BOARD_NCOLS $board_ncols
-#   #define TEXTURE_NROWS $texture_nrows
-#   #define TEXTURE_NCOLS $texture_ncols
-def generateCgConstants(nbytes: int, board_nrows: int, board_ncols: int,
-        texture_nrows: int, texture_ncols: int, prefix: str = "") -> str:
-    """Return the `#define` block describing the board and font texture sizes.
-
-    The block is wrapped in BEGIN/END marker comments and joined with
-    newlines; `prefix` is put in front of every line, markers included.
-    """
-    constants = (
-        ("BYTES_PER_CHAR", nbytes),
-        ("BOARD_NROWS", board_nrows),
-        ("BOARD_NCOLS", board_ncols),
-        ("TEXTURE_NROWS", texture_nrows),
-        ("TEXTURE_NCOLS", texture_ncols),
-    )
-    out = [prefix + "// BEGIN GENERATED CODE BLOCK"]
-    out.extend(prefix + "#define {} {}".format(name, value)
-               for name, value in constants)
-    out.append(prefix + "// END GENERATED CODE BLOCK")
-    return '\n'.join(out)
-
-# Sketch of the shader function this fragment is spliced into:
-#   // Get the value of the parameter for the cell we're in.
-#   uint GetLetterParameter(float2 uv)
-#   {
-#     float CHAR_COL = floor(uv.x * Cols);
-#     float CHAR_ROW = floor(uv.y * Rows);
-#     uint res = 0;
-#
-#     [forcecase] switch (CHAR_ROW) {
-#       case n-1:
-#         [forcecase] switch (CHAR_COL) {
-#           case 0:
-#             res |= ((uint) _Letter_Row00_Col00_Byte0) << (0 * 8);
-#             res |= ((uint) _Letter_Row00_Col00_Byte1) << (1 * 8);
-#             return res;
-#           case 1:
-#             ...
-#           default:
-#             return 0;
-#         }
-#       case n-2:
-#         ...
-#     }
-#     return res;
-#   }
-# In English, this provides an accessor to the many (possibly thousands)
-# float parameters which hold the text on the board.
-def generateLetterAccessor(nbytes: int, nrows: int, ncols: int, prefix: str = "") -> str:
-    """Return the nested row/column switch that reads one cell's bytes.
-
-    Only the switch statement is generated, wrapped in BEGIN/END marker
-    comments and joined with newlines; `prefix` is put in front of every
-    line. Each column case ORs the cell's bytes into `res`, byte k shifted
-    left by k * 8, and returns it.
-    """
-    out = [prefix + "// BEGIN GENERATED CODE BLOCK",
-           prefix + "[forcecase] switch (CHAR_ROW) {"]
-    emit = out.append
-    for row in range(nrows):
-        # Case labels count down: parameter row 0 is emitted as case nrows-1.
-        emit(prefix + " case {}:".format(nrows - 1 - row))
-        emit(prefix + " [forcecase] switch (CHAR_COL) {")
-        for col in range(ncols):
-            emit(prefix + " case {}:".format(col))
-            for byte in range(nbytes):
-                name = generate_utils.getShaderParamByRowColByte(row, col, byte)
-                emit(prefix + " res |= ((uint) {}) << ({} * 8);".format(name, byte))
-            emit(prefix + " return res;")
-        # Columns outside the board read as 0.
-        emit(prefix + " default:")
-        emit(prefix + " return 0;")
-        emit(prefix + " }")
-    emit(prefix + "}")
-    emit(prefix + "// END GENERATED CODE BLOCK")
-    return '\n'.join(out)
-
-# Replace any line containing `macro` with `replacement`.
-def applyLineMacro(old_path: str, new_path: str, macro: str, replacement: str) -> int:
-    """Copy `old_path` to `new_path`, swapping out every line that has `macro`.
-
-    A line matches when `macro` occurs anywhere in it; the whole line is then
-    replaced by `replacement` (which may itself span several lines). Both
-    files are UTF-8. Output lines are joined with newlines, so the result
-    never ends with a trailing newline.
-
-    Returns the number of lines that were replaced (0 when the macro never
-    matched), which callers may also use as a truthy "was applied" flag.
-    """
-    new_lines = []
-    times_applied = 0
-    with open(old_path, 'r', encoding="utf-8") as f:
-        for line in f:
-            # Drop the line terminator so it is not doubled by the join below.
-            if line.endswith('\n'):
-                line = line[:-1]
-            if macro in line:
-                new_lines.append(replacement)
-                times_applied += 1
-            else:
-                new_lines.append(line)
-    # The input is fully read and closed before the output is opened, so
-    # old_path and new_path may be the same file.
-    with open(new_path, 'w', encoding="utf-8") as f:
-        f.write('\n'.join(new_lines))
-    return times_applied
-
-if __name__ == "__main__":
-    # Command-line entry point: expand the four template macros of the shader
-    # template into --shader_path. Start by echoing the full command line.
-    print("args: {}".format(" ".join(sys.argv)))
-
-    parser = argparse.ArgumentParser()
-    for flag, help_text in (
-            ("--bytes_per_char", "The number of bytes to use to represent each character"),
-            ("--board_rows", "The number of rows on the board"),
-            ("--board_cols", "The number of columns on the board"),
-            ("--texture_rows", "The number of rows on the font textures"),
-            ("--texture_cols", "The number of columns on the font textures"),
-            ("--shader_template", "The path to the shader template"),
-            ("--shader_path", "The path where the generated shader will be written")):
-        parser.add_argument(flag, type=str, help=help_text)
-    args = parser.parse_args()
-
-    # Every flag is mandatory; a missing or empty value aborts with status 1.
-    if not all((args.bytes_per_char, args.board_rows, args.board_cols,
-                args.texture_rows, args.texture_cols,
-                args.shader_template, args.shader_path)):
-        print(("--bytes_per_char, --board_rows, --board_cols, --texture_rows, "
-            "--texture_cols, --shader_template, --shader_path required"), file=sys.stderr)
-        sys.exit(1)
-
-    nbytes = int(args.bytes_per_char)
-    board_nrows = int(args.board_rows)
-    board_ncols = int(args.board_cols)
-    texture_nrows = int(args.texture_rows)
-    texture_ncols = int(args.texture_cols)
-
-    # (macro line, generator) pairs, applied in this order. Each generator is
-    # only invoked right before its own pass.
-    board = (nbytes, board_nrows, board_ncols)
-    passes = (
-        ("// %TEMPLATE__UNITY_ROW_COL_PARAMS%",
-         lambda: generateUnityParams(*board, prefix = "")),
-        ("// %TEMPLATE__CG_ROW_COL_PARAMS%",
-         lambda: generateCgParams(*board, prefix = " ")),
-        ("// %TEMPLATE__CG_ROW_COL_CONSTANTS%",
-         lambda: generateCgConstants(*board, texture_nrows, texture_ncols, prefix = " ")),
-        ("// %TEMPLATE__CG_LETTER_ACCESSOR%",
-         lambda: generateLetterAccessor(*board, prefix = " ")),
-    )
-
-    # The first pass reads the template; every later pass rewrites the
-    # generated shader in place.
-    source_path = args.shader_template
-    for macro, make_replacement in passes:
-        replacement = make_replacement()
-        applyLineMacro(source_path, args.shader_path, macro, replacement)
-        source_path = args.shader_path
diff --git a/Scripts/generate_utils.py b/Scripts/generate_utils.py
deleted file mode 100644
index ccc92fc..0000000
--- a/Scripts/generate_utils.py
+++ /dev/null
@@ -1,134 +0,0 @@
-from math import ceil
-from math import floor
-
-def replaceMacros(lines, macro_defs):
-    """Return `lines` with each `%NAME%` replaced by `macro_defs["NAME"]`.
-
-    Substitutions are applied one macro after another in the mapping's
-    iteration order, so text inserted by one macro is visible to the macros
-    that follow it. Values must be strings.
-    """
-    expanded = lines
-    for name in macro_defs:
-        expanded = expanded.replace("%" + name + "%", macro_defs[name])
-    return expanded
-
-class Config():
-    """Board dimensions and encoding settings shared by the generators."""
-
-    def __init__(self):
-        self.BOARD_ROWS, self.BOARD_COLS = 4, 48
-        self.CHARS_PER_CELL = 256
-        self.BYTES_PER_CHAR = 2
-        self.CHARS_PER_SYNC = 10
-
-    def numRegions(self, which_layer):
-        """Return cells / CHARS_PER_SYNC, rounded according to the layer.
-
-        Layers whose index is below the remainder of that division get the
-        value rounded up; every other layer gets it rounded down.
-        """
-        total_cells = self.BOARD_ROWS * self.BOARD_COLS
-        leftover = total_cells % self.CHARS_PER_SYNC
-        exact = total_cells / self.CHARS_PER_SYNC
-        return ceil(exact) if which_layer < leftover else floor(exact)
-
-    def layerNeedsParity(self, which_layer):
-        """Return True when the board does not divide evenly into syncs and
-        `which_layer` is at or past the remainder."""
-        leftover = (self.BOARD_ROWS * self.BOARD_COLS) % self.CHARS_PER_SYNC
-        return leftover > 0 and which_layer >= leftover
-
-# Module-wide settings instance read by the helpers below; callers may
-# overwrite its fields before generating.
-config = Config()
-
-# Implementation detail. We use this parameter to return from the terminal
-# state of the FX layer to the starting state.
-def getDummyParam():
- """Return the parameter name "TaSTT_Dummy"."""
- return "TaSTT_Dummy"
-
-def getToggleParam():
- """Return the parameter name "TaSTT_Toggle"."""
- return "TaSTT_Toggle"
-
-def getScaleParam():
- """Return the parameter name "TaSTT_Scale"."""
- return "TaSTT_Scale"
-
-def getEnablePhonemeParam():
- """Return the parameter name "TaSTT_Enable_Phoneme"."""
- return "TaSTT_Enable_Phoneme"
-
-# When this is set to true, the board clears.
-def getClearBoardParam():
- """Return the parameter name "TaSTT_Clear_Board"."""
- return "TaSTT_Clear_Board"
-
-def getLockWorldParam():
- """Return the parameter name "TaSTT_Lock_World"."""
- return "TaSTT_Lock_World"
-
-# Each layer controls a group of cells. There's only one letter per layer, thus
-# this is also the name of the parameter which sets the letter for a layer.
-def getLayerParam(which_layer: int, byte: int) -> str:
- """Return "TaSTT_L<layer>B<byte>", the layer number zero-padded to 2 digits."""
- return "TaSTT_L%02dB%01d" % (which_layer, byte)
-
-def getLayerName(which_layer: int, byte: int) -> str:
- """Return the layer's name, which is the same string as getLayerParam()."""
- return getLayerParam(which_layer, byte)
-
-def getBlendParam(which_layer: int, byte: int) -> str:
- """Return "TaSTT_L<layer>B<byte>_Blend"."""
- return "TaSTT_L%02dB%01d_Blend" % (which_layer, byte)
-
-def getDefaultStateName(which_layer:int , byte: int):
- """Return "TaSTT_L<layer>B<byte>_Do_Nothing"."""
- return "TaSTT_L%02dB%01d_Do_Nothing" % (which_layer, byte)
-
-def getActiveStateName(which_layer: int, byte: int):
- """Return "TaSTT_L<layer>B<byte>_Active"."""
- return "TaSTT_L%02dB%01d_Active" % (which_layer, byte)
-
-def getSelectStateName(which_layer, select, byte=0):
-    """Return "TaSTT_L<layer>_S<select>_B<byte>".
-
-    The format string has always included a byte field, but `byte` was not a
-    parameter, so every call raised NameError. It is now an optional third
-    argument (default 0), in line with getBlendStateName() and
-    getLetterStateName(); existing two-argument calls keep working.
-    """
-    return "TaSTT_L%02d_S%02d_B%01d" % (which_layer, select, byte)
-
-def getBlendStateName(which_layer, select, byte):
- """Return "TaSTT_L<layer>_S<select>_B<byte>_Blend"."""
- return "TaSTT_L%02d_S%02d_B%01d_Blend" % (which_layer, select, byte)
-
-def getLetterStateName(which_layer, select, letter, byte):
- """Return "TaSTT_L<layer>_S<select>_L<letter>_B<byte>", letter padded to 3 digits."""
- return "TaSTT_L%02d_S%02d_L%03d_B%01d" % (which_layer, select, letter, byte)
-
-def getSelectParam() -> str:
- """Return the parameter name "TaSTT_Select"."""
- return "TaSTT_Select"
-
-def getEnableParam():
- """Return the parameter name "TaSTT_Enable"."""
- return "TaSTT_Enable"
-
-def getSoundParam(i: int):
- """Return "TaSTT_Sound<i>" for sound number `i`."""
- return f"TaSTT_Sound{str(i)}"
-
-def getEllipsisParam():
- """Return the parameter name "TaSTT_Ellipsis"."""
- return "TaSTT_Ellipsis"
-
-def getBoardIndex(which_layer, select):
- """Return the flat cell index addressed by (`which_layer`, `select`).
-
- The index is select * CHARS_PER_SYNC + which_layer, wrapped modulo the
- number of cells on the board (BOARD_ROWS * BOARD_COLS).
- """
- # NOTE(review): "a multiple of 8" below presumably predates the
- # configurable CHARS_PER_SYNC -- confirm.
- # Because we divide the board into a multiple of 8 cells, some cells may
- # describe animations which don't exist, depending on the size of the board.
- # We work around this by simply wrapping those animations back to the top
- # of the board, and rely on the OSC controller to simply not reference
- # those cells.
- return (select * config.CHARS_PER_SYNC + which_layer) % (config.BOARD_ROWS * config.BOARD_COLS)
-
-def getShaderParamByRowColByte(row, col, byte):
- """Return "_Letter_Row<row>_Col<col>_Byte<byte>", row and col padded to 2 digits."""
- return "_Letter_Row%02d_Col%02d_Byte%01d" % (row, col, byte)
-
-# Mapping from layer to shader param.
-def getShaderParam(which_layer, select, byte):
-    """Return the shader parameter name for (`which_layer`, `select`, `byte`).
-
-    The flat index from getBoardIndex() is split into a row and a column
-    using config.BOARD_COLS, then formatted by getShaderParamByRowColByte().
-    The helper was previously called under the name getShaderParamByRowCol,
-    which does not exist, so every call raised NameError.
-    """
-    index = getBoardIndex(which_layer, select)
-
-    col = index % config.BOARD_COLS
-    row = floor(index / config.BOARD_COLS)
-
-    return getShaderParamByRowColByte(row, col, byte)
-
-# The name of the animation which writes `letter` at a specific position in the
-# display.
-def getLetterAnimationName(row, col, letter, nth_byte):
- """Return "R<row>C<col>L<letter>B<nth_byte>", each of row/col/letter padded to 2 digits."""
- return "R%02dC%02dL%02dB%01d" % (row, col, letter, nth_byte)
-
-# The name of the animation which clears the entire board.
-def getClearAnimationName():
- """Return the animation name "TaSTT_Clear_Board"."""
- return "TaSTT_Clear_Board"
-
-def getAnimationNameByLayerAndIndex(which_layer, select, letter, nth_byte):
-    """Return the letter animation name for the cell at (`which_layer`, `select`).
-
-    The flat index from getBoardIndex() is split into a row and a column
-    using config.BOARD_COLS; the name has the same "R..C..L..B." layout as
-    getLetterAnimationName().
-    """
-    cell = getBoardIndex(which_layer, select)
-    n_cols = config.BOARD_COLS
-    row = floor(cell / n_cols)
-    col = cell % n_cols
-    return getLetterAnimationName(row, col, letter, nth_byte)
-
-# Returns the path to the animation for the given shader parameter + letter.
-def getAnimationPath(shader_param, letter):
- """Return "generated/animations/<shader_param>_Letter<letter>.anim", letter padded to 2 digits."""
- return "generated/animations/%s_Letter%02d.anim" % (shader_param, letter)
diff --git a/Scripts/keybind_event_machine.py b/Scripts/keybind_event_machine.py
deleted file mode 100644
index 3ce6794..0000000
--- a/Scripts/keybind_event_machine.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import keyboard
-import time
-
-class KeybindEventMachine:
-    """FIFO queue of timestamps, one per press of a registered hotkey."""
-
-    def __init__(self, keybind: str):
-        """Register `keybind` with the `keyboard` module and start queueing."""
-        self.keybind = keybind
-        self.events = []
-        keyboard.add_hotkey(keybind, self.onPress)
-
-    def onPress(self) -> None:
-        """Hotkey callback: queue the current time (seconds since the epoch)."""
-        self.events.append(time.time())
-
-    # Returns the timestamp when the keybind was pressed, or 0 if no keypresses
-    # are queued.
-    def getNextPressTime(self) -> float:
-        """Remove and return the oldest queued press time, or 0 if none.
-
-        The timestamps come from time.time(), so the result is a float (the
-        previous `int` annotation was wrong).
-        """
-        if not self.events:
-            return 0
-        # Remove in place rather than rebinding self.events to a slice: a
-        # press appended by onPress() between taking the slice and rebinding
-        # would otherwise be lost. NOTE(review): this matters if the hotkey
-        # callback runs on another thread -- confirm against the keyboard
-        # library.
-        return self.events.pop(0)
-
diff --git a/Scripts/lang_compat.py b/Scripts/lang_compat.py
deleted file mode 100644
index af35921..0000000
--- a/Scripts/lang_compat.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# This file provides mappings between language codes used by different
-# third-party libraries.
-
-# Whisper to NLLB.
-# Keys are lowercase language names; values are NLLB language codes of the
-# form <3-letter language>_<4-letter script>.
-whisper_to_nllb = {
-        "catalan": "cat_Latn",  # catalan
-        "czech": "ces_Latn",  # czech
-        "danish": "dan_Latn",  # danish
-        "dutch": "nld_Latn",  # dutch
-        "english": "eng_Latn",  # english
-        "finnish": "fin_Latn",  # finnish
-        "french": "fra_Latn",  # french
-        "german": "deu_Latn",  # german
-        "greek": "ell_Grek",  # greek
-        "hungarian": "hun_Latn",  # hungarian
-        "icelandic": "isl_Latn",  # icelandic
-        "italian": "ita_Latn",  # italian
-        "latvian": "lvs_Latn",  # latvian
-        "lithuanian": "lit_Latn",  # lithuanian
-        "norwegian": "nob_Latn",  # norwegian (bokmal)
-        "polish": "pol_Latn",  # polish
-        "portuguese": "por_Latn",  # portuguese
-        # Misspelled key kept so existing lookups keep working.
-        "portugese": "por_Latn",  # portuguese (legacy spelling)
-        "romanian": "ron_Latn",  # romanian
-        "russian": "rus_Cyrl",  # russian
-        "slovak": "slk_Latn",  # slovak
-        "slovene": "slv_Latn",  # slovene
-        # Whisper's own language table calls this language "slovenian".
-        "slovenian": "slv_Latn",  # slovene
-        "spanish": "spa_Latn",  # spanish
-        "swedish": "swe_Latn",  # swedish
-        "turkish": "tur_Latn",  # turkish
-        }
-
-# NLLB to sentence_splitter (SS).
-# Every value of whisper_to_nllb must appear here as a key.
-nllb_to_ss = {
-        "cat_Latn": "ca",  # catalan
-        # "cat_Ltn" was a typo for "cat_Latn"; kept so stored values using
-        # the old spelling still resolve.
-        "cat_Ltn": "ca",  # catalan (legacy typo)
-        "ces_Latn": "cs",  # czech
-        "dan_Latn": "da",  # danish
-        "nld_Latn": "nl",  # dutch
-        "eng_Latn": "en",  # english
-        "fin_Latn": "fi",  # finnish
-        "fra_Latn": "fr",  # french
-        "deu_Latn": "de",  # german
-        "ell_Grek": "el",  # greek
-        "hun_Latn": "hu",  # hungarian
-        "isl_Latn": "is",  # icelandic
-        "ita_Latn": "it",  # italian
-        "lvs_Latn": "lv",  # latvian
-        "lit_Latn": "lt",  # lithuanian
-        "nob_Latn": "no",  # norwegian (bokmal)
-        "pol_Latn": "pl",  # polish
-        "por_Latn": "pt",  # portuguese
-        "ron_Latn": "ro",  # romanian
-        "rus_Cyrl": "ru",  # russian
-        "slk_Latn": "sk",  # slovak
-        "slv_Latn": "sl",  # slovene
-        "spa_Latn": "es",  # spanish
-        "swe_Latn": "sv",  # swedish
-        "tur_Latn": "tr",  # turkish
-        }
diff --git a/Scripts/libtastt.py b/Scripts/libtastt.py
deleted file mode 100644
index 81baa8b..0000000
--- a/Scripts/libtastt.py
+++ /dev/null
@@ -1,1085 +0,0 @@
-#!/usr/bin/env python3
-
-import app_config
-import argparse
-import array
-import generate_utils
-import libunity
-import os
-import pickle
-import sys
-import typing
-
-# TODO(yum) we're getting the encoding scheme from here, but I think it should
-# be in a different layer.
-import osc_ctrl
-
-SCALE_ANIMATION_TEMPLATE = """
-%YAML 1.1
-%TAG !u! tag:unity3d.com,2011:
---- !u!74 &7400000
-AnimationClip:
- m_ObjectHideFlags: 0
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: TaSTT_Scale_0
- serializedVersion: 6
- m_Legacy: 0
- m_Compressed: 0
- m_UseHighQualityCurve: 1
- m_RotationCurves: []
- m_CompressedRotationCurves: []
- m_EulerCurves: []
- m_PositionCurves: []
- m_ScaleCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: {x: 5, y: 5, z: 5}
- inSlope: {x: 0, y: 0, z: 0}
- outSlope: {x: 0, y: 0, z: 0}
- tangentMode: 0
- weightedMode: 0
- inWeight: {x: 0, y: 0.33333334, z: 0.33333334}
- outWeight: {x: 0, y: 0.33333334, z: 0.33333334}
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- path: World Constraint/Container/TaSTT
- m_FloatCurves: []
- m_PPtrCurves: []
- m_SampleRate: 60
- m_WrapMode: 0
- m_Bounds:
- m_Center: {x: 0, y: 0, z: 0}
- m_Extent: {x: 0, y: 0, z: 0}
- m_ClipBindingConstant:
- genericBindings:
- - serializedVersion: 2
- path: 1272388438
- attribute: 3
- script: {fileID: 0}
- typeID: 4
- customType: 0
- isPPtrCurve: 0
- - serializedVersion: 2
- path: 1272388438
- attribute: 1225223716
- script: {fileID: 0}
- typeID: 23
- customType: 0
- isPPtrCurve: 0
- pptrCurveMapping: []
- m_AnimationClipSettings:
- serializedVersion: 2
- m_AdditiveReferencePoseClip: {fileID: 0}
- m_AdditiveReferencePoseTime: 0
- m_StartTime: 0
- m_StopTime: 0.016666668
- m_OrientationOffsetY: 0
- m_Level: 0
- m_CycleOffset: 0
- m_HasAdditiveReferencePose: 0
- m_LoopTime: 1
- m_LoopBlend: 0
- m_LoopBlendOrientation: 0
- m_LoopBlendPositionY: 0
- m_LoopBlendPositionXZ: 0
- m_KeepOriginalOrientation: 0
- m_KeepOriginalPositionY: 1
- m_KeepOriginalPositionXZ: 0
- m_HeightFromFeet: 0
- m_Mirror: 0
- m_EditorCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: 5
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: m_LocalScale.x
- path: World Constraint/Container/TaSTT
- classID: 4
- script: {fileID: 0}
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: 5
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: m_LocalScale.y
- path: World Constraint/Container/TaSTT
- classID: 4
- script: {fileID: 0}
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: 5
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: m_LocalScale.z
- path: World Constraint/Container/TaSTT
- classID: 4
- script: {fileID: 0}
- m_EulerEditorCurves: []
- m_HasGenericRootTransform: 0
- m_HasMotionFloatCurves: 0
- m_Events: []
-"""
-
-SOUND_ANIMATION_TEMPLATE = """
-%YAML 1.1
-%TAG !u! tag:unity3d.com,2011:
---- !u!74 &7400000
-AnimationClip:
- m_ObjectHideFlags: 0
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: Sound1_On
- serializedVersion: 6
- m_Legacy: 0
- m_Compressed: 0
- m_UseHighQualityCurve: 1
- m_RotationCurves: []
- m_CompressedRotationCurves: []
- m_EulerCurves: []
- m_PositionCurves: []
- m_ScaleCurves: []
- m_FloatCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: 1
- inSlope: Infinity
- outSlope: Infinity
- tangentMode: 103
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: m_IsActive
- path: World Constraint/Container/TaSTT/Audio 1
- classID: 1
- script: {fileID: 0}
- m_PPtrCurves: []
- m_SampleRate: 60
- m_WrapMode: 0
- m_Bounds:
- m_Center: {x: 0, y: 0, z: 0}
- m_Extent: {x: 0, y: 0, z: 0}
- m_ClipBindingConstant:
- genericBindings:
- - serializedVersion: 2
- path: 2267216663
- attribute: 2086281974
- script: {fileID: 0}
- typeID: 1
- customType: 0
- isPPtrCurve: 0
- pptrCurveMapping: []
- m_AnimationClipSettings:
- serializedVersion: 2
- m_AdditiveReferencePoseClip: {fileID: 0}
- m_AdditiveReferencePoseTime: 0
- m_StartTime: 0
- m_StopTime: 0
- m_OrientationOffsetY: 0
- m_Level: 0
- m_CycleOffset: 0
- m_HasAdditiveReferencePose: 0
- m_LoopTime: 0
- m_LoopBlend: 0
- m_LoopBlendOrientation: 0
- m_LoopBlendPositionY: 0
- m_LoopBlendPositionXZ: 0
- m_KeepOriginalOrientation: 0
- m_KeepOriginalPositionY: 1
- m_KeepOriginalPositionXZ: 0
- m_HeightFromFeet: 0
- m_Mirror: 0
- m_EditorCurves: []
- m_EulerEditorCurves: []
- m_HasGenericRootTransform: 0
- m_HasMotionFloatCurves: 0
- m_Events: []
-"""
-
-LETTER_ANIMATION_TEMPLATE = """
-%YAML 1.1
-%TAG !u! tag:unity3d.com,2011:
---- !u!74 &7400000
-AnimationClip:
- m_ObjectHideFlags: 0
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: REPLACEME_ANIMATION_NAME
- serializedVersion: 6
- m_Legacy: 0
- m_Compressed: 0
- m_UseHighQualityCurve: 1
- m_RotationCurves: []
- m_CompressedRotationCurves: []
- m_EulerCurves: []
- m_PositionCurves: []
- m_ScaleCurves: []
- m_FloatCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- - serializedVersion: 3
- time: 0.016666668
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: material.REPLACEME_LETTER_PARAM
- path: TaSTT
- classID: 23
- script: {fileID: 0}
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- - serializedVersion: 3
- time: 0.016666668
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: material.REPLACEME_LETTER_PARAM
- path: TaSTT
- classID: 137
- script: {fileID: 0}
- m_PPtrCurves: []
- m_SampleRate: 60
- m_WrapMode: 0
- m_Bounds:
- m_Center: {x: 0, y: 0, z: 0}
- m_Extent: {x: 0, y: 0, z: 0}
- m_ClipBindingConstant:
- genericBindings:
- - serializedVersion: 2
- path: 2794480623
- attribute: 2284639795
- script: {fileID: 0}
- typeID: 137
- customType: 22
- isPPtrCurve: 0
- pptrCurveMapping: []
- m_AnimationClipSettings:
- serializedVersion: 2
- m_AdditiveReferencePoseClip: {fileID: 0}
- m_AdditiveReferencePoseTime: 0
- m_StartTime: 0
- m_StopTime: 0
- m_OrientationOffsetY: 0
- m_Level: 0
- m_CycleOffset: 0
- m_HasAdditiveReferencePose: 0
- m_LoopTime: 1
- m_LoopBlend: 0
- m_LoopBlendOrientation: 0
- m_LoopBlendPositionY: 0
- m_LoopBlendPositionXZ: 0
- m_KeepOriginalOrientation: 0
- m_KeepOriginalPositionY: 1
- m_KeepOriginalPositionXZ: 0
- m_HeightFromFeet: 0
- m_Mirror: 0
- m_EditorCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- - serializedVersion: 3
- time: 0.016666668
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: material.REPLACEME_LETTER_PARAM
- path: TaSTT
- classID: 23
- script: {fileID: 0}
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- - serializedVersion: 3
- time: 0.016666668
- value: REPLACEME_LETTER_VALUE
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: material.REPLACEME_LETTER_PARAM
- path: TaSTT
- classID: 137
- script: {fileID: 0}
- m_EulerEditorCurves: []
- m_HasGenericRootTransform: 0
- m_HasMotionFloatCurves: 0
- m_Events: []
-"""
-
-ANIMATOR_TEMPLATE = """
---- !u!91 &9100000
-AnimatorController:
- m_ObjectHideFlags: 0
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: TaSTT_fx
- serializedVersion: 5
- m_AnimatorParameters: []
- m_AnimatorLayers: []
-"""
-
-# For whatever reason, running unrelated animations (such as facial
-# expressions) can have a slight effect on supposedly unrelated
-# parameters, causing letters to flip. Add a small margin to every
-# animated value to reduce the odds that this effect changes a letter
-# after it has been written.
-UNITY_ANIMATION_FUDGE_MARGIN = 0.1
-
-def generateClearAnimation(anim_dir: str, guid_map: typing.Dict[str, str]):
- """Write the animation that resets every letter parameter on the board.
-
- One float curve is generated per (byte, row, col) using the first curve
- of LETTER_ANIMATION_TEMPLATE as the pattern. The clip is written to
- `<anim_dir>/<clear animation name>.anim` together with a `.meta` file,
- and `guid_map` gains both the path -> guid and guid -> path entries.
- """
- print("Generating board clearing animation", file=sys.stderr)
-
- parser = libunity.UnityParser()
- parser.parse(LETTER_ANIMATION_TEMPLATE)
-
- # Keep the template's first curve as the pattern, then empty both curve
- # lists so they can be refilled below.
- anim_node = parser.nodes[0]
- anim_clip = anim_node.mapping['AnimationClip']
- curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]
- anim_clip.mapping['m_FloatCurves'].sequence = []
- anim_clip.mapping['m_EditorCurves'].sequence = []
-
- # Letter 0 is the value every cell is reset to.
- letter = 0
-
- for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
- for row in range(0, generate_utils.config.BOARD_ROWS):
- for col in range(0, generate_utils.config.BOARD_COLS):
- # NOTE(review): assumes copy() is deep enough that keyframes are not
- # shared between curves -- verify in libunity.
- curve = curve_template.copy()
- for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
- keyframe.mapping['value'] = str(letter +
- UNITY_ANIMATION_FUDGE_MARGIN)
- curve.mapping['attribute'] = "material.{}".format(generate_utils.getShaderParamByRowColByte(row, col, byte))
- curve.mapping['path'] = "World Constraint/Container/TaSTT"
- # Add curve to animation
- anim_clip.mapping['m_FloatCurves'].sequence.append(curve)
- anim_clip.mapping['m_EditorCurves'].sequence.append(curve)
- # Serialize animation to file
- anim_name = generate_utils.getClearAnimationName()
- anim_path = os.path.join(anim_dir, anim_name + ".anim")
- print("Generating clear animation at {}".format(anim_path), file=sys.stderr)
- with open(anim_path, "w", encoding="utf-8") as f:
- f.write(libunity.unityYamlToString([anim_node]))
- # Generate metadata
- meta = libunity.Metadata()
- with open(anim_path + ".meta", "w", encoding="utf-8") as f:
- f.write(str(meta))
- # Add metadata to guid map
- guid_map[anim_path] = meta.guid
- guid_map[meta.guid] = anim_path
-
-# sound_chord: whether to play a, e, i, o, u
-# value: 0 or 1
-def generateSoundAnimation(sound_chord: typing.Tuple[int,int,int,int,int],
- value: int,
- anim_name: str,
- anim_dir: str, guid_map: typing.Dict[str, str],
- anim_delay_frames = 2):
- """Write an animation that toggles the "Audio N" objects for a chord.
-
- One curve is generated per entry of `sound_chord`, targeting
- "World Constraint/Container/TaSTT/Audio <n>" (n starting at 1). Note n is
- keyed at time n * anim_delay_frames / 60, so notes are staggered; the
- clip's stop time is the time of the last note. The clip is written to
- `<anim_dir>/<anim_name>.anim` together with a `.meta` file, and
- `guid_map` gains both the path -> guid and guid -> path entries.
-
- NOTE(review): `value` is never read in this function.
- """
- print(f"Generating sound animation {sound_chord} / {anim_name}", file=sys.stderr)
-
- parser = libunity.UnityParser()
- parser.parse(SOUND_ANIMATION_TEMPLATE)
-
- # Keep the template's first curve as the pattern, then empty both curve
- # lists so they can be refilled below.
- anim_node = parser.nodes[0]
- anim_clip = anim_node.mapping['AnimationClip']
- curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]
- anim_clip.mapping['m_FloatCurves'].sequence = []
- anim_clip.mapping['m_EditorCurves'].sequence = []
-
- # Animate all notes.
- for note_i in range(len(sound_chord)):
- curve = curve_template.copy()
-
- # The copied curve's single keyframe serves as the pattern for the
- # keyframes added below.
- keyframe_template = curve.mapping['curve'].mapping['m_Curve'].sequence[0]
- curve.mapping['curve'].mapping['m_Curve'].sequence = []
-
- # First keyframe: zero all but first note
- if note_i != 0:
- keyframe = keyframe_template.copy()
- keyframe.mapping['time'] = 0
- keyframe.mapping['value'] = 0
- curve.mapping['path'] = f"World Constraint/Container/TaSTT/Audio {note_i + 1}"
- curve.mapping['curve'].mapping['m_Curve'].sequence.append(keyframe)
-
- # Subsequent keyframes: animate as normal
- keyframe = keyframe_template.copy()
- keyframe.mapping['time']= str(note_i * anim_delay_frames * 1.0 / 60.0)
- keyframe.mapping['value'] = str(sound_chord[note_i])
- curve.mapping['path'] = f"World Constraint/Container/TaSTT/Audio {note_i + 1}"
- curve.mapping['curve'].mapping['m_Curve'].sequence.append(keyframe)
-
- # Add curve to animation
- anim_clip.mapping['m_FloatCurves'].sequence.append(curve)
- anim_clip.mapping['m_EditorCurves'].sequence.append(curve)
-
- anim_clip.mapping['m_AnimationClipSettings'].mapping['m_StopTime'] = str((len(sound_chord)-1) * anim_delay_frames * 1.0 / 60.0)
-
- # Serialize animation to file
- anim_path = os.path.join(anim_dir, anim_name + ".anim")
- with open(anim_path, "w", encoding="utf-8") as f:
- f.write(libunity.unityYamlToString([anim_node]))
- # Generate metadata
- meta = libunity.Metadata()
- with open(anim_path + ".meta", "w", encoding="utf-8") as f:
- f.write(str(meta))
- # Add metadata to guid map
- guid_map[anim_path] = meta.guid
- guid_map[meta.guid] = anim_path
-
-# Generate a toggle animation for a shader parameter.
-def generateToggleAnimations(anim_dir, shader_param, guid_map):
-    """Write the `_Off` and `_On` animations for `shader_param`.
-
-    Two clips are produced, `<anim_dir>/<shader_param>_Off.anim` and
-    `<anim_dir>/<shader_param>_On.anim`, each with a `.meta` file. Each clip
-    holds a single curve setting `material.<shader_param>` to 0 or 1 plus
-    UNITY_ANIMATION_FUDGE_MARGIN. `guid_map` gains both the path -> guid and
-    guid -> path entries for each clip.
-    """
-    print("Generating shader toggle animation", file=sys.stderr)
-
-    parser = libunity.UnityParser()
-    parser.parse(LETTER_ANIMATION_TEMPLATE)
-
-    # 0.0 represents false, 1.0 represents true. Don't forget that we add
-    # `UNITY_ANIMATION_FUDGE_MARGIN` to everything.
-    for shader_value in range(0, 2):
-        # The parsed node is reused for both clips: take the current first
-        # curve as the pattern, then empty both curve lists before refilling.
-        anim_node = parser.nodes[0]
-        anim_clip = anim_node.mapping['AnimationClip']
-        curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]
-        anim_clip.mapping['m_FloatCurves'].sequence = []
-        anim_clip.mapping['m_EditorCurves'].sequence = []
-
-        curve = curve_template.copy()
-        for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
-            keyframe.mapping['value'] = str(float(shader_value) +
-                    UNITY_ANIMATION_FUDGE_MARGIN)
-        curve.mapping['attribute'] = "material.{}".format(shader_param)
-        curve.mapping['path'] = "World Constraint/Container/TaSTT"
-        # Add curve to animation
-        anim_clip.mapping['m_FloatCurves'].sequence.append(curve)
-        anim_clip.mapping['m_EditorCurves'].sequence.append(curve)
-
-        # Serialize animation to file
-        anim_suffix = "_Off"
-        if shader_value == 1:
-            anim_suffix = "_On"
-        anim_path = os.path.join(anim_dir, shader_param + anim_suffix +
-                ".anim")
-        with open(anim_path, "w", encoding="utf-8") as f:
-            f.write(libunity.unityYamlToString([anim_node]))
-        # Generate metadata
-        meta = libunity.Metadata()
-        with open(anim_path + ".meta", "w", encoding="utf-8") as f:
-            f.write(str(meta))
-        # Add metadata to guid map
-        guid_map[anim_path] = meta.guid
-        guid_map[meta.guid] = anim_path
-
# Generate an animation which holds a uniform scale value.
def generateScaleAnimation(anim_name: str, anim_dir: str,
        path: str,
        value: float,
        guid_map: typing.Dict[str, str]) -> str:
    """Write `<anim_name>.anim` with every scale keyframe set to `value`.

    The clip is built from `SCALE_ANIMATION_TEMPLATE`. Every keyframe under
    `m_ScaleCurves` gets `value` for x, y and z, and every keyframe under
    `m_EditorCurves` gets `value` as its scalar value. A `.meta` file with a
    fresh GUID is written next to the clip.

    Args:
        anim_name: Basename (without extension) of the generated clip.
        anim_dir: Directory that receives the `.anim` / `.anim.meta` files.
        path: Only used in the progress message; the animated object path is
            whatever the template already contains.
        value: Scale written into every keyframe.
        guid_map: Updated in place with path -> guid and guid -> path entries.

    Returns:
        The GUID assigned to the generated clip.
    """
    print("Generating scale animation {}".format(path),
            file=sys.stderr)

    parser = libunity.UnityParser()
    parser.parse(SCALE_ANIMATION_TEMPLATE)

    anim_node = parser.nodes[0]
    anim_clip = anim_node.mapping['AnimationClip']

    # Keyframe values are stored as strings everywhere else in this file, so
    # convert once and use the same text for both curve lists.
    scale = str(value)
    for curve in anim_clip.mapping['m_ScaleCurves'].sequence:
        for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
            for axis in ('x', 'y', 'z'):
                keyframe.mapping['value'].mapping[axis] = scale
    for curve in anim_clip.mapping['m_EditorCurves'].sequence:
        for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
            keyframe.mapping['value'] = scale

    # Serialize animation to file
    anim_path = os.path.join(anim_dir, anim_name + ".anim")
    with open(anim_path, "w", encoding="utf-8") as f:
        f.write(libunity.unityYamlToString([anim_node]))
    # Generate metadata
    meta = libunity.Metadata()
    with open(anim_path + ".meta", "w", encoding="utf-8") as f:
        f.write(str(meta))
    # Add metadata to guid map
    guid_map[anim_path] = meta.guid
    guid_map[meta.guid] = anim_path

    return meta.guid
-
def generateAnimations(anim_dir: str, guid_map: typing.Dict[str, str]):
    """Generate the clear, sound and per-cell letter animations.

    Steps, in order:
      1. `generateClearAnimation`.
      2. One sound animation per combination of the five vowel flags
         (a, e, i, o, u), i.e. 32 clips named `Sound_a?_e?_i?_o?_u?`.
      3. For every byte, row and column of the board, two letter animations:
         one for letter value 0 and one for `CHARS_PER_CELL - 1`.

    Args:
        anim_dir: Directory that receives the `.anim` / `.anim.meta` files.
        guid_map: Updated in place with path -> guid and guid -> path entries
            for every generated letter clip (and passed on to the helpers).
    """
    generateClearAnimation(anim_dir, guid_map)

    for chord_bits in range(2**5):
        # Bit i of `chord_bits` says whether vowel i is part of the chord.
        chord = [(chord_bits >> i) % 2 for i in range(5)]
        print(f"Generating chord {chord}", file=sys.stderr)
        anim_name = f"Sound_a{chord[0]}_e{chord[1]}_i{chord[2]}_o{chord[3]}_u{chord[4]}"
        generateSoundAnimation(chord, 0, anim_name, anim_dir, guid_map)

    print("Generating letter animations", file=sys.stderr)

    parser = libunity.UnityParser()
    parser.parse(LETTER_ANIMATION_TEMPLATE)

    anim_node = parser.nodes[0]
    anim_clip = anim_node.mapping['AnimationClip']
    curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0]
    # Empty the template's curve lists so each copy starts without curves.
    anim_clip.mapping['m_FloatCurves'].sequence = []
    anim_clip.mapping['m_EditorCurves'].sequence = []

    config = generate_utils.config
    # Only the two extreme letter values get a clip.
    letter_values = (0, config.CHARS_PER_CELL - 1)

    # To support more languages, we use multiple bytes per character (2 bytes
    # give us a 64K character set).
    for byte in range(0, config.BYTES_PER_CHAR):
        for row in range(0, config.BOARD_ROWS):
            print("Generating letter animations (row {}/{}) (byte {}/{})".format(
                row, config.BOARD_ROWS, byte, config.BYTES_PER_CHAR),
                file=sys.stderr)
            for col in range(0, config.BOARD_COLS):
                for letter in letter_values:
                    # Make a deep copy of the templates
                    node = anim_node.copy()
                    curve = curve_template.copy()
                    clip = node.mapping['AnimationClip']
                    # Populate animation name
                    anim_name = generate_utils.getLetterAnimationName(row, col, letter, byte)
                    clip.mapping['m_Name'] = anim_name
                    # Populate letter value
                    for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence:
                        keyframe.mapping['value'] = str(letter + UNITY_ANIMATION_FUDGE_MARGIN)
                    # Populate path to letter parameter
                    curve.mapping['attribute'] = "material.{}".format(
                            generate_utils.getShaderParamByRowColByte(row, col, byte))
                    curve.mapping['path'] = "World Constraint/Container/TaSTT"
                    # Add curve to animation
                    clip.mapping['m_FloatCurves'].sequence.append(curve)
                    clip.mapping['m_EditorCurves'].sequence.append(curve)
                    # Serialize animation to file
                    anim_path = os.path.join(anim_dir, anim_name + ".anim")
                    with open(anim_path, "w", encoding="utf-8") as f:
                        f.write(libunity.unityYamlToString([node]))
                    # Generate metadata
                    meta = libunity.Metadata()
                    with open(anim_path + ".meta", "w", encoding="utf-8") as f:
                        f.write(str(meta))
                    # Add metadata to guid map
                    guid_map[anim_path] = meta.guid
                    guid_map[meta.guid] = anim_path
-
def generateFXController(anim: libunity.UnityAnimator) -> typing.Dict[int, typing.Dict[int, libunity.UnityDocument]]:
    """Populate `anim` with the base controller, its parameters and layers.

    Loads `ANIMATOR_TEMPLATE` into `anim`, registers the fixed parameters
    (enable, dummy, toggle, clear-board, scale, enable-phoneme and the five
    sound flags), adds a zero-weight "=== TaSTT ===" layer, and then adds one
    blend parameter plus one layer per (byte, sync slot) pair.

    Args:
        anim: The animator to populate.

    Returns:
        A two-level dict: `layers[byte][slot]` is whatever `anim.addLayer`
        returned for that byte / sync slot.
    """
    parser = libunity.UnityParser()
    parser.parse(ANIMATOR_TEMPLATE)
    anim.addNodes(parser.nodes)

    # Registration order is kept as-is.
    fixed_params = (
        (generate_utils.getEnableParam, bool),
        (generate_utils.getDummyParam, bool),
        (generate_utils.getToggleParam, bool),
        (generate_utils.getClearBoardParam, bool),
        (generate_utils.getScaleParam, float),
        (generate_utils.getEnablePhonemeParam, bool),
    )
    for get_name, param_type in fixed_params:
        anim.addParameter(get_name(), param_type)

    for i in range(5):
        anim.addParameter(generate_utils.getSoundParam(i+1), bool)

    anim.addLayer("=== TaSTT ===", weight=0.0)

    layers = {}
    for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
        layers[byte] = {}
        for i in range(0, generate_utils.config.CHARS_PER_SYNC):
            anim.addParameter(generate_utils.getBlendParam(i, byte), float)

            layer = anim.addLayer(generate_utils.getLayerName(i, byte))
            layers[byte][i] = layer

    # The select parameter is shared by every layer, so one registration is
    # enough; `addParameter` ignores names that are already registered.
    # NOTE(review): the original nesting of this call was lost in the listing
    # this was recovered from; if it sat inside the loops, the parameter's
    # position in the parameter list changes. Confirm nothing depends on it.
    anim.addParameter(generate_utils.getSelectParam(), int)

    return layers
-
def generateFXLayer(which_layer: int, anim: libunity.UnityAnimator, layer:
        libunity.UnityDocument, gen_anim_dir: str, byte: int,
        guid_map: typing.Optional[typing.Dict[str, str]] = None):
    """Build the state machine for one (sync slot, byte) layer.

    The layer gets a default state and an active state. For every region the
    layer can write to, a blend tree is added which blends between the
    region's lowest- and highest-letter animations. The blend tree is entered
    from the active state when the select parameter equals the region index
    and returns to the default state when the dummy parameter is false.

    Args:
        which_layer: Index of the sync slot this layer handles.
        anim: Animator that owns `layer`.
        layer: The layer to populate.
        gen_anim_dir: Directory containing the generated letter animations.
        byte: Which byte of the character this layer handles.
        guid_map: Maps animation paths to GUIDs. When omitted, the
            module-level `guid_map` is used, which only exists when this
            file is executed as a script.

    Raises:
        NameError: `guid_map` was omitted and no module-level `guid_map`
            exists.
        KeyError: A required letter animation is missing from `guid_map`.
    """
    if guid_map is None:
        # Backward compatibility: this function used to read the module-level
        # `guid_map` implicitly. Keep that working for existing callers.
        try:
            guid_map = globals()["guid_map"]
        except KeyError:
            raise NameError("name 'guid_map' is not defined") from None

    is_default_state = True
    default_state = anim.addAnimatorState(layer,
            generate_utils.getDefaultStateName(which_layer, byte), is_default_state)

    active_state = anim.addAnimatorState(layer,
            generate_utils.getActiveStateName(which_layer, byte), dy = 100)

    active_state_transition = anim.addTransition(active_state)
    enable_param = generate_utils.getEnableParam()
    anim.addTransitionBooleanCondition(default_state, active_state_transition,
            enable_param, True)

    def guid_for(region: int, letter: int) -> str:
        # Look up the GUID of the letter animation for `region` / `letter`.
        anim_path = os.path.join(gen_anim_dir,
                generate_utils.getAnimationNameByLayerAndIndex(
                    which_layer, region, letter, byte) + ".anim")
        return guid_map[anim_path]

    select_states = {}
    for i in range(0, generate_utils.config.numRegions(which_layer)):
        # Lay the blend trees out in a row below the active state.
        dx = i * 200
        dy = 200

        # Create blend tree for this region.
        guid_lo = guid_for(i, 0)
        guid_hi = guid_for(i, generate_utils.config.CHARS_PER_CELL - 1)

        select_states[i] = anim.addAnimatorBlendTree(layer,
                generate_utils.getBlendStateName(which_layer, i, byte),
                generate_utils.getBlendParam(which_layer, byte),
                guid_lo, guid_hi, dx = dx, dy = dy)
        state = select_states[i]

        # Create transition to state.
        select_state_transition = anim.addTransition(state)
        select_param = generate_utils.getSelectParam()
        anim.addTransitionIntegerEqualityCondition(active_state,
                select_state_transition, select_param, i)

        # Create return-home transition.
        home_state_transition = anim.addTransition(default_state)
        home_state_transition.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0'
        dummy_param = generate_utils.getDummyParam()
        anim.addTransitionBooleanCondition(state,
                home_state_transition, dummy_param, False)

    if generate_utils.config.layerNeedsParity(which_layer):
        # There may be layers which never write to the text box. In this case,
        # when those layers are turned on to write to that last region, they
        # simply transition back to the default (idle) state.
        home_state_transition = anim.addTransition(default_state)
        select_param = generate_utils.getSelectParam()
        last_region = generate_utils.config.numRegions(0) - 1
        anim.addTransitionIntegerEqualityCondition(active_state,
                home_state_transition, select_param, last_region)
-
# Generic toggle adding utility.
# Generates the layer and parameter.
# Returns a map containing the off and on states, as well as the
# transitions between them.
def generateToggle(layer_name: str,
        parameter_name: str,
        gen_anim_dir: str,
        off_anim_basename: str,
        on_anim_basename: str,
        anim: libunity.UnityAnimator,
        guid_map: typing.Dict[str, str],
        duration_s: float = 0.0) -> typing.Dict[str,
                libunity.UnityDocument]:
    """Add a two-state (off/on) layer driven by a boolean parameter.

    Args:
        layer_name: Name of the new layer; also the prefix of its
            `<layer_name>_Off` / `<layer_name>_On` states.
        parameter_name: Boolean parameter that flips the toggle.
        gen_anim_dir: Directory containing the animations named below.
        off_anim_basename: File name of the animation played while off, or a
            falsy value for no animation.
        on_anim_basename: File name of the animation played while on, or a
            falsy value for no animation.
        anim: Animator to add the layer to.
        guid_map: Handed to `Metadata.loadOrCreate` for each animation.
        duration_s: Passed as the duration of both transitions.

    Returns:
        A dict with the keys "off", "on", "off_to_on" and "on_to_off".
    """
    toggle_layer = anim.addLayer(layer_name)
    anim.addParameter(parameter_name, bool)

    state_off = anim.addAnimatorState(toggle_layer, layer_name + "_Off",
            is_default_state = True)
    state_on = anim.addAnimatorState(toggle_layer, layer_name + "_On", dy=100)

    # Attach the animations, off first. A falsy basename leaves the state
    # without an animation.
    for state, basename in ((state_off, off_anim_basename),
                            (state_on, on_anim_basename)):
        if not basename:
            continue
        clip_path = os.path.join(gen_anim_dir, basename)
        clip_meta = libunity.Metadata()
        clip_meta.loadOrCreate(clip_path, guid_map)
        anim.setAnimatorStateAnimation(state, clip_meta.guid)

    # off -> on fires when the parameter becomes true ...
    turn_on = anim.addTransition(state_on, duration_s)
    anim.addTransitionBooleanCondition(state_off, turn_on, parameter_name, True)

    # ... and on -> off when it becomes false again.
    turn_off = anim.addTransition(state_off, duration_s)
    anim.addTransitionBooleanCondition(state_on, turn_off, parameter_name, False)

    return {
        "off": state_off,
        "on": state_on,
        "off_to_on": turn_on,
        "on_to_off": turn_off,
    }
-
def generateScaleLayer(anim: libunity.UnityAnimator,
        gen_anim_dir: str,
        guid_map: typing.Dict[str, str]):
    """Add a layer that blends the board's scale between two animations.

    Generates `TaSTT_Scale_0` (scale 5.0) and `TaSTT_Scale_100` (scale 100.0)
    and adds a blend tree, driven by the scale parameter, that maps parameter
    value 0.0 to the first and 1.0 to the second. The layer, the blend tree
    and the driving parameter all share the scale parameter's name.

    Args:
        anim: Animator to add the layer to.
        gen_anim_dir: Directory that receives the two scale animations.
        guid_map: Updated by `generateScaleAnimation` for both clips.
    """
    scale_layer = anim.addLayer(generate_utils.getScaleParam())

    path = "World Constraint/Container/TaSTT"

    guid_lo = generateScaleAnimation("TaSTT_Scale_0", gen_anim_dir,
            path,
            5.0, guid_map)
    guid_hi = generateScaleAnimation("TaSTT_Scale_100", gen_anim_dir,
            path,
            100.0, guid_map)

    anim.addAnimatorBlendTree(scale_layer,
            generate_utils.getScaleParam(),
            generate_utils.getScaleParam(),
            guid_lo, guid_hi,
            lo_threshold = 0.0, hi_threshold = 1.0)
-
def generateSoundLayer(anim: libunity.UnityAnimator,
        gen_anim_dir: str,
        guid_map: typing.Dict[str, str],
        anim_len_s = 12.0/60.0):
    """Add the "TaSTT_Sound" layer: a decision tree over the five sound flags.

    From the idle state the layer enters state "a" once the enable-phoneme
    parameter is true. It then walks a binary tree that branches on sound
    parameters 1 to 5 in turn. Each of the 32 leaves plays the matching
    `Sound_a?_e?_i?_o?_u?` animation and returns to idle as soon as any sound
    parameter differs from the value that led to that leaf.

    Args:
        anim: Animator to add the layer to.
        gen_anim_dir: Directory containing the generated sound animations.
        guid_map: Maps each sound animation's path to its GUID.
        anim_len_s: Passed as the duration of each return-to-idle transition.

    Raises:
        KeyError: A sound animation is missing from `guid_map`.
    """
    sound_layer = anim.addLayer("TaSTT_Sound")

    idle_state = anim.addAnimatorState(sound_layer, "Idle", is_default_state=True, dy=-100)
    root_state = anim.addAnimatorState(sound_layer, "a")

    entry = anim.addTransition(root_state)
    anim.addTransitionBooleanCondition(idle_state, entry,
            generate_utils.getEnablePhonemeParam(), True)

    def branch(parent_state, name, dx, dy, sound_index, bit):
        # Add the state `name` plus the transition into it from
        # `parent_state`, taken when sound parameter `sound_index` == `bit`.
        state = anim.addAnimatorState(sound_layer, name, dy=dy, dx=dx)
        edge = anim.addTransition(state)
        anim.addTransitionBooleanCondition(parent_state, edge,
                generate_utils.getSoundParam(sound_index), bit)
        return state

    for a in range(2):
        x_a = a * 800
        state_e = branch(root_state, f"a{a}_e", x_a, 100, 1, a)

        for e in range(2):
            x_e = x_a + e * 400
            state_i = branch(state_e, f"a{a}_e{e}_i", x_e, 200, 2, e)

            for i in range(2):
                x_i = x_e + i * 200
                state_o = branch(state_i, f"a{a}_e{e}_i{i}_o", x_i, 300, 3, i)

                for o in range(2):
                    x_o = x_i + o * 100
                    state_u = branch(state_o, f"a{a}_e{e}_i{i}_o{o}_u",
                            x_o, 400, 4, o)

                    for u in range(2):
                        # The u=1 leaf sits slightly lower than the u=0 leaf.
                        leaf_dy = 550 if u == 1 else 500
                        leaf = branch(state_u,
                                f"a{a}_e{e}_i{i}_o{o}_u{u}",
                                x_o + u * 50, leaf_dy, 5, u)

                        chord = [a, e, i, o, u]
                        anim_name = f"Sound_a{chord[0]}_e{chord[1]}_i{chord[2]}_o{chord[3]}_u{chord[4]}"
                        anim_path = os.path.join(gen_anim_dir, anim_name + ".anim")
                        anim.setAnimatorStateAnimation(leaf, guid_map[anim_path])

                        # Create return-home transitions: one per sound
                        # parameter, taken when that parameter no longer
                        # matches this leaf's chord.
                        for sound_index, bit in enumerate(chord, start=1):
                            home = anim.addTransition(idle_state, dur_s = anim_len_s)
                            home.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0'
                            anim.addTransitionBooleanCondition(leaf, home,
                                    generate_utils.getSoundParam(sound_index),
                                    1 - bit)
-
def generateFX(guid_map, gen_anim_dir):
    """Build the complete TaSTT FX animator.

    Creates the controller and its per-slot layers, fills in every layer,
    adds the toggle layers (toggle, lock-world, ellipsis, clear-board and
    expand), then the scale and sound layers.

    Args:
        guid_map: Path/GUID map handed to the helpers that need it.
        gen_anim_dir: Directory containing the generated animations.

    Returns:
        The populated `libunity.UnityAnimator`.
    """
    fx = libunity.UnityAnimator()

    layers_by_byte = generateFXController(fx)

    # TODO(yum) parallelize
    for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
        slot_layers = layers_by_byte[byte]
        for which_layer, layer in slot_layers.items():
            print(f"Generating layer {which_layer}/{len(slot_layers)}",
                    file=sys.stderr)
            generateFXLayer(which_layer, fx, layer, gen_anim_dir, byte)

    toggle_param = generate_utils.getToggleParam()
    lock_world_param = generate_utils.getLockWorldParam()
    ellipsis_param = generate_utils.getEllipsisParam()
    clear_board_param = generate_utils.getClearBoardParam()

    # (layer name, parameter, off animation, on animation, extra args)
    toggles = (
        (toggle_param, toggle_param,
            "TaSTT_Toggle_Off.anim", "TaSTT_Toggle_On.anim", ()),
        (lock_world_param, lock_world_param,
            "TaSTT_Lock_World_Disable.anim", "TaSTT_Lock_World_Enable.anim", ()),
        (ellipsis_param, ellipsis_param,
            "TaSTT_Ellipsis_Off.anim", "TaSTT_Ellipsis_On.anim", ()),
        # No animation in the `off` state.
        (clear_board_param, clear_board_param,
            None, generate_utils.getClearAnimationName() + ".anim", ()),
        # Shares the toggle parameter, and uses 0.5 s transitions.
        ("TaSTT_Expand", toggle_param,
            "TaSTT_Emerge_000.anim", "TaSTT_Emerge_100.anim", (0.5,)),
    )
    for layer_name, param, off_anim, on_anim, extra in toggles:
        generateToggle(layer_name, param, gen_anim_dir, off_anim, on_anim,
                fx, guid_map, *extra)

    generateScaleLayer(fx, gen_anim_dir, guid_map)
    generateSoundLayer(fx, gen_anim_dir, guid_map)

    return fx
-
def parseArgs():
    """Parse the command line and fill in defaults for omitted options.

    Defaults:
      * `--gen_dir`: "generated/"
      * `--gen_anim_dir`: "animations/" under `--gen_dir`
      * `--guid_map`: "guid.map"
      * `--fx_dest`: "TaSTT_fx.controller" under `--gen_dir`

    `--config` is mandatory; without it the process prints a message and
    exits with status 1.

    Returns:
        The populated `argparse.Namespace`.
    """
    print("args: {}".format(" ".join(sys.argv)))

    parser = argparse.ArgumentParser()
    parser.add_argument("cmd", type=str, help="")
    parser.add_argument("--config", type=str, help="The app config.")
    parser.add_argument("--gen_dir", type=str, help="The directory under " +
            "which all generated assets are placed")
    parser.add_argument("--gen_anim_dir", type=str, help="The directory under " +
            "which all generated animations are placed.")
    parser.add_argument("--guid_map", type=str, help="The path to a file which will store guids")
    parser.add_argument("--fx_dest", type=str, help="The path at which to save the generated FX controller")
    args = parser.parse_args()

    if not args.gen_dir:
        args.gen_dir = "generated/"

    if not args.config:
        print("--config required")
        sys.exit(1)

    # Join rather than concatenate, so a `--gen_dir` given without a trailing
    # separator ("out") still yields "out/animations/" instead of
    # "outanimations/". With the default "generated/" the result is the same
    # as before.
    if not args.gen_anim_dir:
        args.gen_anim_dir = os.path.join(args.gen_dir, "animations/")

    if not args.guid_map:
        args.guid_map = "guid.map"

    if not args.fx_dest:
        args.fx_dest = os.path.join(args.gen_dir, "TaSTT_fx.controller")

    return args
-
if __name__ == "__main__":
    # Script entry point. `cmd` selects what to generate:
    #   gen_anims - write every animation clip into --gen_anim_dir
    #   gen_fx    - write the FX animator controller to --fx_dest
    # Both commands read the guid map first and write it back afterwards.
    args = parseArgs()
    cfg = app_config.getConfig(args.config)

    # From here on, relative paths are resolved against this script's
    # directory. The config above was read relative to the caller's cwd.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    print(f"chdir to {script_dir}")
    os.chdir(script_dir)

    if args.cmd not in ("gen_anims", "gen_fx"):
        # Previously an unknown command silently did nothing and exited 0.
        print(f"Unknown command: {args.cmd}", file=sys.stderr)
        sys.exit(1)

    generate_utils.config.BYTES_PER_CHAR = int(cfg["bytes_per_char"])
    generate_utils.config.CHARS_PER_SYNC = int(cfg["chars_per_sync"])
    generate_utils.config.BOARD_ROWS = int(cfg["rows"])
    generate_utils.config.BOARD_COLS = int(cfg["cols"])

    # `guid_map` must stay a module-level name: generateFXLayer() falls back
    # to it when no map is passed explicitly.
    # NOTE: pickle can execute arbitrary code while loading; only point
    # --guid_map at files produced by these scripts.
    with open(args.guid_map, 'rb') as f:
        guid_map = pickle.load(f)

    if args.cmd == "gen_anims":
        os.makedirs(args.gen_anim_dir, exist_ok=True)
        generateAnimations(args.gen_anim_dir, guid_map)
    else:
        # os.makedirs("") raises, so skip it when --fx_dest is a bare file
        # name.
        fx_dir = os.path.dirname(args.fx_dest)
        if fx_dir:
            os.makedirs(fx_dir, exist_ok=True)
        # Build the controller before opening the destination, so a failure
        # during generation does not leave a truncated file behind.
        fx_yaml = str(generateFX(guid_map, args.gen_anim_dir))
        with open(args.fx_dest, "w", encoding="utf-8") as f:
            f.write(fx_yaml)

    with open(args.guid_map, 'wb') as f:
        pickle.dump(guid_map, f)

    if args.cmd == "gen_fx":
        # If we don't do this, then VRChat will fail to update the animator
        # when users update their avatars.
        if os.path.exists(args.fx_dest + ".meta"):
            os.remove(args.fx_dest + ".meta")
-
diff --git a/Scripts/libunity.py b/Scripts/libunity.py
deleted file mode 100644
index 77eeb95..0000000
--- a/Scripts/libunity.py
+++ /dev/null
@@ -1,1432 +0,0 @@
-#!/usr/bin/env python3
-
-from functools import partial
-
-import argparse
-import copy
-import enum
-import math
-import os
-import pickle
-import random
-import sys
-import typing
-# python3 -m pip install pyyaml
-# License: MIT.
-import yaml
-
-import multiprocessing as mp
-
-WRITE_DEFAULTS_ANIM_TEMPLATE = """
-%YAML 1.1
-%TAG !u! tag:unity3d.com,2011:
---- !u!74 &7400000
-AnimationClip:
- m_ObjectHideFlags: 0
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: TaSTT_Reset_Animations
- serializedVersion: 6
- m_Legacy: 0
- m_Compressed: 0
- m_UseHighQualityCurve: 1
- m_RotationCurves: []
- m_CompressedRotationCurves: []
- m_EulerCurves: []
- m_PositionCurves: []
- m_ScaleCurves: []
- m_FloatCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: 0
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: REPLACEME_ATTRIBUTE
- path: REPLACEME_PATH
- classID: 137
- script: {fileID: 0}
- m_PPtrCurves: []
- m_SampleRate: 60
- m_WrapMode: 0
- m_Bounds:
- m_Center: {x: 0, y: 0, z: 0}
- m_Extent: {x: 0, y: 0, z: 0}
- m_ClipBindingConstant:
- genericBindings:
- - serializedVersion: 2
- path: 2794480623
- attribute: 2284639795
- script: {fileID: 0}
- typeID: 137
- customType: 22
- isPPtrCurve: 0
- pptrCurveMapping: []
- m_AnimationClipSettings:
- serializedVersion: 2
- m_AdditiveReferencePoseClip: {fileID: 0}
- m_AdditiveReferencePoseTime: 0
- m_StartTime: 0
- m_StopTime: 0
- m_OrientationOffsetY: 0
- m_Level: 0
- m_CycleOffset: 0
- m_HasAdditiveReferencePose: 0
- m_LoopTime: 1
- m_LoopBlend: 0
- m_LoopBlendOrientation: 0
- m_LoopBlendPositionY: 0
- m_LoopBlendPositionXZ: 0
- m_KeepOriginalOrientation: 0
- m_KeepOriginalPositionY: 1
- m_KeepOriginalPositionXZ: 0
- m_HeightFromFeet: 0
- m_Mirror: 0
- m_EditorCurves:
- - curve:
- serializedVersion: 2
- m_Curve:
- - serializedVersion: 3
- time: 0
- value: 0
- inSlope: 0
- outSlope: 0
- tangentMode: 136
- weightedMode: 0
- inWeight: 0
- outWeight: 0
- m_PreInfinity: 2
- m_PostInfinity: 2
- m_RotationOrder: 4
- attribute: REPLACEME_ATTRIBUTE
- path: REPLACEME_PATH
- classID: 137
- script: {fileID: 0}
- m_EulerEditorCurves: []
- m_HasGenericRootTransform: 0
- m_HasMotionFloatCurves: 0
- m_Events: []
-"""[1:][:-1]
-
-METADATA_TEMPLATE = """
-fileFormatVersion: 2
-guid: REPLACEME_GUID
-NativeFormatImporter:
- externalObjects: {}
- mainObjectFileID: 7400000
- userData:
- assetBundleName:
- assetBundleVariant:
-"""[1:][:-1]
-
-ANIMATION_STATE_TEMPLATE = """
---- !u!1102 &110200000
-AnimatorState:
- serializedVersion: 6
- m_ObjectHideFlags: 1
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: REPLACEME_ANIMATION_NAME
- m_Speed: 1
- m_CycleOffset: 0
- m_Transitions: []
- m_StateMachineBehaviours: []
- m_Position: {x: 50, y: 50, z: 0}
- m_IKOnFeet: 0
- m_WriteDefaultValues: 0
- m_Mirror: 0
- m_SpeedParameterActive: 0
- m_MirrorParameterActive: 0
- m_CycleOffsetParameterActive: 0
- m_TimeParameterActive: 0
- m_Motion: {}
- m_Tag:
- m_SpeedParameter:
- m_MirrorParameter:
- m_CycleOffsetParameter:
- m_TimeParameter:
-"""[1:][:-1]
-
-TRANSITION_TEMPLATE = """
---- !u!1101 &110100000
-AnimatorStateTransition:
- m_ObjectHideFlags: 1
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name:
- m_Conditions: []
- m_DstStateMachine: {fileID: 0}
- m_DstState: {fileID: 0}
- m_Solo: 0
- m_Mute: 0
- m_IsExit: 0
- serializedVersion: 3
- m_TransitionDuration: 0
- m_TransitionOffset: 0
- m_ExitTime: 0.0
- m_HasExitTime: 0
- m_HasFixedDuration: 1
- m_InterruptionSource: 2
- m_OrderedInterruption: 1
- m_CanTransitionToSelf: 1
-"""[1:][:-1]
-
-BLEND_TREE_TEMPLATE = """
---- !u!206 &1071664566462684110
-BlendTree:
- m_ObjectHideFlags: 1
- m_CorrespondingSourceObject: {fileID: 0}
- m_PrefabInstance: {fileID: 0}
- m_PrefabAsset: {fileID: 0}
- m_Name: REPLACEME_BLEND_TREE_NAME
- m_Childs:
- - serializedVersion: 2
- m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_LO, type: 2}
- m_Threshold: -1
- m_Position: {x: 0, y: 0}
- m_TimeScale: 1
- m_CycleOffset: 0
- m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER
- m_Mirror: 0
- - serializedVersion: 2
- m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_HI, type: 2}
- m_Threshold: 1
- m_Position: {x: 0, y: 0}
- m_TimeScale: 1
- m_CycleOffset: 0
- m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER
- m_Mirror: 0
- m_BlendParameter: REPLACEME_BLEND_PARAMETER
- m_BlendParameterY: REPLACEME_BLEND_PARAMETER
- m_MinThreshold: -1
- m_MaxThreshold: 1
- m_UseAutomaticThresholds: 0
- m_NormalizedBlendValues: 0
- m_BlendType: 0
-"""[1:][:-1]
-
class Metadata:
    """In-memory form of a Unity `.meta` sidecar file, reduced to its GUID."""

    def __init__(self):
        # Start out with a random 128-bit GUID, rendered as 32 hex digits.
        self.guid = f"{random.randrange(16 ** 32):032x}"

    @staticmethod
    def _metaPath(path):
        # Accept either the asset path or the path of its `.meta` file.
        return path if path.endswith(".meta") else path + ".meta"

    def load(self, path):
        """Read the GUID from the `.meta` file belonging to `path`.

        `self.guid` ends up as the second whitespace-separated token of the
        last line starting with "guid", or None when no such line exists.
        """
        self.guid = None
        with open(self._metaPath(path), "r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("guid"):
                    self.guid = line.split()[1]

    def loadOrCreate(self, path, guid_map):
        """Load the `.meta` file for `path`, or create it if it is missing.

        `guid_map` is only touched when the file has to be created.
        """
        meta_path = self._metaPath(path)
        if not os.path.exists(meta_path):
            self.persist(meta_path, guid_map)
            return
        self.load(meta_path)

    def persist(self, path, guid_map):
        """Write this metadata to `path` and record it in `guid_map`.

        Both directions are recorded: guid -> path and path -> guid, using
        `path` exactly as given.
        """
        with open(path, "w", encoding="utf-8") as f:
            f.write(str(self))

        guid_map[self.guid] = path
        guid_map[path] = self.guid

    def __str__(self):
        return METADATA_TEMPLATE.replace("REPLACEME_GUID", self.guid)
-
class Node:
    """Base class for the nodes of a Unity YAML document tree."""

    def __init__(self):
        # Optional. In Unity, this is the fileID of an object. Not all YAML
        # mappings have an anchor.
        self.anchor = None

        # Pointer to the Node containing this one.
        self.parent = None

    def _depth(self):
        # Number of ancestors above this node; drives pretty-print indent.
        depth = 0
        ancestor = self.parent
        while ancestor is not None:
            depth += 1
            ancestor = ancestor.parent
        return depth


class Sequence(Node):
    """An ordered YAML sequence; items are Nodes or plain scalar values."""

    def __init__(self):
        super().__init__()
        self.sequence = []

    def copy(self):
        """Return a deep copy of this sequence.

        The copy has the same type, anchor and parent as the original.
        Items that provide `copy()` are copied and re-parented to the new
        sequence; all other items are shared.
        """
        new = type(self)()
        new.anchor = self.anchor
        new.parent = self.parent

        for v in self.sequence:
            if hasattr(v, "copy"):
                new.sequence.append(v.copy())
                new.sequence[-1].parent = new
            else:
                new.sequence.append(v)

        return new

    def prettyPrint(self, first_indent=None, leading_newline=None):
        """Render as YAML: "[]" when empty, else one "- item" line per item.

        Args:
            first_indent: When not None, replaces the indent of the first
                line only.
            leading_newline: Accepted for signature parity with
                `Mapping.prettyPrint`; not used.
        """
        # NOTE(review): the indent unit appears as a single space in the
        # listing this was recovered from, which may have collapsed runs of
        # whitespace. Confirm against the indentation Unity expects.
        indent = " " * self._depth()

        lines = []
        for index, item in enumerate(self.sequence):
            cur_indent = indent
            if index == 0 and first_indent is not None:
                cur_indent = first_indent
            if hasattr(item, "prettyPrint"):
                rendered = item.prettyPrint(first_indent="", leading_newline=False)
            else:
                rendered = item
            lines.append("{}- {}".format(cur_indent, rendered))

        if not lines:
            return "[]"

        return "\n" + '\n'.join(lines)

    def __str__(self):
        return self.prettyPrint()

    def addChildMapping(self, anchor = None, add_to_head = False):
        """Create an empty Mapping, add it to this sequence and return it.

        The mapping is appended, or put first when `add_to_head` is true.
        """
        child = Mapping()
        child.anchor = anchor
        child.parent = self

        if add_to_head:
            self.sequence = [child] + self.sequence
        else:
            self.sequence.append(child)

        return child

    def addChildSequence(self, anchor = None):
        """Create an empty Sequence, append it to this one and return it."""
        child = Sequence()
        child.anchor = anchor
        child.parent = self

        self.sequence.append(child)

        return child

    def forEach(self, cb):
        """Call `cb(item)` for every item, in order."""
        for k in self.sequence:
            cb(k)


class Mapping(Node):
    """A YAML mapping; values are Nodes or plain scalar values."""

    def __init__(self):
        super().__init__()
        self.mapping = {}

    def copy(self):
        """Return a deep copy of this mapping.

        The copy has the same type, anchor and parent as the original.
        Values that provide `copy()` are copied and re-parented to the new
        mapping; all other values are shared.
        """
        new = type(self)()
        new.anchor = self.anchor
        new.parent = self.parent

        for k, v in self.mapping.items():
            if hasattr(v, "copy"):
                new.mapping[k] = v.copy()
                new.mapping[k].parent = new
            else:
                new.mapping[k] = v

        return new

    def prettyPrint(self, first_indent=None, leading_newline=True):
        """Render as YAML.

        Empty mappings render as "{}", single-entry mappings that fit on one
        line render inline as "{key: value}", and everything else renders as
        one "key: value" line per entry.

        Args:
            first_indent: When not None, replaces the indent of the first
                line only.
            leading_newline: Whether a multi-line result starts with a
                newline.
        """
        # NOTE(review): the indent unit appears as a single space in the
        # listing this was recovered from, which may have collapsed runs of
        # whitespace. Confirm against the indentation Unity expects.
        indent = " " * self._depth()

        lines = []
        for index, (k, v) in enumerate(self.mapping.items()):
            cur_indent = indent
            if index == 0 and first_indent is not None:
                cur_indent = first_indent
            lines.append("{}{}: {}".format(cur_indent, k, v))

        result = '\n'.join(lines)

        # Inline 1-item mappings, matching Unity behavior.
        if len(self.mapping) == 1 and len(result.split("\n")) == 1:
            if first_indent is None:
                # Render again without indent so the braces hug the key.
                return self.prettyPrint(first_indent="")
            return "{" + lines[0] + "}"

        # Empty mappings are represented by '{}'. If we don't do this, Unity
        # will assume that they are Sequences and get very sad.
        if not self.mapping:
            return "{}"

        if leading_newline:
            result = "\n" + result

        return result

    def __str__(self):
        return self.prettyPrint()

    def addChildMapping(self, key, anchor = None):
        """Create an empty Mapping, store it under `key` and return it."""
        child = Mapping()
        child.anchor = anchor
        child.parent = self

        self.mapping[key] = child

        return child

    def addChildSequence(self, key, anchor = None):
        """Create an empty Sequence, store it under `key` and return it."""
        child = Sequence()
        child.anchor = anchor
        child.parent = self

        self.mapping[key] = child

        return child

    def forEach(self, cb):
        """Call `cb(value)` for every value, in insertion order."""
        for v in self.mapping.values():
            cb(v)


class UnityDocument(Mapping):
    """A top-level document (a Mapping) tagged with a Unity class ID."""

    def __init__(self):
        super().__init__()
        self.class_id = None

    def copy(self):
        """Return a deep copy which also carries over `class_id`."""
        result = super().copy()
        result.class_id = self.class_id
        return result
-
-# Class representing a Unity AnimatorController. Implements manipulations, like
-# merging and reanchoring.
-class UnityAnimator():
- def __init__(self):
- self.nodes = []
- self.id_to_node = {}
- self.next_id = 1000 * 1000
-
- def __str__(self):
- return unityYamlToString(self.nodes)
-
- def addNodes(self, nodes):
- for node in nodes:
- self.nodes.append(node)
- anchor = node.anchor
- if anchor == None:
- anchor = self.allocateId()
- if anchor in self.id_to_node:
- raise Exception("Duplicate anchor: {}, node 1: {}, node 2: {}".format(anchor, str(node), str(self.id_to_node[anchor])))
- self.id_to_node[anchor] = node
-
- if int(anchor) > self.next_id:
- self.next_id = int(anchor) + 1
- # I don't know why but this fixes a bug in the `fixWriteDefaults`
- # codepath: two documents wind up with the same anchor.
- self.next_id += 1
-
- def allocateId(self) -> int:
- result = self.next_id
- self.next_id += 1
- return result
-
- # Checks if `old_id` is in `self.id_mapping`, and if so, returns the
- # already-generated ID. Otherwise this allocates a new ID and
- # records it in `self.id_mapping`.
- def mapId(self, old_id: str) -> int:
- new_id = None
- if old_id in self.id_mapping.keys():
- new_id = self.id_mapping[old_id]
- else:
- new_id = self.allocateId()
- self.id_mapping[old_id] = new_id
- return new_id
-
- # Recursively iterate every mapping under `node` and assign new IDs to
- # every identifier. Mappings are recorded in `self.id_mapping`.
- def mergeIterator(self, node):
- if hasattr(node, "mapping"):
- # Don't relabel anything that's defined in an external file.
- # TODO(yum) do this.
- if 'fileID' in node.mapping and not 'guid' in node.mapping:
- if node.mapping['fileID'] != '0':
- old_id = node.mapping['fileID']
- new_id = self.mapId(old_id)
- node.mapping['fileID'] = str(new_id)
- if hasattr(node, "forEach"):
- node.forEach(self.mergeIterator)
-
- # Delete any key-value pairs where the value == the value.
- def scrubReferencesByValue(self, node, values: typing.Set[str]):
- if hasattr(node, "mapping"):
- node.mapping = {k: v for k, v in node.mapping.items() if v not in values}
- if hasattr(node, "forEach"):
- node.forEach(partial(self.scrubReferencesByValue, values=values))
-
- def peekNodeOfClass(self, classId):
- for node in self.nodes:
- if node.class_id == classId:
- return node
- return None
-
- def popNodeOfClass(self, classId):
- result = None
- for node in self.nodes:
- if node.class_id == classId:
- result = node
- self.nodes.remove(result)
- break
- if result:
- del self.id_to_node[result.anchor]
- return result
-
- def pushNode(self, node):
- self.nodes.append(node)
- self.id_to_node[node.anchor] = node
-
- # Merges two animator controllers and returns the result. Any identifiers
- # in the animators are reassigned in a new namespace. The mappings from old
- # identifiers to new identifiers are recorded in `self.id_mapping0` and
- # `self.id_mapping1`.
- def mergeAnimatorControllers(self, ctrl0, ctrl1):
- ctrl0 = copy.deepcopy(ctrl0)
- ctrl1 = copy.deepcopy(ctrl1)
-
- self.id_mapping0 = {}
- self.id_mapping1 = {}
-
- p0 = ctrl0.mapping['AnimatorController'].mapping['m_AnimatorParameters']
- p1 = ctrl1.mapping['AnimatorController'].mapping['m_AnimatorParameters']
-
- a0 = ctrl0.mapping['AnimatorController'].mapping['m_AnimatorLayers']
- a1 = ctrl1.mapping['AnimatorController'].mapping['m_AnimatorLayers']
-
- self.id_mapping = self.id_mapping0
- p0.forEach(self.mergeIterator)
- a0.forEach(self.mergeIterator)
-
- # Hack to prevent ctrl1 from getting a new ID for the animator.
- # TODO(yum) delete this?
- #del self.class_to_next_id['91']
-
- self.id_mapping = self.id_mapping1
- p1.forEach(self.mergeIterator)
- a1.forEach(self.mergeIterator)
-
- p0.sequence += p1.sequence
- a0.sequence += a1.sequence
-
- for elm in p0.sequence:
- elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor
- for elm in a0.sequence:
- elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor
-
- return ctrl0
-
- def merge(self, other):
- ctrl0 = self.popNodeOfClass('91')
- ctrl1 = other.popNodeOfClass('91')
- # Merge animators and populate `self.id_mapping0` and
- # `self.id_mapping1.
- merged_anim = self.mergeAnimatorControllers(ctrl0, ctrl1)
-
- # Mapping from class ID (string) to new class ID (int)
- self.id_mapping = self.id_mapping0
- for node in self.nodes:
- new_id = self.mapId(node.anchor)
- node.anchor = str(new_id)
- node.forEach(self.mergeIterator)
-
- self.id_mapping = self.id_mapping1
- for node in other.nodes:
- new_id = self.mapId(node.anchor)
- node.anchor = str(new_id)
- node.forEach(self.mergeIterator)
-
- nodes = self.nodes
- self.nodes = []
- self.id_to_node = {}
- self.pushNode(merged_anim)
- self.addNodes(nodes)
- self.addNodes(other.nodes)
-
- # TODO(yum) support overwriting duplicates
- def addParameter(self, param_name, param_type):
- unity_type = None
- if param_type == float:
- unity_type = '1'
- elif param_type == int:
- unity_type = '3'
- elif param_type == bool:
- unity_type = '4'
-
- anim = self.peekNodeOfClass('91')
- params = anim.mapping['AnimatorController'].mapping['m_AnimatorParameters']
-
- for p in params.sequence:
- if p.mapping['m_Name'] == param_name:
- return
-
- param = params.addChildMapping()
- param.mapping['m_Name'] = param_name
- param.mapping['m_Type'] = unity_type
- param.mapping['m_DefaultFloat'] = '0'
- param.mapping['m_DefaultInt'] = '0'
- param.mapping['m_DefaultBool'] = '0'
- ctrl = param.addChildMapping('m_Controller')
- ctrl.mapping['fileID'] = anim.anchor
-
- def addLayer(self, layer_name, add_to_head = False, weight: float = 1.0) -> UnityDocument:
- # Add layer to controller
- anim = self.peekNodeOfClass('91')
- layers = anim.mapping['AnimatorController'].mapping['m_AnimatorLayers']
- layer = layers.addChildMapping(add_to_head = add_to_head)
- layer.mapping['serializedVersion'] = '5'
- layer.mapping['m_Name'] = layer_name
- new_id = self.allocateId()
- layer.addChildMapping('m_StateMachine').mapping['fileID'] = str(new_id)
- layer.addChildMapping('m_Mask').mapping['fileID'] = '0'
- layer.addChildSequence('m_Motions')
- layer.addChildSequence('m_Behaviours')
- layer.mapping['m_BlendingMode'] = '0'
- layer.mapping['m_SyncedLayerIndex'] = '-1'
- layer.mapping['m_DefaultWeight'] = str(weight)
- layer.mapping['m_IKPass'] = '0'
- layer.mapping['m_SyncedLayerAffectsTiming'] = '0'
- layer.addChildMapping('m_Controller').mapping['fileID'] = anim.anchor
-
- # Create layer object
- layer = UnityDocument()
- layer.class_id = "1107"
- layer.anchor = str(new_id)
- mach = layer.addChildMapping('AnimatorStateMachine')
-
- mach.mapping['serializedVersion'] = '6'
-
- mach.mapping['m_ObjectHideFlags'] = '1'
- mach.addChildMapping('m_CorrespondingSourceObject').mapping['fileID'] = '0'
- mach.addChildMapping('m_PrefabInstance').mapping['fileID'] = '0'
- mach.addChildMapping('m_PrefabAsset').mapping['fileID'] = '0'
- mach.mapping['m_Name'] = layer_name
- mach.addChildSequence('m_ChildStates')
- mach.addChildSequence('m_ChildStateMachines')
- mach.addChildSequence('m_AnyStateTransitions')
- mach.addChildSequence('m_EntryTransitions')
- mach.addChildMapping('m_StateMachineTransitions')
- mach.addChildSequence('m_StateMachineBehaviours')
- pos = mach.addChildMapping('m_AnyStatePosition')
- pos.mapping['x'] = '50'
- pos.mapping['y'] = '20'
- pos.mapping['z'] = '0'
- pos = mach.addChildMapping('m_EntryPosition')
- pos.mapping['x'] = '50'
- pos.mapping['y'] = '120'
- pos.mapping['z'] = '0'
- pos = mach.addChildMapping('m_ExitPosition')
- pos.mapping['x'] = '800'
- pos.mapping['y'] = '120'
- pos.mapping['z'] = '0'
- pos = mach.addChildMapping('m_ParentStateMachinePosition')
- pos.mapping['x'] = '800'
- pos.mapping['y'] = '20'
- pos.mapping['z'] = '0'
- mach.addChildMapping('m_DefaultState')
-
- self.nodes.append(layer)
- return layer
-
- def addAnimatorState(self, layer, state_name, is_default_state = False,
- dx = 0, dy = 0) -> UnityDocument:
- # Create animation state
- parser = UnityParser()
- parser.parse(ANIMATION_STATE_TEMPLATE)
- new_anim = UnityAnimator()
- new_anim.addNodes(parser.nodes)
- node = new_anim.nodes[0]
-
- new_id = self.allocateId()
- node.class_id = "1102"
- node.anchor = str(new_id)
- state = node.mapping['AnimatorState']
- state.mapping['m_Name'] = state_name
- #state.mapping['m_Motion'].mapping['guid'] = anim_guid
- self.nodes.append(node)
-
- # Add state to layer
- child_state = layer.mapping['AnimatorStateMachine'].mapping['m_ChildStates'].addChildMapping()
- child_state.mapping['serializedVersion'] = '1'
- child_state.addChildMapping('m_State').mapping['fileID'] = str(new_id)
- state_pos = child_state.addChildMapping('m_Position')
- state_pos.mapping['x'] = str(280 + dx)
- state_pos.mapping['y'] = str(80 + dy)
- state_pos.mapping['z'] = '0'
-
- if is_default_state:
- layer.mapping['AnimatorStateMachine'].mapping['m_DefaultState'].mapping['fileID'] = str(new_id)
-
- return node
-
- def setAnimatorStateAnimation(self, anim_state, anim_guid):
- anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['guid'] = anim_guid
- anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = '7400000'
- anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['type'] = '2'
-
- # Adds a blend tree which uses the parameter named `param_name` to blend
- # between anim_lo and anim_hi. Also creates the corresponding animation
- # state.
- def addAnimatorBlendTree(self, layer, state_name, param_name,
- anim_guid_lo, anim_guid_hi, dx = 0, dy = 0,
- lo_threshold = -1.0, hi_threshold = 1.0,
- is_default_state = False) -> UnityDocument:
- # Create the blend tree.
- parser = UnityParser()
- parser.parse(BLEND_TREE_TEMPLATE)
- new_anim = UnityAnimator()
- new_anim.addNodes(parser.nodes)
- node = new_anim.nodes[0]
-
- new_id = self.allocateId()
- node.class_id = "206"
- node.anchor = str(new_id)
- tree = node.mapping['BlendTree']
- tree.mapping['m_Name'] = state_name
- # Low animation
- tree.mapping['m_Childs'].sequence[0].mapping['m_Motion'].mapping['guid'] = anim_guid_lo
- tree.mapping['m_Childs'].sequence[0].mapping['m_DirectBlendParameter'] = param_name
- tree.mapping['m_Childs'].sequence[0].mapping['m_Threshold'] = str(lo_threshold)
- # High animation
- tree.mapping['m_Childs'].sequence[1].mapping['m_Motion'].mapping['guid'] = anim_guid_hi
- tree.mapping['m_Childs'].sequence[1].mapping['m_DirectBlendParameter'] = param_name
- tree.mapping['m_Childs'].sequence[1].mapping['m_Threshold'] = str(hi_threshold)
-
- tree.mapping['m_BlendParameter'] = param_name
- tree.mapping['m_BlendParameterY'] = param_name
-
- self.nodes.append(node)
-
- # Create the corresponding animation state.
- anim_state = self.addAnimatorState(layer, state_name, is_default_state, dx = dx, dy =
- dy)
- anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = node.anchor
-
- return anim_state
-
- def addTransition(self, dst_state, dur_s = 0.0):
- # Create animation state
- parser = UnityParser()
- parser.parse(TRANSITION_TEMPLATE)
- new_transition = UnityAnimator()
- new_transition.addNodes(parser.nodes)
- node = new_transition.nodes[0]
-
- new_id = self.allocateId()
- node.class_id = "1101"
- node.anchor = str(new_id)
- state = node.mapping['AnimatorStateTransition']
- state.mapping['m_DstState'].mapping['fileID'] = copy.copy(dst_state.anchor)
- state.mapping['m_TransitionDuration'] = dur_s
- self.nodes.append(node)
-
- return node
-
- def fixWriteDefaults(self, guid_map, generated_anim_path):
- # TODO(yum) we should have an Animation class which encapsulates all
- # this stuff.
- parser = UnityParser()
- parser.parse(WRITE_DEFAULTS_ANIM_TEMPLATE)
- new_anim = UnityAnimator()
- new_anim.addNodes(parser.nodes)
-
- new_clip = new_anim.peekNodeOfClass('74').mapping['AnimationClip']
- curve_template = new_clip.mapping['m_FloatCurves'].sequence[0]
- new_clip.mapping['m_FloatCurves'].sequence = []
- new_clip.mapping['m_EditorCurves'].sequence = []
-
- # Keep track of the (attribute, path) tuples we've already set to avoid
- # animating the same thing twice.
- attributes_set = set()
-
- animator_state_id = '1102'
- for node in self.nodes:
- if node.class_id != animator_state_id:
- continue
-
- # Looking at an animator state.
- if node.mapping['AnimatorState'].mapping['m_WriteDefaultValues'] != '1':
- continue
-
- # Disable write defaults.
- node.mapping['AnimatorState'].mapping['m_WriteDefaultValues'] = '0'
-
- # Looking at an animator state with write defaults.
- motion = node.mapping['AnimatorState'].mapping['m_Motion']
- # Some animations have write defaults but don't trigger an
- # animation. No idea what that's about. For now, just ignore.
- if not 'guid' in motion.mapping:
- continue
- guid = motion.mapping['guid']
-
- # Again, not really sure what's going on here, just ignore and
- # revisit if we hit problems.
- if not guid in guid_map.keys():
- continue
-
- # OK, we found an animation with write defaults, and we know where
- # the animation lives. Crack it open and see what it's writing.
- animation_path = guid_map[guid]
- print("Animation has write defaults: {}".format(animation_path), file=sys.stderr)
- parser = UnityParser()
- parser.parseFile(animation_path)
- anim = UnityAnimator()
- anim.addNodes(parser.nodes)
-
- clip = anim.peekNodeOfClass('74')
-
- for curve in clip.mapping['AnimationClip'].mapping['m_FloatCurves'].sequence:
- attr = curve.mapping['attribute']
- path = curve.mapping['path']
- if (attr, path) in attributes_set:
- continue
- #print("Fix attr/path {}/{}".format(attr, path), file=sys.stderr)
- attributes_set.add((attr, path))
-
- new_curve = curve_template.copy()
- new_curve.mapping['attribute'] = attr
- new_curve.mapping['path'] = path
-
- new_clip.mapping['m_FloatCurves'].sequence.append(new_curve)
- new_clip.mapping['m_EditorCurves'].sequence.append(new_curve)
-
- #print("len float curves: {}".format(len(new_clip.mapping['m_FloatCurves'].sequence)), file=sys.stderr)
-
- def generateOffAnimationForGuid(self, guid_map, generated_anim_dir, guid):
- # Looking at an animation.
- if not guid in guid_map.keys():
- return
-
- animation_path = guid_map[guid]
- print("Checking animation at {}".format(animation_path), file=sys.stderr)
- parser = UnityParser()
- parser.parseFile(animation_path)
- anim = UnityAnimator()
- anim.addNodes(parser.nodes)
-
- clip = anim.peekNodeOfClass('74')
-
- has_nonzero = False
- curve_members = ["m_FloatCurves", "m_EditorCurves"]
- for memb in curve_members:
- for curve in clip.mapping['AnimationClip'].mapping[memb].sequence:
- attr = curve.mapping['attribute']
- path = curve.mapping['path']
-
- for m_curve in curve.mapping['curve'].mapping['m_Curve'].sequence:
- if m_curve.mapping['value'] != '0':
- has_nonzero = True
- m_curve.mapping['value'] = '0'
-
- if not has_nonzero:
- print("Animation does not set anything nonzero")
- return
-
- print("Animation sets things nonzero, fixing")
-
- new_anim_path = "OFF_{}".format(os.path.basename(animation_path))
- new_anim_path = "{}/{}".format(generated_anim_dir, new_anim_path)
-
- with open(new_anim_path, "w", encoding="utf-8") as f:
- f.write(str(anim))
-
- meta = Metadata()
- with open(new_anim_path + ".meta", "w", encoding="utf-8") as f:
- f.write(str(meta))
-
- def generateOffAnimationsAnimStates(self, guid_map, generated_anim_dir):
- animator_state_id = '1102'
- for node in self.nodes:
- if node.class_id != animator_state_id:
- continue
-
- # Looking at an animation state.
- motion = node.mapping['AnimatorState'].mapping['m_Motion']
- if not 'guid' in motion.mapping:
- continue
- guid = motion.mapping['guid']
- self.generateOffAnimationForGuid(guid_map, generated_anim_dir, guid)
-
-
- def generateOffAnimationsBlendTrees(self, guid_map, generated_anim_dir):
- animator_state_id = '206'
- for node in self.nodes:
- if node.class_id != animator_state_id:
- continue
-
- # Looking at an animation state.
- for child in node.mapping['BlendTree'].mapping['m_Childs'].sequence:
- motion = child.mapping['m_Motion']
-
- if not 'guid' in motion.mapping:
- continue
- guid = motion.mapping['guid']
- self.generateOffAnimationForGuid(guid_map, generated_anim_dir, guid)
-
- def generateOffAnimations(self, guid_map, generated_anim_dir):
- self.generateOffAnimationsAnimStates(guid_map, generated_anim_dir)
- self.generateOffAnimationsBlendTrees(guid_map, generated_anim_dir)
-
- def addTransitionBooleanCondition(self, from_state, trans, param, branch):
- # Populate the transition's condition logic.
- cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping()
- if branch:
- cond.mapping['m_ConditionMode'] = '1'
- else:
- cond.mapping['m_ConditionMode'] = '2'
- cond.mapping['m_ConditionEvent'] = param
- cond.mapping['m_EventThreshold'] = '0'
- # Register the transition with the `from_state`.
- if from_state:
- from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping()
- from_state_trans.mapping['fileID'] = copy.copy(trans.anchor)
-
- def addTransitionIntegerEqualityCondition(self, from_state, trans, param, param_val):
- # Populate the transition's condition logic.
- cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping()
- cond.mapping['m_ConditionMode'] = '6'
- cond.mapping['m_ConditionEvent'] = param
- # Curiously, the typo ("treshold" only has 1 'h') is needed for this to
- # work, but not for boolean conditions to work.
- cond.mapping['m_EventTreshold'] = str(param_val)
- # Register the transition with the `from_state`.
- if from_state:
- from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping()
- from_state_trans.mapping['fileID'] = trans.anchor
-
- def addTransitionIntegerGreaterCondition(self, from_state, trans, param, param_val):
- # Populate the transition's condition logic.
- cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping()
- cond.mapping['m_ConditionMode'] = '3'
- cond.mapping['m_ConditionEvent'] = param
- cond.mapping['m_EventThreshold'] = str(param_val)
- # Register the transition with the `from_state`.
- if from_state:
- from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping()
- from_state_trans.mapping['fileID'] = trans.anchor
-
- # TODO(yum) this should be factored out into generate_fx.py
- def addTasttToggle(self, off_anim_path, on_anim_path, toggle_param,
- guid_map):
- self.addParameter(toggle_param, bool)
-
- off_anim_meta = Metadata()
- off_anim_meta.loadOrCreate(off_anim_path, guid_map)
-
- on_anim_meta = Metadata()
- on_anim_meta.loadOrCreate(on_anim_path, guid_map)
-
- layer = self.addLayer('TaSTT_Toggle')
- off_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_Off', is_default_state = True)
- self.setAnimatorStateAnimation(off_anim, off_anim_meta.guid)
- on_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_On')
- self.setAnimatorStateAnimation(on_anim, on_anim_meta.guid)
-
- # TODO(yum) make a Transition class with methods for adding boolean
- # conditions
- off_to_on = self.addTransition(on_anim)
- self.addTransitionBooleanCondition(off_anim, off_to_on, toggle_param, True)
-
- on_to_off = self.addTransition(off_anim)
- self.addTransitionBooleanCondition(on_anim, on_to_off, toggle_param, False)
-
- def setNoopAnimations(self, guid_map, noop_anim_path):
- noop_anim_meta = Metadata()
- noop_anim_meta.loadOrCreate(noop_anim_path, guid_map)
-
- for node in self.nodes:
- if node.class_id != "1102":
- continue
- motion = node.mapping['AnimatorState'].mapping['m_Motion']
- replace = False
-
- name = node.mapping['AnimatorState'].mapping['m_Name']
- anchor = node.anchor
-
- # As of 8 May 2023, idle states look like this:
- # m_Motion: {fileID: 7400000, guid: e5881c5b0c09be854b0fd6fd8144333f, type: 2}
- # Before that, they looked like this:
- # m_Motion: {fileID: 0}
- # The first predicate looks for the new pattern.
- # The second predicate looks for the second pattern.
- if "fileID" in motion.mapping.keys() and \
- "guid" in motion.mapping.keys() and \
- not motion.mapping["guid"] in guid_map:
- motion.mapping["fileID"] = "7400000"
- print(f"Set noop animation to guid {noop_anim_meta.guid} in state {node.anchor}")
- motion.mapping["guid"] = noop_anim_meta.guid
- motion.mapping["type"] = "2"
- elif not ("fileID" in motion.mapping.keys() and
- motion.mapping["fileID"] != "0") and not ("guid" in
- motion.mapping.keys() and motion.mapping["guid"] in
- guid_map):
- motion.mapping["fileID"] = "7400000"
- print(f"Set noop animation to guid {noop_anim_meta.guid} in state {node.anchor}")
- motion.mapping["guid"] = noop_anim_meta.guid
- motion.mapping["type"] = "2"
- else:
- #print(f"Skipping state {anchor} / {name}")
- pass
-
-def unityYamlToString(nodes):
- lines = []
- preamble = """
-%YAML 1.1
-%TAG !u! tag:unity3d.com,2011:
-"""[1:][:-1]
- if len(nodes) > 1 or (len(nodes) == 1 and nodes[0].anchor):
- lines.append(preamble)
- for doc in nodes:
- if len(nodes) > 1 or (len(nodes) == 1 and nodes[0].anchor):
- lines.append("--- !u!" + doc.class_id + " &" + doc.anchor)
- lines.append(str(doc))
- result = '\n'.join(lines)
-
- for i in range(0,10):
- result = result.replace("\n\n", "\n")
-
- return result
-
-class UnityParser:
- STREAM_START = 100
- STREAM_END = 199
-
- DOCUMENT_START = 200
- DOCUMENT_END = 299
-
- MAPPING_START = 300
- MAPPING_KEY = 301
-
- SEQUENCE_VALUE = 400
-
- def __init__(self):
- self.state = self.STREAM_START
- self.cur_scalar = None
- self.cur_node = None
-
- # Simple list of parsed documents. Populated by parse().
- self.nodes = []
- self.prev_states = []
-
- def __str__(self):
- return unityYamlToString(self.nodes)
-
- def pushState(self, state):
- self.prev_states.append(self.state)
- self.state = state
- #print("state {} ({})".format(self.state, len(self.prev_states)))
-
- def popState(self):
- self.state = self.prev_states[-1]
- self.prev_states = self.prev_states[0:len(self.prev_states) - 1]
- #print("state {} ({})".format(self.state, len(self.prev_states)))
- return self.state
-
- def cleanYaml(self, yaml_str):
- lines = []
- first_document = True
- got_document = False
- for line in yaml_str.split("\n"):
- # Add end-of-document indicators.
- if line.startswith("---"):
- got_document = True
- if not first_document:
- lines.append("...\n")
- first_document = False
-
- # Remove class ID tag from each block.
- if line.startswith("---"):
- parts = line.split()
- lines.append(parts[0] + " " + parts[2] + "\n")
- continue
- lines.append(line)
-
- if got_document:
- lines.append("...\n")
- return '\n'.join(lines)
-
- def getClassIds(self, yaml_str):
- anchor_to_class_id = {}
- for line in yaml_str.split("\n"):
- if not line.startswith("---"):
- continue
-
- parts = line.split()
- class_id = parts[1][3:]
- anchor = parts[2][1:]
- anchor_to_class_id[anchor] = class_id
-
- return anchor_to_class_id
-
- def parseFile(self, yaml_file):
- yaml_str = ""
- with open(yaml_file, "r", encoding="utf-8") as f:
- yaml_str = f.read()
- return self.parse(yaml_str)
-
- def parse(self, yaml_str):
- anchor_to_class_id = self.getClassIds(yaml_str)
- yaml_str = self.cleanYaml(yaml_str)
-
- for event in yaml.parse(yaml_str):
- if isinstance(event, yaml.StreamStartEvent):
- if len(self.prev_states) > 0:
- raise Exception("Multiple StreamStartEvents received")
- self.pushState(self.STREAM_START)
-
- elif isinstance(event, yaml.StreamEndEvent):
- if self.state != self.STREAM_START:
- raise Exception("Document end received after state {}".format(self.state))
- self.popState()
- if len(self.prev_states) > 0:
- raise Exception("Extra states at stream end")
-
- elif isinstance(event, yaml.DocumentStartEvent):
- if self.state != self.STREAM_START and self.state != self.DOCUMENT_END:
- raise Exception("Document start received after state {}".format(self.state))
- self.pushState(self.DOCUMENT_START)
-
- elif isinstance(event, yaml.DocumentEndEvent):
- if self.state != self.DOCUMENT_START:
- raise Exception("Document end received after state {}".format(self.state))
- self.popState()
- self.nodes.append(self.cur_node)
- self.cur_node = None
-
- elif isinstance(event, yaml.MappingStartEvent):
- if self.cur_node == None:
- self.cur_node = UnityDocument()
- self.cur_node.anchor = event.anchor
- if event.anchor:
- self.cur_node.class_id = anchor_to_class_id[event.anchor]
- else:
- self.cur_node = self.cur_node.addChildMapping(self.cur_scalar)
- self.pushState(self.MAPPING_START)
-
- elif isinstance(event, yaml.MappingEndEvent):
- if self.state != self.MAPPING_START:
- raise Exception("Mapping end received after state {}".format(self.state))
- self.popState()
- if self.state == self.MAPPING_KEY:
- self.popState()
- if self.cur_node.parent != None:
- self.cur_node = self.cur_node.parent
-
- elif isinstance(event, yaml.SequenceStartEvent):
- self.cur_node = self.cur_node.addChildSequence(self.cur_scalar)
- self.pushState(self.SEQUENCE_VALUE)
-
- elif isinstance(event, yaml.SequenceEndEvent):
- if self.state != self.SEQUENCE_VALUE:
- raise Exception("Sequence end received after state {}".format(self.state))
- self.popState()
- if self.state == self.MAPPING_KEY:
- self.popState()
- self.cur_node = self.cur_node.parent
-
- elif isinstance(event, yaml.ScalarEvent):
- if self.state == self.MAPPING_START:
- self.cur_scalar = event.value
- self.pushState(self.MAPPING_KEY)
- elif self.state == self.MAPPING_KEY:
- self.cur_node.mapping[self.cur_scalar] = event.value
- self.popState()
- elif self.state == self.SEQUENCE_VALUE:
- self.cur_node.sequence.append(event.value)
- else:
- raise Exception("Scalar event received after state {}".format(self.state))
- else:
- raise Exception("Unhandled event {}".format(event))
- continue
-
-class MulticoreUnityParser:
- def parseFile(self, yaml_file):
- yaml_str = ""
- with open(yaml_file, "r", encoding="utf-8") as f:
- yaml_str = f.read()
- return self.parse(yaml_str)
-
- def parse(self, yaml_str):
- lines = []
- documents = []
- first = True
- n_lines = 0
- for line in yaml_str.split("\n"):
- n_lines += 1
- if line.startswith("---"):
- if not first:
- documents.append("\n".join(lines))
- lines = []
- first = False
- lines.append(line)
- if len(lines) > 0:
- documents.append("\n".join(lines))
- lines = []
- print("Got {} documents out of {} lines".format(len(documents), n_lines), file=sys.stderr)
-
- # Divide the work evenly among the # of CPUs we have available.
- n_threads = os.cpu_count()
- window_size = int(math.ceil(len(documents) / n_threads))
- merge_window = []
- merged_documents = []
- for i in range(0, len(documents)):
- if i > 0 and i % window_size == 0:
- merged_documents.append("\n".join(merge_window))
- merge_window = []
- merge_window.append(documents[i])
- if len(merge_window) > 0:
- merged_documents.append("\n".join(merge_window))
- merge_window = []
- documents = merged_documents
-
- mgr = mp.Manager()
-
- print("Spawning {} threads".format(len(documents)), file=sys.stderr)
- threads = []
- for document in documents:
- res = mgr.dict()
- thread = mp.Process(target = self.parseOneSerial, args = (document, res,))
- threads.append((thread, res))
- thread.start()
-
- print("Joining threads", file=sys.stderr)
- nodes = []
- for thread, res in threads:
- thread.join()
- nodes += res['nodes']
-
- print("Creating animator", file=sys.stderr)
- result = UnityAnimator()
- result.addNodes(nodes)
-
- return result
-
- def parseOneSerial(self, document, res):
- parser = UnityParser()
- parser.parse(document)
- res['nodes'] = parser.nodes
-
- def parseFile(self, yaml_file):
- yaml_str = ""
- with open(yaml_file, "r", encoding="utf-8") as f:
- yaml_str = f.read()
- return self.parse(yaml_str)
-
-def getGuidMap(d):
- result = {}
- for f in os.scandir(d):
- path = f.path
- if f.is_dir():
- result.update(getGuidMap(path))
- if not f.is_file():
- continue
- suffix = ".meta"
- if path.endswith(suffix):
- with open(path, "r", encoding="utf-8") as f:
- for line in f:
- if line.startswith("guid"):
- guid = line.split()[1]
- result[guid] = path[:-len(suffix)]
- return result
-
-if __name__ == "__main__":
- os.chdir(os.path.dirname(os.path.abspath(__file__)))
-
- parser = argparse.ArgumentParser()
- parser.add_argument("cmd", type=str, help="One of merge, guid_map, fix_write_defaults")
- parser.add_argument("--fx0", type=str, help="The first animator to merge")
- parser.add_argument("--fx1", type=str, help="The second animator to merge")
- parser.add_argument("--fx_dest", type=str, help="The path at which to " +
- "save the generated/merged animator")
- parser.add_argument("--project_root", type=str, help="The path to the " +
- "Unity project Assets folder")
- parser.add_argument("--save_to", type=str, help="The path to save the " +
- "result of the computation")
- parser.add_argument("--guid_map", type=str, help="Path to guid.map, " +
- "generated by a previous call to `guid_map`")
- parser.add_argument("--guid_map_append", type=bool, help="If set, " +
- "append to GUID map instead of overwriting.")
- parser.add_argument("--gen_anim_dir", type=str, help="The folder under which generated animations are stored")
- args = parser.parse_args()
-
- if args.cmd == "merge":
- if not args.fx0 or not args.fx1 or not args.fx_dest:
- print("--fx0, --fx1, and --fx_dest required", file=sys.stderr)
- parser.print_help()
- parser.exit(1)
-
- print("Parsing {}".format(args.fx0), file=sys.stderr)
- parser0 = MulticoreUnityParser()
- anim0 = parser0.parseFile(args.fx0)
-
- arg1 = "TaSTT_fx.controller"
- print("Parsing {}".format(args.fx1), file=sys.stderr)
- parser1 = MulticoreUnityParser()
- anim1 = parser1.parseFile(args.fx1)
-
- print("Merging animators", file=sys.stderr)
- anim0.merge(anim1)
-
- print("Serializing to {}".format(args.fx_dest), file=sys.stderr)
- with open(args.fx_dest, "w", encoding="utf-8") as f:
- f.write(unityYamlToString(anim0.nodes))
-
- elif args.cmd == "guid_map":
- if not args.project_root or not args.save_to:
- print("--project_root and --save_to required")
- parser.print_help()
- parser.exit(1)
-
- print("Looking up GUIDs under {}".format(args.project_root),
- file=sys.stderr)
- guid_map = getGuidMap(args.project_root)
-
- save_to_dir = os.path.dirname(args.save_to)
- os.makedirs(save_to_dir, exist_ok=True)
-
- if args.guid_map_append:
- tmp_map = {}
- with open(args.save_to, "rb") as f:
- tmp_map = pickle.load(f)
- # combine guid_map and tmp_map
- guid_map = {**guid_map, **tmp_map}
- print("Saving to {}".format(args.save_to), file=sys.stderr)
- with open(args.save_to, 'wb') as f:
- pickle.dump(guid_map, f)
- elif args.cmd == "fix_write_defaults":
- if not args.fx0 or not args.guid_map:
- print("--fx0 and --guid_map required")
- parser.print_help()
- parser.exit(1)
-
- guid_map = {}
- with open(args.guid_map, 'rb') as f:
- guid_map = pickle.load(f)
-
- print("Parsing {}".format(args.fx0), file=sys.stderr)
- parser0 = MulticoreUnityParser()
- anim = parser0.parseFile(args.fx0)
-
- print("Fixing write defaults", file=sys.stderr)
- anim_dir = "generated/animations/"
- os.makedirs(anim_dir, exist_ok=True)
- anim.fixWriteDefaults(guid_map, anim_dir + "TaSTT_Reset_Animation.anim")
- print(str(anim))
-
- elif args.cmd == "gen_off_anims":
- if not args.fx0 or not args.guid_map:
- print("--fx0 and --guid_map required")
- parser.print_help()
- parser.exit(1)
-
- guid_map = {}
- with open(args.guid_map, 'rb') as f:
- guid_map = pickle.load(f)
-
- print("Parsing {}".format(args.fx0), file=sys.stderr)
- parser0 = MulticoreUnityParser()
- anim = parser0.parseFile(args.fx0)
-
- print("Generating off animations", file=sys.stderr)
- anim_dir = "generated/animations/"
- os.makedirs(anim_dir, exist_ok=True)
- anim.generateOffAnimations(guid_map, "generated/animations")
-
- elif args.cmd == "add_toggle":
- if not args.fx0 or not args.fx_dest or not args.gen_anim_dir or not args.guid_map:
- print("--fx0, --fx_dest, --gen_anim_dir and --guid_map required")
- parser.print_help()
- parser.exit(1)
-
- guid_map = {}
- with open(args.guid_map, 'rb') as f:
- guid_map = pickle.load(f)
-
- print("Parsing {}".format(args.fx0), file=sys.stderr)
- parser0 = MulticoreUnityParser()
- anim = parser0.parseFile(args.fx0)
-
- print("Adding toggle", file=sys.stderr)
- anim.addTasttToggle(args.gen_anim_dir + "/TaSTT_Toggle_Off.anim",
- args.gen_anim_dir + "/TaSTT_Toggle_On.anim", "TaSTT_Toggle",
- guid_map)
-
- print("Serializing to {}".format(args.fx_dest), file=sys.stderr)
- with open(args.fx_dest, "w", encoding="utf-8") as f:
- f.write(str(anim))
-
- with open(args.guid_map, 'wb') as f:
- pickle.dump(guid_map, f)
-
- elif args.cmd == "fast_parse_test":
- if not args.fx0:
- print("--fx0 required")
- parser.print_help()
- parser.exit(1)
-
- print("Parsing {}".format(args.fx0), file=sys.stderr)
- parser0 = MulticoreUnityParser()
- anim = parser0.parseFile(args.fx0)
- print(str(anim))
-
- elif args.cmd == "set_noop_anim":
- if not args.fx0 or not args.fx_dest or not args.gen_anim_dir or not args.guid_map:
- print("--fx0, --fx_dest, --gen_anim_dir and --guid_map required")
- parser.print_help()
- parser.exit(1)
-
- guid_map = {}
- with open(args.guid_map, 'rb') as f:
- guid_map = pickle.load(f)
-
- print("Parsing {}".format(args.fx0), file=sys.stderr)
- parser = MulticoreUnityParser()
- anim = parser.parseFile(args.fx0)
-
- anim.setNoopAnimations(guid_map, args.gen_anim_dir + "/TaSTT_Do_Nothing.anim")
-
- with open(args.fx_dest, "w", encoding="utf-8") as f:
- f.write(str(anim))
-
- else:
- print("Unrecognized command: {}".format(args.cmd))
-
diff --git a/Scripts/obfuscate.py b/Scripts/obfuscate.py
deleted file mode 100644
index 8d01e10..0000000
--- a/Scripts/obfuscate.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/env python3
-
-# This module is used to implement obfuscation of TaSTT network
-# speech data. At a high level, TaSTT is simply streaming N bits of
-# arbitrary data to a shader via VRChat's parameter sync mechanism.
-#
-# It would be trivial to mine this data for speech information, since
-# we're sending unicode (or ASCII) characters to peers.
-#
-# To raise the cost for the casual data collector, we can obfuscate
-# this data using a one-time pad in cipher-block chaining mode.
-#
-# Making things interesting, encrypted data will arrive at the Unity
-# animator, which processes them in 8 bit chunks. They are written
-# into contiguous blocks of the animator. Thus the shader can decrypt
-# the board by decrypting each block. This is thus stronger than
-# applying a one-time pad to each byte of the plaintext, since the
-# statistical distribution of individual letters is destroyed.
-# Obviously due to the lack of an initialization vector, the
-# distribution of phrases (blocks) is preserved.
-
-import math
-import os
-
-def genKey(n_bits = 128) -> bytearray:
- return os.urandom(int(n_bits / 8))
-
-def saveKey(filename: str, key: str):
- with open(filename, "wb") as f:
- f.write(key)
-
-def loadKey(filename: str) -> bytearray:
- with open(filename, "rb") as f:
- return f.read()
-
-# Apply a symmetric cypher to `data` using cypher-block chaining.
-def obfuscate(data: bytearray, key: bytearray) -> str:
- n_blocks = int(math.ceil(len(data) / len(key)))
- # This is a misnomer. A true IV would be randomized, but we can't
- # do that since the shader doesn't have access to it. We just use
- # this to implement the "chaining" aspect of CBC.
- iv = bytearray(b'\x00') * len(key)
- result = bytearray()
- for i in range(0, n_blocks):
- block_begin = i * len(key)
- block_end = (i + 1) * len(key)
- block_plain = data[block_begin:block_end]
- block_cypher = block_plain.copy()
- for i in range(0, len(block_cypher)):
- block_cypher[i] ^= iv[i]
- block_cypher[i] ^= key[i]
- result += block_cypher
- iv = block_cypher
- return result
-
-def deobfuscate(data: bytearray, key: bytearray) -> str:
- n_blocks = int(math.ceil(len(data) / len(key)))
- # This is a misnomer. A true IV would be randomized, but we can't
- # do that since the shader doesn't have access to it. We just use
- # this to implement the "chaining" aspect of CBC.
- iv = bytearray(b'\x00') * len(key)
- result = bytearray()
- for i in range(0, n_blocks):
- block_begin = i * len(key)
- block_end = (i + 1) * len(key)
- block_cypher = data[block_begin:block_end]
- block_plain = block_cypher.copy()
- for i in range(0, len(block_plain)):
- block_plain[i] ^= key[i]
- block_plain[i] ^= iv[i]
- result += block_plain
- iv = block_cypher
- return result
-
-def test():
- key = genKey()
- saveKey("test.key", key)
- new_key = loadKey("test.key")
- os.remove("test.key")
- assert(key == new_key)
-
- plaintext_original = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
- plaintext_bytes = bytearray(plaintext_original, "utf-8")
- cyphertext = obfuscate(plaintext_bytes, key)
- assert(len(plaintext_bytes) == len(cyphertext))
- plaintext_recovered = deobfuscate(cyphertext, key).decode("utf-8")
- assert(plaintext_original == plaintext_recovered)
- assert(plaintext_bytes != cyphertext)
-
-if __name__ == "__main__":
- test()
-
diff --git a/Scripts/osc_ctrl.py b/Scripts/osc_ctrl.py
deleted file mode 100644
index c077b2b..0000000
--- a/Scripts/osc_ctrl.py
+++ /dev/null
@@ -1,185 +0,0 @@
-#!/usr/bin/env python3
-
-from emotes_v2 import EmotesState
-from generate_utils import config
-from math import ceil
-from paging import MultiLinePager
-from pythonosc import udp_client
-
-import argparse
-import generate_utils
-import random
-import time
-
-# 5 Hz usually works, but 3 Hz is more reliable in busy lobbies. Feel free to
-# dial this up if you want faster paging, but know that it might break for
-# remote users.
-SYNC_FREQ_HZ = 3.0
-SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ
-
-def getClient(ip = "127.0.0.1", port = 9000):
- return udp_client.SimpleUDPClient(ip, port)
-
-# The characters in the TaSTT are all numbered from top left to bottom right.
-# This function provides a mapping from letter ('a') to index (26).
-def generateEncoding():
- encoding = {}
- for i in range(0, 65535):
- encoding[chr(i)] = (i % 256, int(i / 256))
- return encoding
-
-class OscState:
- def __init__(self, chars_per_sync: int, rows: int, cols: int,
- bytes_per_char: int,
- ip = "127.0.0.1", port = 9000):
- self.client = getClient(ip, port)
- self.pager = MultiLinePager(chars_per_sync, rows, cols)
- self.encoding= generateEncoding()
- self.bytes_per_char = bytes_per_char
- self.client.bytes_per_char = bytes_per_char
- self.builtin_msg = "" # The last message sent to the built-in chatbox
-
- def reset(self):
- self.pager.reset()
-
-def encodeMessage(encoding, msg):
- encoded = []
- for char in msg:
- encoded.append(encoding[char])
- return encoded
-
-def lockWorld(client, lock: bool):
- addr = "/avatar/parameters/" + generate_utils.getLockWorldParam()
- client.send_message(addr, lock)
-
-def toggleBoard(client, show: bool):
- addr = "/avatar/parameters/" + generate_utils.getToggleParam()
- client.send_message(addr, show)
-
-def enable(client):
- addr="/avatar/parameters/" + generate_utils.getEnableParam()
- client.send_message(addr, True)
-
-def disable(client):
- addr="/avatar/parameters/" + generate_utils.getEnableParam()
- client.send_message(addr, False)
-
-def ellipsis(client, enable: bool):
- addr="/avatar/parameters/" + generate_utils.getEllipsisParam()
- client.send_message(addr, enable)
-
-def clear(osc_state: OscState):
- disable(osc_state.client)
-
- addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
- osc_state.client.send_message(addr, True)
-
- time.sleep(SYNC_DELAY_S)
-
- addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
- osc_state.client.send_message(addr, False)
-
- osc_state.reset()
-
-# Note: `nth_audio` is 1-indexed
-def playAudio(osc_state: OscState, nth_audio: int, value: bool):
- addr="/avatar/parameters/" + generate_utils.getSoundParam(nth_audio)
- osc_state.client.send_message(addr, value)
-
-def updateRegion(client, region_idx, letter_encoded):
- for byte in range(0, client.bytes_per_char):
- addr="/avatar/parameters/" + generate_utils.getBlendParam(region_idx, byte)
- letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5
- client.send_message(addr, letter_remapped)
-
-# Sends one slice of `msg` to the board then returns. Slices are sent
-# in FIFO order; e.g., the most recently spoken words are sent last.
-# Returns True if done paging, False otherwise.
-def pageMessage(cfg, osc_state: OscState, msg: str, estate: EmotesState) -> bool:
- msg = estate.encode_emotes(msg)
-
- msg_slice, slice_idx = osc_state.pager.getNextSlice(msg)
- if slice_idx == -1:
- for i in range(5):
- playAudio(osc_state, i+1, False)
- return True
-
- sounds_to_make = set()
- letter_i = 1
- for letter in ["a", "e", "i", "o", "u"]:
- if letter in msg_slice.lower():
- sounds_to_make.add(letter_i)
- letter_i += 1
- if len(sounds_to_make) > 0:
- for i in range(5):
- if i+1 in sounds_to_make and random.randint(1,3) != 1:
- playAudio(osc_state, i+1, True)
- else:
- playAudio(osc_state, i+1, False)
-
- #print("sending page {}: {} ({})".format(slice_idx, msg_slice,
- # len(msg_slice)))
-
- # Really long messages just wrap back around.
-
- # if in last region:
- # how long is it
- num_cells = cfg["rows"] * cfg["cols"]
- num_regions = ceil(num_cells / cfg["chars_per_sync"])
- which_region = slice_idx % num_regions
- #print(f"which_region: {which_region}")
- #print(f"num_regions: {num_regions}")
- #print("num regions: {}".format(num_regions))
- if which_region == num_regions - 1:
- layers_in_last_region = num_cells % cfg["chars_per_sync"]
- #print(f"layers in last region: {layers_in_last_region}")
- if layers_in_last_region == 0:
- layers_in_last_region = cfg["chars_per_sync"]
- #print("layers in last region: {}".format(layers_in_last_region))
- old_len = len(msg_slice)
- msg_slice = msg_slice[0:layers_in_last_region]
- #print("truncate msg_slice from length {} to length {}".format(old_len,
- # len(msg_slice)))
-
- #print("send \"{}\" to region {}".format(msg_slice, which_region))
-
- enable(osc_state.client)
-
- # Seek to the current region.
- addr="/avatar/parameters/" + generate_utils.getSelectParam()
- osc_state.client.send_message(addr, which_region)
-
- # Update each letter.
- encoded = encodeMessage(osc_state.encoding, msg_slice)
- #print("len encoded: {}".format(len(encoded)))
- for i in range(0, len(encoded)):
- updateRegion(osc_state.client, i, encoded[i])
-
- ellipsis(osc_state.client, False)
-
-# Like `pageMessage` but uses the built-in chatbox. The built-in chatbox
-# truncates data at about 150 chars, so just send the suffix of the message for
-# now.
-def pageMessageBuiltin(cfg, osc_state: OscState, msg: str) -> bool:
- if len(msg) == 0 or msg.isspace():
- return False # Not paging
-
- msg_begin = max(len(msg) - 140, 0)
- msg_suffix = msg[msg_begin:len(msg)]
-
- if osc_state.builtin_msg != msg:
- addr="/chatbox/typing"
- osc_state.client.send_message(addr, False)
-
- addr="/chatbox/input"
- osc_state.client.send_message(addr, (msg_suffix, True))
- osc_state.builtin_msg = msg
-
- return False # Not paging
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("-i", default="127.0.0.1", help="OSC server IP")
- parser.add_argument("-p", type=int, default=9000, help="OSC server port")
- args = parser.parse_args()
-
diff --git a/Scripts/paging.py b/Scripts/paging.py
deleted file mode 100644
index c8ba8c3..0000000
--- a/Scripts/paging.py
+++ /dev/null
@@ -1,128 +0,0 @@
-#!/usr/bin/env python3
-
-from math import ceil
-from text_wrapping import TextWrapper
-
-def getSlice(msg: str, idx: int, slice_len: int) -> str:
- begin = idx * slice_len
- end = (idx + 1) * slice_len
- msg_len = len(msg)
- if msg_len >= end:
- return msg[begin:end]
- if msg_len > begin:
- return msg[begin:end] + (" " * (end - msg_len))
- return None
-
-def setSlice(msg: str, idx: int, slice_len: int, msg_slice: str,
- include_suffix: bool = True) -> str:
- begin = idx * slice_len
- end = (idx + 1) * slice_len
- prefix = msg[0:begin]
- prefix += " " * (begin - len(prefix))
- suffix = msg[end:]
- msg = prefix + msg_slice
- if include_suffix:
- msg += suffix
- return msg
-
-class SingleLinePager:
- def __init__(self, slice_len: int):
- self.msg = ""
- self.slice_len = slice_len
-
- def reset(self):
- self.msg = ""
-
- def getNextSlice(self, msg) -> tuple[str, int]:
- for i in range(0, ceil(len(msg) / self.slice_len)):
- old_slice = getSlice(self.msg, i, self.slice_len)
- new_slice = getSlice(msg, i, self.slice_len)
- if old_slice != new_slice:
- self.msg = setSlice(self.msg, i, self.slice_len, new_slice)
- return new_slice, i
- return "", -1
-
-class MultiLinePager:
- def __init__(self, slice_len: int, rows: int, cols: int):
- # This is a list of lists of SingleLinePagers.
- # It represents a list of pages, each containing a list of lines.
- self.pages = []
- self.slice_len = slice_len
- self.rows = rows
- self.cols = cols
-
- def reset(self):
- self.pages = []
-
- def getNextSlice(self, msg) -> tuple[str, int]:
- pages = TextWrapper(self.rows, self.cols).wrap(msg)
-
- # Wrapping split the input message along line boundaries and along page
- # boundaries. However, we're going to treat each page like a single
- # line, so that `slice_idx` can be used as a region index. Therefore,
- # we need exactly one SingleLinePager per page.
- for pi in range(len(self.pages), len(pages)):
- self.pages.append(SingleLinePager(self.slice_len))
-
- for pi in range(0, len(pages)):
- line = "".join(pages[pi])
- pager = self.pages[pi]
- msg_slice, slice_idx = pager.getNextSlice(line)
- if slice_idx != -1:
- # Reset every page after this. This guarantees that any text
- # written in this operation will eventually be redrawn.
- for pj in range(pi + 1, len(pages)):
- self.pages[pj].reset()
- return msg_slice, slice_idx
- return "", -1
-
-if __name__ == "__main__":
- assert(getSlice("abcdefghij", 0, 1) == "a")
- assert(getSlice("abcdefghij", 9, 1) == "j")
- assert(getSlice("abcdefghij", 0, 2) == "ab")
- assert(getSlice("abcdefghij", 1, 2) == "cd")
- assert(getSlice("abcdefghij", 3, 3) == "j ")
- assert(getSlice("abcdefghij", 10, 1) == None)
- assert(getSlice("abcdefghij", 11, 1) == None)
-
- assert(setSlice("abcdefghij", 1, 2, "kl") == "abklefghij")
- assert(setSlice("abc", 1, 2, "de") == "abde")
- assert(setSlice("abc", 0, 2, "de") == "dec")
-
- slice_len = 2
- p = SingleLinePager(slice_len)
- p.msg = "test"
- assert(p.getNextSlice("test")[0] == "")
- assert(p.getNextSlice("tast")[0] == "ta")
- assert(p.getNextSlice("tast")[0] == "")
-
- p.msg = ""
- assert(p.getNextSlice("test")[0] == "te")
- assert(p.msg == "te")
- assert(p.getNextSlice("test")[0] == "st")
- assert(p.msg == "test")
- assert(p.getNextSlice("test")[0] == "")
- assert(p.msg == "test")
- assert(p.getNextSlice("tests")[0] == "s ")
-
- slice_len = 2
- rows = 2
- cols = 4
- p = MultiLinePager(slice_len, rows, cols)
- assert(p.getNextSlice("")[0] == "")
- assert(p.getNextSlice("yo")[0] == "yo")
- assert(p.getNextSlice("yogi")[0] == "gi")
- assert(p.getNextSlice("yugi")[0] == "yu")
- assert(p.getNextSlice("yugi is a")[0] == "is")
- assert(p.getNextSlice("yugi is a")[0] == " a")
- assert(p.getNextSlice("yugi is a pussy")[0] == "pu")
- assert(p.getNextSlice("yugi is a pussy")[0] == "s-")
- assert(p.getNextSlice("yugi is a pussy")[0] == "sy")
-
- p = MultiLinePager(slice_len, rows, cols)
- assert(p.getNextSlice("yo")[0] == "yo")
- assert(p.getNextSlice("yo")[0] == " ")
- assert(p.getNextSlice("yo")[0] == " ")
- assert(p.getNextSlice("yo")[0] == " ")
- assert(p.getNextSlice("yo")[0] == "")
-
diff --git a/Scripts/profanity_filter.py b/Scripts/profanity_filter.py
deleted file mode 100644
index b8c84ed..0000000
--- a/Scripts/profanity_filter.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/env python3
-
-class ProfanityFilter:
- def __init__(self, en_path: str):
- self.en_path = en_path
- self.en_profanity = set()
-
- def load(self):
- with open(self.en_path, 'r') as f:
- for line in f:
- self.en_profanity.add(line.strip())
-
- def filter(self, line: str, language_code: str = "en") -> str:
- filtered = ""
-
- if language_code not in {"en"}:
- raise ValueError(f"Language code \"{language_code}\" is " +
- "unsupported by the profanity filter")
-
- # Translation table converting vowels to asterisks.
- vowel_to_asterisk = str.maketrans('aeiouAEIOU', '**********')
-
- result = []
- for word in line.split():
- word_clean = word.lower()
- # Filter out non-alphabet characters from the word.
- word_clean = ''.join([char for char in word_clean if char.isalpha()])
- if word_clean in self.en_profanity:
- result.append(word.translate(vowel_to_asterisk))
- else:
- result.append(word)
-
- return " ".join(result)
-
-if __name__ == "__main__":
- en_path = "/mnt/d/vrc/TaSTT/GUI/Profanity/Profanity/en"
- p = ProfanityFilter(en_path)
- p.load()
- assert(p.filter("fuck") == "f*ck")
- assert(p.filter("fuck!") == "f*ck!")
- assert(p.filter("fuck shit") == "f*ck sh*t")
- assert(p.filter("fuck shit this should not be filtered") == "f*ck sh*t this should not be filtered")
- assert(p.filter("ASS") == "*SS")
diff --git a/Scripts/remove_audio_sources.py b/Scripts/remove_audio_sources.py
deleted file mode 100644
index 0486169..0000000
--- a/Scripts/remove_audio_sources.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import argparse
-import libunity
-import sys
-
-def removeAudioSources(path: str):
- parser = libunity.MulticoreUnityParser()
- anim = parser.parseFile(path)
- anchors = set()
- node = anim.popNodeOfClass("82")
- while node:
- print("Killed audio source")
- anchors.add(node.anchor)
- node = anim.popNodeOfClass("82")
- for node in anim.nodes:
- anim.scrubReferencesByValue(node, values=anchors)
- with open(path, "w", encoding="utf-8") as f:
- f.write(libunity.unityYamlToString(anim.nodes))
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("--prefab", type=str, help="Path to .prefab file.")
- args = parser.parse_args()
-
- removeAudioSources(args.prefab)
-
diff --git a/Scripts/requirements.txt b/Scripts/requirements.txt
deleted file mode 100644
index 41c581c..0000000
--- a/Scripts/requirements.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-ctranslate2==4.5.0
-editdistance
-faster-whisper@https://github.com/guillaumekln/faster-whisper/archive/53bbe5401683c9a7549db62642e3d4535956b95c.tar.gz
-future==0.18.2
-huggingface_hub==0.16.4
-keyboard
-langcodes
-language-data
-openvr
-pillow
-pyaudio
-pydub
-python-osc
-pyyaml
-sentence_splitter
-transformers>=4.21.0
-wget
-
diff --git a/Scripts/requirements_frozen.txt b/Scripts/requirements_frozen.txt
deleted file mode 100644
index 9e6a6ab..0000000
--- a/Scripts/requirements_frozen.txt
+++ /dev/null
@@ -1,42 +0,0 @@
-av==13.1.0
-certifi==2024.8.30
-charset-normalizer==3.4.0
-colorama==0.4.6
-coloredlogs==15.0.1
-ctranslate2==4.5.0
-editdistance==0.8.1
-faster-whisper @ https://github.com/guillaumekln/faster-whisper/archive/53bbe5401683c9a7549db62642e3d4535956b95c.tar.gz#sha256=17b49d15a58e18d78b4639af59bd35da12bc0bf3bb73c9af4ad48891dd6793f7
-filelock==3.16.1
-flatbuffers==24.3.25
-fsspec==2024.10.0
-future==0.18.2
-huggingface-hub==0.16.4
-humanfriendly==10.0
-idna==3.10
-keyboard==0.13.5
-langcodes==3.4.1
-language_data==1.2.0
-marisa-trie==1.2.1
-mpmath==1.3.0
-numpy==2.1.3
-onnxruntime==1.20.0
-openvr==2.5.101
-packaging==24.2
-pillow==11.0.0
-protobuf==5.28.3
-PyAudio==0.2.14
-pydub==0.25.1
-pyreadline3==3.5.4
-python-osc==1.9.0
-PyYAML==6.0.2
-regex==2024.11.6
-requests==2.32.3
-safetensors==0.4.5
-sentence-splitter==1.4
-sympy==1.13.3
-tokenizers==0.15.2
-tqdm==4.67.0
-transformers==4.35.2
-typing_extensions==4.12.2
-urllib3==2.2.3
-wget==3.2
diff --git a/Scripts/set_texture_sz.py b/Scripts/set_texture_sz.py
deleted file mode 100644
index f6fbb45..0000000
--- a/Scripts/set_texture_sz.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import argparse
-import libunity
-import sys
-
-def setTextureSize(path: str, size: int):
- parser = libunity.MulticoreUnityParser()
- anim = parser.parseFile(path)
-
- node = anim.nodes[0]
- node.mapping['TextureImporter'].mapping['maxTextureSize'] = size
- for plat in node.mapping['TextureImporter'].mapping['platformSettings'].sequence:
- plat.mapping['maxTextureSize'] = size
-
- with open(path, "w", encoding="utf-8") as f:
- f.write(libunity.unityYamlToString(anim.nodes))
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("--meta", type=str, help="Path to texture .meta file.")
- parser.add_argument("--size", type=int, help="Texture size.")
- args = parser.parse_args()
-
- setTextureSize(args.meta, args.size)
-
diff --git a/Scripts/steamvr.py b/Scripts/steamvr.py
deleted file mode 100644
index 3e6c6c9..0000000
--- a/Scripts/steamvr.py
+++ /dev/null
@@ -1,89 +0,0 @@
-#!/usr/bin/env python3
-
-# python3 -m pip install openvr
-# License: BSD-3.0 (requires showing notice in binary distributions)
-import openvr as vr
-import sys
-import time
-
-EVENT_NONE = 0
-EVENT_RISING_EDGE = 1
-EVENT_FALLING_EDGE = 2
-
-class InputEvent:
- def __init__(self,
- opcode: int):
- self.opcode = opcode
-
-# Checks if the given button on the given controller is pressed.
-def pollButtonPress(
- hand: str = "right",
- button: str = "b",
- ctrl = None # ThreadControl object
- ) -> int:
- hands = {}
- hands["left"] = vr.TrackedControllerRole_LeftHand
- hands["right"] = vr.TrackedControllerRole_RightHand
-
- buttons = {}
- buttons["a"] = vr.k_EButton_IndexController_A
- buttons["b"] = vr.k_EButton_IndexController_B
- buttons["thumbstick"] = vr.k_EButton_Axis0
-
- system = None
- first = True
- while ctrl.run_app and not system:
- try:
- system = vr.init(vr.VRApplication_Background)
- except Exception as e:
- if first:
- print(f"Failed to start steamVR input thread: {repr(e)}", file=sys.stderr)
- first = False
- time.sleep(1)
- last_packet = 0
- event_high = False
-
- while ctrl.run_app:
- time.sleep(0.01)
-
- lh_idx = system.getTrackedDeviceIndexForControllerRole(hands[hand])
- #print("left hand device idx: {}".format(lh_idx))
-
- got_state, state = system.getControllerState(lh_idx)
- if not got_state:
- continue
-
- if state.unPacketNum == last_packet:
- continue
-
- # Clicking joysticks and moving joysticks fire the same events. To
- # differentiate movement from clicking, we create a dead zone: if the event
- # fires while the stick isn't moved far from center, we assume it's a
- # click, not movement.
- dead_zone_radius = 0.7
-
- button_mask = (1 << buttons[button])
- ret = EVENT_NONE
- if (state.ulButtonPressed & button_mask) != 0 and\
- (state.rAxis[0].x**2 + state.rAxis[0].y**2 < dead_zone_radius**2):
- #print("button pressed: %016x" % state.ulButtonPressed)
- #for i in range(0, 5):
- # print("axis {} x: {} y: {}".format(i, state.rAxis[i].x, state.rAxis[i].y))
- if not event_high:
- yield InputEvent(EVENT_RISING_EDGE)
- event_high = True
- elif event_high:
- event_high = False
- yield InputEvent(EVENT_FALLING_EDGE)
-
-if __name__ == "__main__":
- gen = pollButtonPress()
- while True:
- time.sleep(0.1)
-
- event = pollButtonPress(session_state)
- if event == EVENT_RISING_EDGE:
- print("rising edge")
- elif event == EVENT_FALLING_EDGE:
- print("falling edge")
-
diff --git a/Scripts/text_to_text_demo.py b/Scripts/text_to_text_demo.py
deleted file mode 100644
index 4810361..0000000
--- a/Scripts/text_to_text_demo.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/usr/bin/env python3
-# python3 -m pip install python-osc pillow
-
-from math import ceil
-from paging import MultiLinePager
-from pythonosc import udp_client
-
-import generate_utils
-import osc_ctrl
-import time
-
-class AppConfig:
- def __init__(self,
- rows: int = 4,
- cols: int = 40,
- chars_per_sync: int = 10,
- osc_sync_rate_hz: int = 3):
- self.rows = rows
- self.cols = cols
- self.chars_per_sync = chars_per_sync
- self.osc_sync_rate_hz = osc_sync_rate_hz
- self.client = osc_ctrl.getClient()
-
-def encodeMessage(msg):
- encoded = []
- for char in msg:
- encoded.append(ord(char))
- return encoded
-
-class OSCSyncHelper:
- def __init__(self,
- config: AppConfig):
- self.sync_delay_s = 1.0 / config.osc_sync_rate_hz
- self.last = time.time() - self.sync_delay_s
-
- def waitForSync(self) -> None:
- # sleep() can sleep for too short a time, so use a loop to ensure that
- # we sleep at least a full sync window's worth of time.
- while time.time() - self.last < self.sync_delay_s:
- time.sleep(0.01)
- self.last = time.time()
-
-def sendMessage(msg: str, cfg: AppConfig, osc: OSCSyncHelper) -> None:
- num_cells = cfg.rows * cfg.cols
- num_regions = ceil(num_cells / cfg.chars_per_sync)
-
- pager = MultiLinePager(cfg.chars_per_sync, cfg.rows, cfg.cols)
-
- # Show the chatbox
- osc.waitForSync()
- osc_ctrl.toggleBoard(cfg.client, True)
- osc_ctrl.ellipsis(cfg.client, False)
- osc_ctrl.disable(cfg.client)
-
- # Ensure that the chatbox is cleared.
- addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
- cfg.client.send_message(addr, True)
- osc.waitForSync()
- cfg.client.send_message(addr, False)
-
- slice_idx = 0
- while slice_idx != -1:
-
- msg_slice, slice_idx = pager.getNextSlice(msg)
- which_region = slice_idx % num_regions
-
- print(f"Sending slice '{msg_slice}' to region {which_region}")
-
- # Wait until OSC has had enough time to sync the previous window of
- # data.
- osc.waitForSync()
-
- # Enable chatbox animations.
- osc_ctrl.enable(cfg.client)
-
- # Seek to the current region.
- addr="/avatar/parameters/" + generate_utils.getSelectParam()
- cfg.client.send_message(addr, which_region)
-
- # Send all characters in the current region.
- encoded = encodeMessage(msg_slice)
- for i in range(0, len(msg_slice)):
- print(f"Sending char {msg_slice[i]} / {encoded[i]}")
- addr="/avatar/parameters/" + generate_utils.getBlendParam(i, 0)
- letter_remapped = (-127.5 + encoded[i]) / 127.5
- cfg.client.send_message(addr, letter_remapped)
-
- # Disable chatbox animations to ensure stability.
- osc.waitForSync()
- osc_ctrl.disable(cfg.client)
-
-if __name__ == "__main__":
- cfg = AppConfig()
- osc = OSCSyncHelper(cfg)
- sendMessage("Hello, world! aiueo aiueo aiueo aiueo aiueo eeeeeeeeeeeeeeeeeeeeeeee", cfg, osc)
-
diff --git a/Scripts/text_wrapping.py b/Scripts/text_wrapping.py
deleted file mode 100644
index 7576b78..0000000
--- a/Scripts/text_wrapping.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/usr/bin/env python3
-
-class TextWrapper:
- def __init__(self, rows, cols):
- self.rows = rows
- self.cols = cols
-
- # Split `msg` along line boundaries. Long words tend to just go onto new
- # lines. Words that are too long to fit on any line are hyphenated and
- # split.
- # Lines are padded with space (" ") characters so they're all `self.cols`
- # characters long. Pages are padded with lines full of space characters so
- # they're all `self.rows` lines long.
- def wrap(self, msg: str) -> list[list[str]]:
- pages = []
- lines = []
- line = ""
- for word in msg.split():
- if len(line) + 1 + len(word) <= self.cols:
- if len(line):
- line += " "
- line += word
- continue
- # Word won't fit onto this line. End the line.
- if len(line):
- line += " " * (self.cols - len(line))
- lines.append(line)
- line = ""
- while len(word) > self.cols:
- prefix = word[0:self.cols-1] + "-"
- lines.append(prefix)
- suffix = word[self.cols-1:]
- word = suffix
- if len(word):
- line = word
- if len(line):
- line += " " * (self.cols - len(line))
- lines.append(line)
- while len(lines):
- pages.append(lines[0:self.rows])
- lines = lines[self.rows:]
- if len(pages):
- num_extra_lines = (self.rows - (len(pages[-1]) % self.rows)) % self.rows
- pages[-1] += [" " * self.cols] * num_extra_lines
- return pages
-
-if __name__ == "__main__":
- w = TextWrapper(2, 5)
-
- assert(w.wrap("foo") == [["foo ", " "]])
- assert(w.wrap("foo bar") == [["foo ", "bar "]])
- assert(w.wrap("bagel") == [["bagel", " "]])
- assert(w.wrap("bagels") == [["bage-", "ls "]])
- assert(w.wrap("hot bagels") == [["hot ", "bage-"], ["ls ", " "]])
-
diff --git a/Scripts/transcribe_pipeline.py b/Scripts/transcribe_pipeline.py
deleted file mode 100644
index 5914afc..0000000
--- a/Scripts/transcribe_pipeline.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import time
-
-
-class TranscriptCommit:
- def __init__(self,
- delta: str,
- preview: str,
- latency_s: float = None,
- thresh_at_commit: int = None,
- audio: bytes = None,
- duration_s: float = None,
- start_ts: float = None):
- self.delta = delta
- self.preview = preview
- self.latency_s = latency_s
- self.thresh_at_commit = thresh_at_commit
- self.audio = audio
- # Time at which the commit is generated
- self.ts = time.time()
- # Time corresponding to the start of the segment
- self.start_ts = start_ts
- # The duration of the audio segment, in seconds.
- self.duration_s = duration_s
-
-
-class StreamingPlugin:
- def __init__(self):
- pass
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- return commit
-
- def stop(self):
- pass
-
diff --git a/Scripts/transcribe_v2.py b/Scripts/transcribe_v2.py
deleted file mode 100644
index e024bae..0000000
--- a/Scripts/transcribe_v2.py
+++ /dev/null
@@ -1,1172 +0,0 @@
-from browser_src import BrowserSource
-from datetime import datetime
-from emotes_v2 import EmotesState
-from faster_whisper import WhisperModel
-from functools import partial
-from huggingface_hub import hf_hub_download
-from profanity_filter import ProfanityFilter
-from pydub import AudioSegment
-from sentence_splitter import split_text_into_sentences
-from transcribe_pipeline import StreamingPlugin, TranscriptCommit
-
-import app_config
-import argparse
-import ctranslate2
-import editdistance
-import glob
-import keybind_event_machine
-import keyboard
-import langcodes
-import lang_compat
-import math
-import numpy as np
-import os
-import osc_ctrl
-import pyaudio
-import steamvr
-import subprocess
-import sys
-import threading
-import time
-import transformers
-import typing
-import vad
-import wave
-import winsound
-
-class ThreadControl:
- def __init__(self, cfg):
- self.cfg = cfg
- self.run_app = True
-
-class AudioStream():
- FORMAT = pyaudio.paInt16
- # Size of each frame (audio sample), in bytes. If you change FORMAT, make
- # sure this stays up to date!
- FRAME_SZ = 2
- # Frames per second.
- FPS = 16000
- CHANNELS = 1
- def __init__(self):
- pass
-
- def getSamples(self) -> bytes:
- raise NotImplementedError("getSamples is not implemented!")
-
-class DiskStream(AudioStream):
- def __init__(self, path: str):
- fmt = None
- if path.endswith(".mp3"):
- fmt = "mp3"
- elif path.endswith(".wav"):
- fmt = "wav"
- else:
- raise NotImplementedError(f"Requested file type {path} " + \
- "is not supported")
- print(f"Loading audio data", file=sys.stderr)
- audio = AudioSegment.from_file(path, format=fmt)
- audio = audio.set_channels(1)
- # TODO(yum) replace manual decimation code with this!
- audio = audio.set_frame_rate(16000)
- frames = np.array(audio.get_array_of_samples())
- frames = np.int16(frames).tobytes()
-
- self.frames = frames
-
- print(f"Loaded data", file=sys.stderr)
-
- def getSamples(self) -> bytes:
- # Give out samples at a fixed rate to minimize
- # noise.
- give_s = 0.2
- nframes = int(give_s * AudioStream.FPS)
- frames = self.frames[0:nframes * AudioStream.FRAME_SZ];
- self.frames = self.frames[nframes * AudioStream.FRAME_SZ:]
-
- if len(frames) < nframes:
- frames += np.zeros(nframes - len(frames), dtype=np.int16).tobytes()
-
- return frames
-
-class MicStream(AudioStream):
- CHUNK_SZ = 1024
-
- def __init__(self, which_mic: str):
- self.p = pyaudio.PyAudio()
- self.stream = None
- self.sample_rate = None
- # Each time pyaudio gives us audio data, it's in the form of a chunk of
- # samples. We keep these in a list to keep the audio callback as light
- # as possible. Whenever downstream layers want data, we collapse the
- # list into a single array of data (a bytes object).
- self.chunks = []
- # If set, incoming frames are simply discarded.
- self.paused = False
-
- print(f"Finding mic {which_mic}", file=sys.stderr)
- self.dumpMicDevices()
-
- got_match = False
- device_index = -1
- if which_mic == "index":
- target_str = "Digital Audio Interface"
- elif which_mic == "focusrite":
- target_str = "Focusrite"
- elif which_mic == "motu":
- target_str = "In 1-2 (MOTU M Series)"
- elif which_mic == "beyond":
- target_str = "Microphone (Beyond)"
- else:
- print(f"Mic {which_mic} requested, treating it as a numerical " +
- "device ID", file=sys.stderr)
- device_index = int(which_mic)
- got_match = True
- if not got_match:
- info = self.p.get_host_api_info_by_index(0)
- numdevices = info.get('deviceCount')
- for i in range(0, numdevices):
- if (self.p.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0:
- device_name = self.p.get_device_info_by_host_api_device_index(0, i).get('name')
- if target_str in device_name:
- print(f"Got matching mic: {device_name}",
- file=sys.stderr)
- device_index = i
- got_match = True
- break
- if not got_match:
- raise KeyError(f"Mic {which_mic} not found")
-
- info = self.p.get_device_info_by_host_api_device_index(0, device_index)
- print(f"Found mic {which_mic}: {info['name']}", file=sys.stderr)
- self.sample_rate = int(info['defaultSampleRate'])
- print(f"Mic sample rate: {self.sample_rate}", file=sys.stderr)
-
- self.stream = self.p.open(
- rate=self.sample_rate,
- channels=AudioStream.CHANNELS,
- format=AudioStream.FORMAT,
- input=True,
- frames_per_buffer=MicStream.CHUNK_SZ,
- input_device_index=device_index,
- stream_callback=self.onAudioFramesAvailable)
-
- self.stream.start_stream()
-
- AudioStream.__init__(self)
-
- def pause(self, state: bool = True):
- self.paused = state
-
- def dumpMicDevices(self):
- info = self.p.get_host_api_info_by_index(0)
- numdevices = info.get('deviceCount')
-
- for i in range(0, numdevices):
- if (self.p.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0:
- device_name = self.p.get_device_info_by_host_api_device_index(0, i).get('name')
- print("Input Device id ", i, " - ", device_name)
-
- def onAudioFramesAvailable(self,
- frames,
- frame_count,
- time_info,
- status_flags):
- if self.paused:
- # Don't literally pause, just start returning silence. This allows
- # the `min_segment_age_s` check to work while paused.
- n_frames = int(frame_count * AudioStream.FPS /
- float(self.sample_rate))
- self.chunks.append(np.zeros(n_frames,
- dtype=np.int16).tobytes())
- return (frames, pyaudio.paContinue)
-
- decimated = b''
- # In pyaudio, a `frame` is a single sample of audio data.
- frame_len = AudioStream.FRAME_SZ
- next_frame = 0.0
- # The mic probably has a higher sample rate than Whisper wants, so
- # decrease the sample rate by dropping samples. Note that this
- # algorithm only works if the mic's rate is higher than whisper's
- # expected rate.
- keep_every = float(self.sample_rate) / AudioStream.FPS
- for i in range(frame_count):
- if i >= next_frame:
- decimated += frames[i*frame_len:(i+1)*frame_len]
- next_frame += keep_every
- self.chunks.append(decimated)
-
- return (frames, pyaudio.paContinue)
-
- # Get audio data and the corresponding timestamp.
- def getSamples(self) -> bytes:
- chunks = self.chunks
- self.chunks = []
- result = b''.join(chunks)
- return result
-
-class AudioCollector:
- def __init__(self, stream: AudioStream):
- self.stream = stream
- self.frames = b''
- # Note: by design, this is the only spot where we anchor our timestamps
- # against the real world. This is done to make it possible to profile
- # test cases which read from disk (at much faster than real speed) in
- # the same way that we profile real-time data.
- self.wall_ts = time.time()
-
- def getAudio(self) -> bytes:
- frames = self.stream.getSamples()
- if frames:
- self.frames += frames
- return self.frames
-
- def dropAudioPrefix(self, dur_s: float) -> bytes:
- n_bytes = int(dur_s * AudioStream.FPS) * self.stream.FRAME_SZ
- n_bytes = min(n_bytes, len(self.frames))
- cut_portion = self.frames[:n_bytes]
- self.frames = self.frames[n_bytes:]
- self.wall_ts += float(n_bytes / self.stream.FRAME_SZ) / self.stream.FPS
- return cut_portion
-
- def dropAudioPrefixByFrames(self, dur_frames: int) -> bytes:
- n_bytes = dur_frames * self.stream.FRAME_SZ
- n_bytes = min(n_bytes, len(self.frames))
- cut_portion = self.frames[:n_bytes]
- self.frames = self.frames[n_bytes:]
- self.wall_ts += float(n_bytes / self.stream.FRAME_SZ) / self.stream.FPS
- return cut_portion
-
- def keepLast(self, dur_s: float) -> bytes:
- drop_len = max(0, self.duration() - dur_s)
- return self.dropAudioPrefix(drop_len)
-
- def dropAudio(self):
- self.wall_ts += self.duration()
- cut_portion = self.frames
- self.frames = b''
- return cut_portion
-
- def duration(self):
- return len(self.frames) / (AudioStream.FPS * self.stream.FRAME_SZ)
-
- def begin(self):
- return self.wall_ts
-
- def now(self):
- return self.begin() + self.duration()
-
-class AudioCollectorFilter:
- def __init__(self, parent: AudioCollector):
- self.parent = parent
-
- def getAudio(self) -> bytes:
- return self.parent.getAudio()
- def dropAudioPrefix(self, dur_s: float):
- return self.parent.dropAudioPrefix(dur_s)
- def dropAudioPrefixByFrames(self, dur_frames: int):
- return self.parent.dropAudioPrefixByFrames(dur_frames)
- def keepLast(self, dur_s):
- return self.parent.keepLast(dur_s)
- def dropAudio(self):
- return self.parent.dropAudio()
- def duration(self):
- return self.parent.duration()
- def begin(self):
- return self.parent.begin()
- def now(self):
- return self.parent.now()
-
-# Audio collector that enforces a minimum length on its audio data.
-class LengthEnforcingAudioCollector(AudioCollectorFilter):
- def __init__(self, parent: AudioCollector, min_duration_s: float):
- AudioCollectorFilter.__init__(self, parent)
- self.min_duration_s = min_duration_s
-
- def getAudio(self) -> bytes:
- audio = self.parent.getAudio()
- min_duration_frames = int(self.min_duration_s * AudioStream.FPS)
- pad_len_frames = max(0, min_duration_frames - int(len(audio) /
- AudioStream.FRAME_SZ))
- pad = np.zeros(pad_len_frames, dtype=np.int16).tobytes()
- return pad + audio
-
-class NormalizingAudioCollector(AudioCollectorFilter):
- def __init__(self, parent: AudioCollector):
- AudioCollectorFilter.__init__(self, parent)
-
- def getAudio(self) -> bytes:
- audio = self.parent.getAudio()
-
- audio = AudioSegment(audio, sample_width=AudioStream.FRAME_SZ,
- frame_rate=AudioStream.FPS, channels=AudioStream.CHANNELS)
- audio = audio.normalize()
-
- frames = np.array(audio.get_array_of_samples())
- frames = np.int16(frames).tobytes()
-
- return frames
-
-class CompressingAudioCollector(AudioCollectorFilter):
- def __init__(self, parent: AudioCollector):
- AudioCollectorFilter.__init__(self, parent)
-
- def getAudio(self) -> bytes:
- audio = self.parent.getAudio()
-
- audio = AudioSegment(audio, sample_width=AudioStream.FRAME_SZ,
- frame_rate=AudioStream.FPS, channels=AudioStream.CHANNELS)
- # subtle compression has a slight positive effect on my benchmark
- audio = audio.compress_dynamic_range(threshold=-10, ratio=2.0)
-
- frames = np.array(audio.get_array_of_samples())
- frames = np.int16(frames).tobytes()
-
- return frames
-
-# Finds speech segments in raw audio using the `vad` module and decides where
-# the audio can be cut.
-class AudioSegmenter:
- # min_silence_ms and max_speech_s are passed straight through to
- # vad.VadOptions as min_silence_duration_ms and max_speech_duration_s.
- def __init__(self,
- min_silence_ms=250,
- max_speech_s=5):
- self.vad_options = vad.VadOptions(
- min_silence_duration_ms=min_silence_ms,
- max_speech_duration_s=max_speech_s)
- pass
-
- # Converts 16-bit integer samples to float32 (divided by 32768.0) and
- # returns whatever vad.get_speech_timestamps() produces for them.
- # getStableCutoff() below reads 'start' and 'end' keys from each entry.
- def segmentAudio(self, audio: bytes):
- audio = np.frombuffer(audio,
- dtype=np.int16).flatten().astype(np.float32) / 32768.0
- return vad.get_speech_timestamps(audio, vad_options=self.vad_options)
-
- # Returns the stable cutoff (if any) and whether there are any segments.
- # The cutoff is a frame index into `audio`, or None when no cutoff was
- # found, so callers must handle None despite the `int` annotation.
- # NOTE(review): indentation of this method is not visible here, so which
- # `if` the `else:` below pairs with, and whether the last-segment check
- # runs for every segment, should be confirmed against the original file.
- def getStableCutoff(self, audio: bytes) -> typing.Tuple[int, bool]:
- # The configured minimum silence, converted from milliseconds to frames.
- min_delta_frames = int((self.vad_options.min_silence_duration_ms *
- AudioStream.FPS) / 1000.0)
- cutoff = None
-
- last_end = None
- segments = self.segmentAudio(audio)
-
- for i in range(len(segments)):
- s = segments[i]
- #print(f"s: {s}")
- #print(f"last_end: {last_end}")
-
- if last_end:
- # Gap between this segment's start and the remembered end.
- delta_frames = s['start'] - last_end
- #print(f"delta frames: {delta_frames}")
- if delta_frames > min_delta_frames:
- cutoff = s['start']
- else:
- last_end = s['end']
-
- if i == len(segments) - 1:
- # Last segment: measure the audio that follows its end.
- now = int(len(audio) / AudioStream.FRAME_SZ)
- #print(f"now: {now}")
- #print(f"min d: {min_delta_frames}")
- delta_frames = now - s['end']
- if delta_frames > min_delta_frames:
- # Cut half of the minimum silence before the end of the audio.
- cutoff = now - int(min_delta_frames / 2)
-
- return (cutoff, len(segments) > 0)
-
-# A segment of transcribed audio. `start_ts` and `end_ts` are floating point
-# number of seconds since the beginning of audio data.
-class Segment:
- def __init__(self,
- transcript: str,
- start_ts: float,
- end_ts: float,
- wall_ts: float,
- avg_logprob: float,
- no_speech_prob: float,
- compression_ratio: float):
- self.transcript = transcript
- # start_ts, end_ts are timestamps in seconds relative to `wall_ts`.
- self.start_ts = start_ts
- self.end_ts = end_ts
- # wall_ts is the time.time() at which the oldest audio sample leading
- # to this transcript was collected.
- self.wall_ts = wall_ts
- self.avg_logprob = avg_logprob
- self.no_speech_prob = no_speech_prob
- self.compression_ratio = compression_ratio
-
- def __str__(self):
- ts = f"(ts: {self.start_ts}-{self.end_ts}) "
-
- wall_ts_start = datetime.utcfromtimestamp(self.start_ts + self.wall_ts).strftime('%H:%M:%S')
- wall_ts_end = datetime.utcfromtimestamp(self.end_ts + self.wall_ts).strftime('%H:%M:%S')
- wall_ts = f"(wall ts: {wall_ts_start}-{wall_ts_end}) "
-
- no_speech = f"(no_speech: {self.no_speech_prob}) "
- avg_logprob = f"(avg_logprob: {self.avg_logprob}) "
- return f"{self.transcript} " + ts + wall_ts + no_speech + avg_logprob
-
-class Whisper:
- def __init__(self,
- collector: AudioCollector,
- cfg: typing.Dict):
- self.collector = collector
- self.model = None
- self.cfg = cfg
-
- abspath = os.path.abspath(__file__)
- my_dir = os.path.dirname(abspath)
- parent_dir = os.path.dirname(my_dir)
-
- model_str = cfg["model"]
- model_root = os.path.join(parent_dir, "Models",
- os.path.normpath(model_str))
- print(f"Model {cfg['model']} will be saved to {model_root}",
- file=sys.stderr)
-
- model_device = "cuda"
- if cfg["use_cpu"]:
- model_device = "cpu"
-
- already_downloaded = os.path.exists(model_root)
-
- self.model = WhisperModel(model_str,
- device = model_device,
- device_index = cfg["gpu_idx"],
- compute_type = cfg["compute_type"],
- download_root = model_root,
- local_files_only = already_downloaded)
-
- def transcribe(self, frames: bytes = None) -> typing.List[Segment]:
- if frames is None:
- frames = self.collector.getAudio()
- # Convert from signed 16-bit int [-32768, 32767] to signed 32-bit float on
- # [-1, 1].
- audio = np.frombuffer(frames,
- dtype=np.int16).flatten().astype(np.float32) / 32768.0
-
- t0 = time.time()
- segments, info = self.model.transcribe(
- audio,
- language = langcodes.find(self.cfg["language"]).language,
- vad_filter = True,
- temperature=0.0,
- without_timestamps = False)
- res = []
- for s in segments:
- # Manual touchup. I see a decent number of hallucinations sneaking
- # in with high `no_speech_prob` and modest `avg_logprob`.
- if s.no_speech_prob > 0.6 and s.avg_logprob < -0.5:
- if cfg["enable_debug_mode"]:
- print(f"Drop probable hallucination (case 1) " +
- f"(text='{s.text}', " +
- f"no_speech_prob={s.no_speech_prob}, " +
- f"avg_logprob={s.avg_logprob})", file=sys.stderr)
- continue
- # Another touchup targeted at the vexatious "thanks for watching!"
- # hallucination. This triggers a lot when listening to
- # instrumental/electronic music.
- if s.no_speech_prob > 0.15 and s.avg_logprob < -0.7:
- if cfg["enable_debug_mode"]:
- print(f"Drop probable hallucination (case 2) " +
- f"(text='{s.text}', " +
- f"no_speech_prob={s.no_speech_prob}, " +
- f"avg_logprob={s.avg_logprob})", file=sys.stderr)
- continue
- if cfg["enable_debug_mode"]:
- print(f"s get: {s}")
- if s.avg_logprob < -1.0:
- continue
- if s.compression_ratio > 2.4:
- continue
- res.append(Segment(s.text, s.start, s.end,
- self.collector.begin(),
- s.avg_logprob, s.no_speech_prob, s.compression_ratio))
- t1 = time.time()
- if cfg["enable_debug_mode"]:
- print(f"Transcription latency (s): {t1 - t0}")
- return res
-
-def saveAudio(audio: bytes, path: str):
- with wave.open(path, 'wb') as wf:
- print(f"Saving audio to {path}", file=sys.stderr)
- wf.setnchannels(AudioStream.CHANNELS)
- wf.setsampwidth(AudioStream.FRAME_SZ)
- wf.setframerate(AudioStream.FPS)
- wf.writeframes(audio)
-
-class VadCommitter:
- def __init__(self,
- cfg: typing.Dict,
- collector: AudioCollector,
- whisper: Whisper,
- segmenter: AudioSegmenter):
- self.cfg = cfg
- self.collector = collector
- self.whisper = whisper
- self.segmenter = segmenter
-
- def getDelta(self) -> TranscriptCommit:
- audio = self.collector.getAudio()
- stable_cutoff, has_audio = self.segmenter.getStableCutoff(audio)
-
- delta = ""
- commit_audio = None
- latency_s = None
- duration_s = self.collector.duration()
- start_ts = self.collector.begin()
-
- if has_audio and stable_cutoff:
- #print(f"stable cutoff get: {stable_cutoff}", file=sys.stderr)
- latency_s = self.collector.now() - self.collector.begin()
- duration_s = stable_cutoff / AudioStream.FPS
- start_ts = self.collector.begin()
- commit_audio = self.collector.dropAudioPrefixByFrames(stable_cutoff)
-
- segments = self.whisper.transcribe(commit_audio)
- delta = ''.join(s.transcript for s in segments)
- audio = self.collector.getAudio()
- if cfg["enable_debug_mode"]:
- for s in segments:
- print(f"commit segment: {s}", file=sys.stderr)
- print(f"delta get: {delta}", file=sys.stderr)
-
- if False:
- ts = datetime.fromtimestamp(self.collector.now() - latency_s)
- filename = str(ts.strftime('%Y_%m_%d__%H-%M-%S')) + ".wav"
- saveAudio(commit_audio, filename)
-
- preview = ""
- if self.cfg["enable_previews"] and has_audio:
- segments = self.whisper.transcribe(audio)
- preview = "".join(s.transcript for s in segments)
-
- if not has_audio:
- #print("VAD detects no audio, skip transcription", file=sys.stderr)
- self.collector.keepLast(1.0)
-
- return TranscriptCommit(
- delta.strip(),
- preview.strip(),
- latency_s,
- audio=audio,
- duration_s=duration_s,
- start_ts=start_ts)
-
-def install_in_venv(pkgs: typing.List[str]) -> bool:
- pkgs_str = " ".join(pkgs)
- print(f"Installing {pkgs_str}")
- pip_proc = subprocess.Popen(
- f"Resources/Python/python.exe -m pip install {pkgs_str} --no-warn-script-location".split(),
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- pip_stdout, pip_stderr = pip_proc.communicate()
- pip_stdout = pip_stdout.decode("utf-8")
- pip_stderr = pip_stderr.decode("utf-8")
- print(pip_stdout, file=sys.stderr)
- print(pip_stderr, file=sys.stderr)
- if pip_proc.returncode != 0:
- print(f"`pip install {pkgs_str}` exited with {pip_proc.returncode}",
- file=sys.stderr)
- return False
- return True
-
-class TranslationPlugin(StreamingPlugin):
- def __init__(self, cfg):
- lang_bits = cfg["language_target"].split(" | ")
- self.cfg = cfg
- self.language_target = None
- self.translator = None
- self.tokenizer = None
- if len(lang_bits) != 2:
- return
- self.language_target = lang_bits[1]
-
- print("Translation requested", file=sys.stderr)
- # The ctranslate2 model converter needs torch. Grr.
- if not install_in_venv(["torch==2.2.2"]):
- return
-
- output_dir = "Resources/" + cfg["model_translation"]
- # Provided by ctranslate2 Python package
- cmd = "ct2-transformers-converter.exe --model facebook/" + \
- cfg["model_translation"] + " --output_dir " + output_dir
-
- print(f"Fetching translation algorithm ({cfg['model_translation']})",
- file=sys.stderr)
- if not os.path.exists(output_dir):
- ct2_proc = subprocess.Popen(
- cmd.split(),
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- ct2_stdout, ct2_stderr = ct2_proc.communicate()
- ct2_stdout = ct2_stdout.decode("utf-8")
- ct2_stderr = ct2_stderr.decode("utf-8")
- print(ct2_stdout, file=sys.stderr)
- print(ct2_stderr, file=sys.stderr)
- if ct2_proc.returncode != 0:
- print(f"Failed to get NLLB model: ct2 process exited with "
- "{ct2_proc.returncode}", file=sys.stderr)
- print(f"Using model at {output_dir}", file=sys.stderr)
-
- model_device = "cuda"
- if cfg["use_cpu"]:
- model_device = "cpu"
-
- self.translator = ctranslate2.Translator(output_dir,
- device = model_device,
- device_index = cfg["gpu_idx"],
- compute_type = cfg["compute_type"])
-
- whisper_lang = cfg["language"]
- nllb_lang = lang_compat.whisper_to_nllb[whisper_lang]
-
- self.tokenizer = transformers.AutoTokenizer.from_pretrained(
- "facebook/" + cfg["model_translation"],
- src_lang=nllb_lang)
-
- print(f"Translation ready to go", file=sys.stderr)
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- if not self.language_target:
- return commit
-
- def _translate_text(text: str) -> str:
-
- whisper_lang = self.cfg["language"]
- nllb_lang = lang_compat.whisper_to_nllb[whisper_lang]
- ss_lang = lang_compat.nllb_to_ss[nllb_lang]
- sentences = split_text_into_sentences(text, language=ss_lang)
-
- translated_sentences = []
- for sentence in sentences:
- source = self.tokenizer.convert_ids_to_tokens(self.tokenizer.encode(sentence))
- target_prefix = [self.language_target]
- results = self.translator.translate_batch([source], target_prefix=[target_prefix])
- target = results[0].hypotheses[0][1:]
- translated_sentence = self.tokenizer.decode(self.tokenizer.convert_tokens_to_ids(target))
- translated_sentences.append(translated_sentence)
- translated = " ".join(translated_sentences)
- if cfg["enable_orig_lang"] and len(sentences) > 0:
- translated += f" ({text})"
- return translated
-
- commit.delta = _translate_text(commit.delta)
- commit.preview = _translate_text(commit.preview)
- return commit
-
-class LowercasePlugin(StreamingPlugin):
- def __init__(self, cfg):
- self.cfg = cfg
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- if self.cfg["enable_lowercase_filter"]:
- commit.delta = commit.delta.lower()
- commit.preview = commit.preview.lower()
- return commit
-
-class UppercasePlugin(StreamingPlugin):
- def __init__(self, cfg):
- self.cfg = cfg
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- if self.cfg["enable_uppercase_filter"]:
- commit.delta = commit.delta.upper()
- commit.preview = commit.preview.upper()
- return commit
-
-class UwuPlugin(StreamingPlugin):
- def __init__(self, cfg):
- self.cfg = cfg
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- if self.cfg["enable_uwu_filter"]:
- def _to_uwu(s: str) -> str:
- uwu_proc = subprocess.Popen(["Resources/Uwu/Uwwwu.exe", s],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- uwu_stdout, uwu_stderr = uwu_proc.communicate()
- uwu_text = uwu_stdout.decode("utf-8")
- uwu_text = uwu_text.replace("\n", "")
- uwu_text = uwu_text.replace("\r", "")
- if uwu_text.isspace():
- return ""
- # Guarantee that the segment starts with a single space and
- # doesn't end with whitespace.
- uwu_text = " " + uwu_text.lstrip().rstrip()
- return uwu_text
- commit.delta = _to_uwu(commit.delta)
- commit.preview = _to_uwu(commit.preview)
- return commit
-
-class ProfanityPlugin(StreamingPlugin):
- def __init__(self, cfg):
- self.cfg = cfg
- en_profanity_path = os.path.abspath("Resources/Profanity/en")
- self.filter = ProfanityFilter(en_profanity_path)
- if cfg["enable_profanity_filter"]:
- self.filter.load()
-
- def transform(self, commit: TranscriptCommit) -> TranscriptCommit:
- if self.cfg["enable_profanity_filter"]:
- commit.delta = self.filter.filter(commit.delta)
- commit.preview = self.filter.filter(commit.preview)
- return commit
-
-class PresentationFilter:
- def __init__(self):
- pass
-
- def transform(self, transcript: str, preview: str) -> typing.Tuple[str, str]:
- return transcript, preview
-
- def stop(self):
- pass
-
-class TrailingPeriodFilter(PresentationFilter):
- def __init__(self, cfg):
- self.cfg = cfg
-
- def transform(self, transcript: str, preview: str) -> typing.Tuple[str, str]:
- if self.cfg["remove_trailing_period"]:
- def _remove_trailing_period(s: str) -> str:
- if len(s) > 0 and s[-1] == '.' and not s.endswith("..."):
- s = s[0:len(s)-1]
- return s
- if len(preview) == 0:
- print("here")
- transcript = _remove_trailing_period(transcript)
- else:
- print("there")
- preview = _remove_trailing_period(preview)
- return transcript, preview
-
-class OscPager:
- def __init__(self, cfg):
- self.osc_state = osc_ctrl.OscState(cfg["chars_per_sync"],
- cfg["rows"],
- cfg["cols"],
- cfg["bytes_per_char"])
- self.cfg = cfg
- self.next_sync_window = time.time()
-
- def page(self, text):
- if self.cfg["use_builtin"]:
- osc_ctrl.pageMessageBuiltin(self.cfg, self.osc_state, text)
- self.bumpSyncWindow(amount_s=1.5)
- else:
- osc_ctrl.pageMessage(self.cfg, self.osc_state, text, EmotesState())
- self.bumpSyncWindow()
-
- def bumpSyncWindow(self, amount_s=osc_ctrl.SYNC_DELAY_S):
- self.next_sync_window = time.time() + amount_s
-
- def getSyncWindow(self):
- while time.time() < self.next_sync_window:
- time.sleep(0.01)
-
- def clear(self):
- osc_ctrl.clear(self.osc_state)
- self.bumpSyncWindow()
-
- def toggleBoard(self, state: bool):
- osc_ctrl.toggleBoard(self.osc_state.client, state)
- self.bumpSyncWindow()
-
- def lockWorld(self, state: bool):
- osc_ctrl.lockWorld(self.osc_state.client, state)
- self.bumpSyncWindow()
-
- def ellipsis(self, state: bool):
- osc_ctrl.ellipsis(self.osc_state.client, state)
- self.bumpSyncWindow()
-
-# Main transcription loop. While ctrl.run_app is set it asks the committer
-# for a commit, runs the commit through ctrl.plugins, prints the resulting
-# transcript and preview, and updates ctrl.transcript / ctrl.preview. The
-# plugins' and filters' stop() methods are called at the end of the function.
-# NOTE(review): indentation is not visible in this view, so the exact nesting
-# of the statements below should be confirmed against the original file.
-def transcriptionThread(ctrl: ThreadControl):
- # Most recent commit that carried a non-empty delta.
- last_stable_commit = None
-
- while ctrl.run_app:
- time.sleep(ctrl.cfg["transcription_loop_delay_ms"] / 1000.0);
-
- # NOTE(review): `op` is assigned but never read in this function.
- op = None
-
- commit = ctrl.committer.getDelta()
-
- for plugin in ctrl.plugins:
- commit = plugin.transform(commit)
-
- if len(commit.delta) > 0 or len(commit.preview) > 0:
- # Avoid re-sending text after long pauses. User controls the length
- # of the pause in the UI.
- if ctrl.cfg["reset_after_silence_s"] > 0:
- silence_duration = 0
- if last_stable_commit:
- # Silence is measured from the end of the previous stable commit
- # (its start plus its duration) to the start of this one.
- last_commit_end_ts = \
- last_stable_commit.start_ts + \
- last_stable_commit.duration_s
- silence_duration = commit.start_ts - last_commit_end_ts
- if silence_duration > ctrl.cfg["reset_after_silence_s"]:
- print(f"Resetting transcript after {silence_duration}-second "
- "silence", file=sys.stderr)
- ctrl.transcript = ""
- ctrl.preview = ""
- if commit.delta:
- last_stable_commit = commit
-
- # Hard-cap displayed transcript length at 4k characters to prevent
- # runaway memory use in UI. Keep the full transcript to avoid
- # breaking OSC pager.
- transcript = ctrl.transcript[-4096:] + commit.delta
- preview = commit.preview
-
- for filt in ctrl.filters:
- transcript, preview = filt.transform(transcript, preview)
-
- # A transcript that cannot be encoded for printing causes this commit
- # to be skipped entirely (`continue`), unlike the preview below.
- try:
- print(f"Transcript: {transcript}")
- except UnicodeEncodeError:
- print("Failed to encode transcript - discarding delta",
- file=sys.stderr)
- continue
- try:
- print(f"Preview: {preview}")
- except UnicodeEncodeError:
- print("Failed to encode preview - discarding", file=sys.stderr)
-
- # NOTE(review): this reads a bare `cfg`, which is neither a parameter
- # nor a local of this function; presumably a module-level name is
- # expected to exist. `ctrl.cfg` is used everywhere else - confirm.
- if cfg["enable_debug_mode"]:
- print(f"commit latency: {commit.latency_s}", file=sys.stderr)
- print(f"commit thresh: {commit.thresh_at_commit}",
- file=sys.stderr)
-
- # Insert a separating space between the existing transcript and the
- # delta, and between the delta and the preview, when neither side
- # already provides one.
- if len(ctrl.transcript) > 0 and \
- (not ctrl.transcript.endswith(' ')) and \
- (not commit.delta.startswith(' ')):
- commit.delta = ' ' + commit.delta
- if len(commit.delta) > 0 and \
- (not commit.delta.endswith(' ')) and \
- (not commit.preview.startswith(' ')):
- commit.preview = ' ' + commit.preview
-
- # ctrl.preview holds the full transcript followed by the preview text.
- ctrl.transcript += commit.delta
- ctrl.preview = ctrl.transcript + commit.preview
- for plugin in ctrl.plugins:
- plugin.stop()
- for filt in ctrl.filters:
- filt.stop()
-
-# Controller-button input loop. Polls button events from steamvr and, based on
-# how long the button was held, toggles recording, clears the board, or ends
-# transcription. cfg["button"] is split on whitespace into a hand identifier
-# and a button identifier.
-# NOTE(review): indentation is not visible in this view, so the exact nesting
-# of the branches below should be confirmed against the original file.
-def vrInputThread(ctrl: ThreadControl):
- RECORD_STATE = 0
- PAUSE_STATE = 1
- state = PAUSE_STATE
-
- hand_id = ctrl.cfg["button"].split()[0]
- button_id = ctrl.cfg["button"].split()[1]
-
- # Rough description of state machine:
- # Single short press: toggle transcription
- # Medium press: dismiss custom chatbox
- # Long press: update chatbox in place
- # Medium press + long press: type transcription
-
- # Time of the most recent button-down event, and of the end of the most
- # recent medium press.
- last_rising = time.time()
- last_medium_press_end = 0
-
- # Feedback sounds, played only when cfg["enable_local_beep"] is set.
- waveform0 = os.path.abspath("Resources/Sounds/Noise_On_Quiet.wav")
- waveform1 = os.path.abspath("Resources/Sounds/Noise_Off_Quiet.wav")
- waveform2 = os.path.abspath("Resources/Sounds/Dismiss_Noise_Quiet.wav")
- waveform3 = os.path.abspath("Resources/Sounds/KB_Noise_Off_Quiet.wav")
-
- button_generator = steamvr.pollButtonPress(hand=hand_id, button=button_id,
- ctrl=ctrl)
- while ctrl.run_app:
- time.sleep(0.01)
- try:
- event = next(button_generator)
- except StopIteration:
- break
-
- if event.opcode == steamvr.EVENT_RISING_EDGE:
- last_rising = time.time()
-
- if state == PAUSE_STATE:
- # Un-pause on button-down and discard whatever was queued so far.
- ctrl.stream.pause(False)
- ctrl.stream.getSamples()
-
- elif event.opcode == steamvr.EVENT_FALLING_EDGE:
- now = time.time()
- # Held for more than 1.5 seconds.
- if now - last_rising > 1.5:
- # Long press: treat as the end of transcription.
- state = PAUSE_STATE
-
- ctrl.stream.pause(True)
-
- # The long press began less than 1 second after a medium press ended.
- if last_rising - last_medium_press_end < 1.0:
- # Type transcription
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform3, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
- # TODO(yum) this is broken! Audio is not being collected
- # while paused anymore.
- #keyboard.write(ctrl.preview)
- else:
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform1, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
-
- # Held for more than 0.5 seconds (but not more than 1.5).
- elif now - last_rising > 0.5:
- # Medium press
- print("CLEARING", file=sys.stderr)
- last_medium_press_end = now
- state = PAUSE_STATE
-
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform2, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
-
- if not ctrl.cfg["use_builtin"]:
- ctrl.pager.getSyncWindow()
- ctrl.pager.toggleBoard(False)
-
- # Flush the *entire* pipeline.
- ctrl.stream.pause(True)
- ctrl.stream.getSamples()
- ctrl.collector.dropAudio()
- ctrl.pager.clear()
- if ctrl.cfg["enable_lock_at_spawn"]:
- # Give the board 0.5 seconds to disappear before unlocking from
- # world space.
- time.sleep(0.5)
- ctrl.pager.lockWorld(False)
- else:
- # Short hold
- if state == RECORD_STATE:
- print("PAUSED", file=sys.stderr)
- state = PAUSE_STATE
- if not ctrl.cfg["use_builtin"] and not ctrl.cfg["enable_lock_at_spawn"]:
- ctrl.pager.getSyncWindow()
- ctrl.pager.lockWorld(True)
-
- ctrl.stream.pause(True)
-
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform1, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
- elif state == PAUSE_STATE:
- print("RECORDING", file=sys.stderr)
- state = RECORD_STATE
- if not ctrl.cfg["use_builtin"]:
- ctrl.pager.getSyncWindow()
- ctrl.pager.toggleBoard(True)
- ctrl.pager.lockWorld(ctrl.cfg["enable_lock_at_spawn"])
- ctrl.pager.ellipsis(True)
- if ctrl.cfg["reset_on_toggle"]:
- if ctrl.cfg["enable_debug_mode"]:
- print("Toggle detected, dropping transcript (3)",
- file=sys.stderr)
- ctrl.transcript = ""
- ctrl.preview = ""
- #audio_state.drop_transcription = True
- else:
- if ctrl.cfg["enable_debug_mode"]:
- print("Toggle detected, committing preview text (3)", file=sys.stderr)
- #audio_state.text += audio_state.preview_text
-
- ctrl.stream.pause(False)
- ctrl.pager.clear()
-
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform0, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
-
-# Keyboard input loop. Polls the keybind named by cfg["keybind"]; a single
-# press toggles between recording and paused, and two presses less than
-# `double_press_timeout` seconds apart clear the board and flush the audio
-# pipeline.
-# NOTE(review): indentation is not visible in this view, so the exact nesting
-# of the branches below should be confirmed against the original file.
-def kbInputThread(ctrl: ThreadControl):
- machine = keybind_event_machine.KeybindEventMachine(ctrl.cfg["keybind"])
- # Time of the previous press; 0 means no press has been seen yet.
- last_press_time = 0
-
- # double pressing the keybind
- double_press_timeout = 0.5
-
- RECORD_STATE = 0
- PAUSE_STATE = 1
- state = PAUSE_STATE
-
- # Feedback sounds, played only when cfg["enable_local_beep"] is set.
- # waveform3 is not used in this function.
- waveform0 = os.path.abspath("Resources/Sounds/Noise_On_Quiet.wav")
- waveform1 = os.path.abspath("Resources/Sounds/Noise_Off_Quiet.wav")
- waveform2 = os.path.abspath("Resources/Sounds/Dismiss_Noise_Quiet.wav")
- waveform3 = os.path.abspath("Resources/Sounds/KB_Noise_Off_Quiet.wav")
-
- while ctrl.run_app:
- time.sleep(0.01)
-
- # A return value of 0 is treated as "no press"; skip this iteration.
- cur_press_time = machine.getNextPressTime()
- if cur_press_time == 0:
- continue
-
- # Classify the press by the time elapsed since the previous one.
- EVENT_SINGLE_PRESS = 0
- EVENT_DOUBLE_PRESS = 1
- if last_press_time == 0:
- event = EVENT_SINGLE_PRESS
- elif cur_press_time - last_press_time < double_press_timeout:
- event = EVENT_DOUBLE_PRESS
- else:
- event = EVENT_SINGLE_PRESS
- last_press_time = cur_press_time
-
- if event == EVENT_DOUBLE_PRESS:
- print("CLEARING", file=sys.stderr)
- state = PAUSE_STATE
-
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform2, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
-
- if not ctrl.cfg["use_builtin"]:
- ctrl.pager.getSyncWindow()
- ctrl.pager.toggleBoard(False)
-
- # Flush the *entire* pipeline.
- ctrl.stream.pause(True)
- ctrl.stream.getSamples()
- ctrl.collector.dropAudio()
- ctrl.pager.clear()
- if ctrl.cfg["enable_lock_at_spawn"]:
- # Give the board 0.5 seconds to disappear before unlocking from
- # world space.
- time.sleep(0.5)
- ctrl.pager.lockWorld(False)
- continue
-
- # Single press: toggle between recording and paused.
- if state == RECORD_STATE:
- print("PAUSED", file=sys.stderr)
- state = PAUSE_STATE
- if not ctrl.cfg["use_builtin"] and not ctrl.cfg["enable_lock_at_spawn"]:
- ctrl.pager.getSyncWindow()
- ctrl.pager.lockWorld(True)
-
- ctrl.stream.pause(True)
-
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform1, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
- elif state == PAUSE_STATE:
- print("RECORDING", file=sys.stderr)
- state = RECORD_STATE
- if not ctrl.cfg["use_builtin"]:
- ctrl.pager.getSyncWindow()
- ctrl.pager.toggleBoard(True)
- ctrl.pager.lockWorld(ctrl.cfg["enable_lock_at_spawn"])
- ctrl.pager.ellipsis(True)
- if ctrl.cfg["reset_on_toggle"]:
- if ctrl.cfg["enable_debug_mode"]:
- print("Toggle detected, dropping transcript (2)",
- file=sys.stderr)
- ctrl.transcript = ""
- ctrl.preview = ""
- else:
- if ctrl.cfg["enable_debug_mode"]:
- print("Toggle detected, committing preview text (2)",
- file=sys.stderr)
- #audio_state.text += audio_state.preview_text
-
- ctrl.stream.pause(False)
- ctrl.pager.clear()
-
- if ctrl.cfg["enable_local_beep"]:
- winsound.PlaySound(waveform0, winsound.SND_FILENAME | winsound.SND_ASYNC)
- pass
-
-def oscThread(ctrl: ThreadControl):
-  """Keep paging the current preview text until the app is asked to stop.
-
-  Each pass calls `getSyncWindow()` on the pager, pages `ctrl.preview`, and
-  then sleeps for 10 ms.
-  """
-  refresh_interval_s = 0.01
-  while ctrl.run_app:
-    ctrl.pager.getSyncWindow()
-    ctrl.pager.page(ctrl.preview)
-    time.sleep(refresh_interval_s)
-
-def run(cfg):
-  """Build the transcription pipeline, start the worker threads and block.
-
-  The function wires microphone -> collectors -> Whisper -> committer, hands
-  everything to a shared `ThreadControl`, and starts four daemon threads. It
-  then reads stdin until a line containing "exit" or "quit" arrives (or stdin
-  closes), clears `ctrl.run_app`, and joins the threads in start order.
-
-  Args:
-    cfg: Application config mapping (as produced by `app_config.getConfig`).
-  """
-  stream = MicStream(cfg["microphone"])
-
-  # Collector chain: raw capture -> normalization -> compression.
-  #collector = LengthEnforcingAudioCollector(collector, 5.0)
-  collector = CompressingAudioCollector(
-      NormalizingAudioCollector(AudioCollector(stream)))
-  whisper = Whisper(collector, cfg)
-  segmenter = AudioSegmenter(min_silence_ms=cfg["min_silence_duration_ms"],
-      max_speech_s=cfg["max_speech_duration_s"])
-  committer = VadCommitter(cfg, collector, whisper, segmenter)
-  pager = OscPager(cfg)
-
-  ctrl = ThreadControl(cfg)
-  ctrl.stream = stream
-  ctrl.collector = collector
-  ctrl.whisper = whisper
-  ctrl.committer = committer
-
-  ctrl.plugins = [
-      TranslationPlugin(cfg),
-      UppercasePlugin(cfg),
-      LowercasePlugin(cfg),
-      ProfanityPlugin(cfg),
-      UwuPlugin(cfg),
-      BrowserSource(cfg),
-  ]
-  ctrl.filters = [TrailingPeriodFilter(cfg)]
-
-  ctrl.pager = pager
-  ctrl.transcript = ""
-  ctrl.preview = ""
-
-  # (label used in the shutdown log, thread entry point), in start order.
-  thread_specs = (
-      ("transcription", transcriptionThread),
-      ("vr input", vrInputThread),
-      ("kb input", kbInputThread),
-      ("osc", oscThread),
-  )
-  workers = []
-  for label, entry_point in thread_specs:
-    worker = threading.Thread(target=entry_point, args=[ctrl])
-    worker.daemon = True
-    worker.start()
-    workers.append((label, worker))
-
-  # Block on stdin; a line mentioning "exit" or "quit" requests shutdown.
-  for line in sys.stdin:
-    if any(keyword in line for keyword in ("exit", "quit")):
-      print("Exit requested", file=sys.stderr)
-      break
-
-  ctrl.run_app = False
-  for label, worker in workers:
-    print(f"Join {label} thread", file=sys.stderr)
-    worker.join()
-  print("Done", file=sys.stderr)
-
-if __name__ == "__main__":
-  # Force UTF-8 on stdout regardless of the console's default encoding.
-  sys.stdout.reconfigure(encoding="utf-8")
-
-  parser = argparse.ArgumentParser()
-  parser.add_argument("--config", type=str, help="Path to app config YAML file.")
-  args = parser.parse_args()
-
-  cfg = app_config.getConfig(args.config)
-
-  # (audio file, reference transcript) pairs used by the evaluation mode.
-  experiments = [
-      ("Evaluate/declaration_short/audio.mp3",
-        "Evaluate/declaration_short/control.txt"),
-      ("Evaluate/moist/audio.mp3",
-        "Evaluate/moist/control.txt"),
-      ("Evaluate/vei/audio.mp3",
-        "Evaluate/vei/control.txt"),
-  ]
-
-  # Developer switch: set to True to score the experiments above instead of
-  # running the live transcription app.
-  RUN_EVALUATION = False
-
-  if RUN_EVALUATION:
-    # Named `total_score` rather than `sum` so the builtin is not shadowed.
-    total_score = 0
-    for audio, control in experiments:
-      print(f"Run experiment {audio} :: {control}", file=sys.stderr)
-      total_score += evaluate(cfg, audio, control)
-    print(f"Total score: {total_score}", file=sys.stderr)
-  else:
-    #optimize(cfg, experiments)
-    run(cfg)
-
diff --git a/Scripts/vad.py b/Scripts/vad.py
deleted file mode 100644
index 25f0ad0..0000000
--- a/Scripts/vad.py
+++ /dev/null
@@ -1,315 +0,0 @@
-# MIT License
-#
-# Copyright (c) 2023 Guillaume Klein
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-
-import bisect
-import functools
-import os
-import warnings
-
-from typing import List, NamedTuple, Optional
-
-import numpy as np
-
-
-# The code below is adapted from https://github.com/snakers4/silero-vad.
-class VadOptions(NamedTuple):
-    """Tunable parameters for Silero-based voice activity detection.
-
-    Attributes:
-      threshold: Speech probability at or above which a window counts as
-        speech. The model emits one probability per audio window; 0.5 is a
-        reasonable default, but tuning per dataset gives better results.
-      min_speech_duration_ms: Speech chunks shorter than this are discarded.
-      max_speech_duration_s: Upper bound on the length of a speech chunk, in
-        seconds. A longer chunk is split at the last silence of roughly
-        100 ms (the implementation uses 98 ms) when one exists; otherwise it
-        is cut just before the limit.
-      min_silence_duration_ms: Amount of silence that must follow speech
-        before the chunk is closed.
-      window_size_samples: Number of samples fed to the model per step.
-        WARNING: Silero VAD was trained with 512, 1024 and 1536 samples at a
-        16000 Hz sample rate; other values may degrade model performance.
-      speech_pad_ms: Padding added to each side of a final speech chunk.
-    """
-
-    threshold: float = 0.5
-    min_speech_duration_ms: int = 250
-    max_speech_duration_s: float = float("inf")
-    min_silence_duration_ms: int = 2000
-    window_size_samples: int = 1024
-    speech_pad_ms: int = 400
-
-
-def get_speech_timestamps(
-    audio: np.ndarray,
-    vad_options: Optional[VadOptions] = None,
-    **kwargs,
-) -> List[dict]:
-    """This method is used for splitting long audios into speech chunks using silero VAD.
-
-    The audio is always processed as 16000 Hz: the sample rate is fixed inside
-    this function and is not an argument.
-
-    Args:
-      audio: One dimensional float array.
-      vad_options: Options for VAD processing.
-      kwargs: VAD options passed as keyword arguments for backward compatibility.
-        They are only used when `vad_options` is None.
-
-    Returns:
-      List of dicts containing begin and end samples of each speech chunk,
-      stored under the "start" and "end" keys.
-    """
-    # Keyword options are a fallback; an explicit VadOptions takes precedence.
-    if vad_options is None:
-        vad_options = VadOptions(**kwargs)
-
-    threshold = vad_options.threshold
-    min_speech_duration_ms = vad_options.min_speech_duration_ms
-    max_speech_duration_s = vad_options.max_speech_duration_s
-    min_silence_duration_ms = vad_options.min_silence_duration_ms
-    window_size_samples = vad_options.window_size_samples
-    speech_pad_ms = vad_options.speech_pad_ms
-
-    # Unusual window sizes only produce a warning; processing continues.
-    if window_size_samples not in [512, 1024, 1536]:
-        warnings.warn(
-            "Unusual window_size_samples! Supported window_size_samples:\n"
-            " - [512, 1024, 1536] for 16000 sampling_rate"
-        )
-
-    # Convert every duration option into a sample count at the fixed rate.
-    sampling_rate = 16000
-    min_speech_samples = sampling_rate * min_speech_duration_ms / 1000
-    speech_pad_samples = sampling_rate * speech_pad_ms / 1000
-    # Length budget for one chunk, leaving room for one analysis window and
-    # for the padding that is added on both sides at the end.
-    max_speech_samples = (
-        sampling_rate * max_speech_duration_s
-        - window_size_samples
-        - 2 * speech_pad_samples
-    )
-    min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
-    # A silence longer than this (98 ms) is remembered as a candidate split
-    # point in case the chunk later exceeds max_speech_samples.
-    min_silence_samples_at_max_speech = sampling_rate * 98 / 1000
-
-    audio_length_samples = len(audio)
-
-    model = get_vad_model()
-    state = model.get_initial_state(batch_size=1)
-
-    # Pass 1: one speech probability per window, threading the model state
-    # from window to window. The final short window is zero-padded.
-    speech_probs = []
-    for current_start_sample in range(0, audio_length_samples, window_size_samples):
-        chunk = audio[current_start_sample : current_start_sample + window_size_samples]
-        if len(chunk) < window_size_samples:
-            chunk = np.pad(chunk, (0, int(window_size_samples - len(chunk))))
-        speech_prob, state = model(chunk, state, sampling_rate)
-        speech_probs.append(speech_prob)
-
-    # Pass 2: turn the probabilities into chunks. `triggered` is True while
-    # inside a speech chunk.
-    triggered = False
-    speeches = []
-    current_speech = {}
-    # Speech starts at `threshold` but only ends below this lower value, so a
-    # probability hovering around `threshold` does not flip the state.
-    neg_threshold = threshold - 0.15
-
-    # to save potential segment end (and tolerate some silence)
-    temp_end = 0
-    # to save potential segment limits in case of maximum segment size reached
-    prev_end = next_start = 0
-
-    for i, speech_prob in enumerate(speech_probs):
-        # Speech resumed while a tentative end was pending: cancel the pending
-        # end and note where speech resumed after the remembered silence.
-        if (speech_prob >= threshold) and temp_end:
-            temp_end = 0
-            if next_start < prev_end:
-                next_start = window_size_samples * i
-
-        # Start of a new chunk.
-        if (speech_prob >= threshold) and not triggered:
-            triggered = True
-            current_speech["start"] = window_size_samples * i
-            continue
-
-        # The open chunk exceeded the length budget: split it.
-        if (
-            triggered
-            and (window_size_samples * i) - current_speech["start"] > max_speech_samples
-        ):
-            if prev_end:
-                # Split at the last remembered silence.
-                current_speech["end"] = prev_end
-                speeches.append(current_speech)
-                current_speech = {}
-                # previously reached silence (< neg_thres) and is still not speech (< thres)
-                if next_start < prev_end:
-                    triggered = False
-                else:
-                    current_speech["start"] = next_start
-                prev_end = next_start = temp_end = 0
-            else:
-                # No silence was seen: cut at the current window.
-                current_speech["end"] = window_size_samples * i
-                speeches.append(current_speech)
-                current_speech = {}
-                prev_end = next_start = temp_end = 0
-                triggered = False
-                continue
-
-        # Probability fell below the lower threshold while inside a chunk.
-        if (speech_prob < neg_threshold) and triggered:
-            if not temp_end:
-                temp_end = window_size_samples * i
-            # condition to avoid cutting in very short silence
-            if (window_size_samples * i) - temp_end > min_silence_samples_at_max_speech:
-                prev_end = temp_end
-            # Keep waiting until the silence is long enough to close the chunk.
-            if (window_size_samples * i) - temp_end < min_silence_samples:
-                continue
-            else:
-                current_speech["end"] = temp_end
-                # Chunks that are too short are dropped rather than appended.
-                if (
-                    current_speech["end"] - current_speech["start"]
-                ) > min_speech_samples:
-                    speeches.append(current_speech)
-                current_speech = {}
-                prev_end = next_start = temp_end = 0
-                triggered = False
-                continue
-
-    # The audio ended with a chunk still open: close it at the end of the
-    # audio if it is long enough.
-    if (
-        current_speech
-        and (audio_length_samples - current_speech["start"]) > min_speech_samples
-    ):
-        current_speech["end"] = audio_length_samples
-        speeches.append(current_speech)
-
-    # Pass 3: pad each chunk. When the gap to the next chunk is narrower than
-    # two pads, the gap is split evenly between the neighbours instead.
-    for i, speech in enumerate(speeches):
-        if i == 0:
-            speech["start"] = int(max(0, speech["start"] - speech_pad_samples))
-        if i != len(speeches) - 1:
-            silence_duration = speeches[i + 1]["start"] - speech["end"]
-            if silence_duration < 2 * speech_pad_samples:
-                speech["end"] += int(silence_duration // 2)
-                speeches[i + 1]["start"] = int(
-                    max(0, speeches[i + 1]["start"] - silence_duration // 2)
-                )
-            else:
-                speech["end"] = int(
-                    min(audio_length_samples, speech["end"] + speech_pad_samples)
-                )
-                speeches[i + 1]["start"] = int(
-                    max(0, speeches[i + 1]["start"] - speech_pad_samples)
-                )
-        else:
-            # Last chunk: pad the end, clamped to the audio length.
-            speech["end"] = int(
-                min(audio_length_samples, speech["end"] + speech_pad_samples)
-            )
-
-    return speeches
-
-
-def collect_chunks(audio: np.ndarray, chunks: List[dict]) -> np.ndarray:
-    """Concatenate the slices of `audio` described by `chunks`.
-
-    Args:
-      audio: Source sample array.
-      chunks: Dicts with "start" and "end" sample indices.
-
-    Returns:
-      The selected slices joined end to end, or an empty float32 array when
-      `chunks` is empty.
-    """
-    if chunks:
-        pieces = []
-        for span in chunks:
-            pieces.append(audio[span["start"] : span["end"]])
-        return np.concatenate(pieces)
-
-    return np.array([], dtype=np.float32)
-
-
-class SpeechTimestampsMap:
-    """Helper class to restore original speech timestamps.
-
-    After the silent gaps between speech chunks are removed, a time measured
-    in the shortened audio is mapped back to the original audio by adding the
-    total silence that was dropped before the chunk containing that time.
-    """
-
-    def __init__(self, chunks: List[dict], sampling_rate: int, time_precision: int = 2):
-        """Precompute, per chunk, its end position and the silence before it.
-
-        Args:
-          chunks: Dicts with "start" and "end" sample indices in the original
-            audio, in increasing order.
-          sampling_rate: Samples per second, used to convert samples to seconds.
-          time_precision: Number of decimals kept in restored times.
-        """
-        self.sampling_rate = sampling_rate
-        self.time_precision = time_precision
-        # End of each chunk, in samples of the silence-free audio.
-        self.chunk_end_sample = []
-        # Cumulative silence (seconds) removed before each chunk.
-        self.total_silence_before = []
-
-        previous_end = 0
-        silent_samples = 0
-
-        for chunk in chunks:
-            silent_samples += chunk["start"] - previous_end
-            previous_end = chunk["end"]
-
-            self.chunk_end_sample.append(chunk["end"] - silent_samples)
-            self.total_silence_before.append(silent_samples / sampling_rate)
-
-    def get_original_time(
-        self,
-        time: float,
-        chunk_index: Optional[int] = None,
-    ) -> float:
-        """Return `time` shifted back onto the original audio's timeline.
-
-        Args:
-          time: Time in seconds in the silence-free audio.
-          chunk_index: Chunk to use for the offset; looked up from `time`
-            when omitted.
-
-        Returns:
-          The restored time rounded to `time_precision` decimals. When the map
-          was built from no chunks there is no offset to apply, so the rounded
-          input time is returned.
-        """
-        # Without chunks there is nothing to index; avoid an IndexError.
-        if not self.total_silence_before:
-            return round(time, self.time_precision)
-
-        if chunk_index is None:
-            chunk_index = self.get_chunk_index(time)
-
-        total_silence_before = self.total_silence_before[chunk_index]
-        return round(total_silence_before + time, self.time_precision)
-
-    def get_chunk_index(self, time: float) -> int:
-        """Return the index of the chunk containing `time`.
-
-        Times past the last chunk are clamped to the last index. With no
-        chunks at all the result is -1.
-        """
-        sample = int(time * self.sampling_rate)
-        return min(
-            bisect.bisect(self.chunk_end_sample, sample),
-            len(self.chunk_end_sample) - 1,
-        )
-
-
-@functools.lru_cache
-def get_vad_model():
-    """Return the VAD model, constructing it only on the first call.
-
-    The model file is located at `Models/silero_vad.onnx` under the parent of
-    the directory that contains this module.
-    """
-    module_dir = os.path.dirname(os.path.abspath(__file__))
-    project_root = os.path.dirname(module_dir)
-    return SileroVADModel(os.path.join(project_root, "Models/silero_vad.onnx"))
-
-
-class SileroVADModel:
-    """Callable wrapper around an ONNX Runtime session for the Silero VAD model."""
-
-    def __init__(self, path):
-        """Open a CPU inference session for the model file at `path`.
-
-        onnxruntime is imported here, not at module import time, so the
-        dependency is only needed when a model is actually constructed.
-
-        Raises:
-          RuntimeError: If the onnxruntime package cannot be imported.
-        """
-        try:
-            import onnxruntime
-        except ImportError as e:
-            raise RuntimeError(
-                "Applying the VAD filter requires the onnxruntime package"
-            ) from e
-
-        # One inter-op and one intra-op thread; log severity level 4.
-        session_options = onnxruntime.SessionOptions()
-        session_options.inter_op_num_threads = 1
-        session_options.intra_op_num_threads = 1
-        session_options.log_severity_level = 4
-
-        self.session = onnxruntime.InferenceSession(
-            path,
-            sess_options=session_options,
-            providers=["CPUExecutionProvider"],
-        )
-
-    def get_initial_state(self, batch_size: int):
-        """Return a pair of zeroed float32 arrays of shape (2, batch_size, 64)."""
-        state_shape = (2, batch_size, 64)
-        hidden = np.zeros(state_shape, dtype=np.float32)
-        cell = np.zeros(state_shape, dtype=np.float32)
-        return hidden, cell
-
-    def __call__(self, x, state, sr: int):
-        """Run the model on one audio chunk.
-
-        Args:
-          x: Audio chunk, either 1-D or 2-D; a 1-D chunk gets a leading axis.
-          state: The (h, c) pair from `get_initial_state` or a previous call.
-          sr: Sample rate of the chunk.
-
-        Returns:
-          A tuple of the model output and the updated (h, c) state.
-
-        Raises:
-          ValueError: If `x` has more than two dimensions, or if the chunk is
-            too short for the sample rate (sr / samples > 31.25).
-        """
-        rank = len(x.shape)
-        if rank > 2:
-            raise ValueError(
-                f"Too many dimensions for input audio chunk {rank}"
-            )
-        if rank == 1:
-            x = np.expand_dims(x, 0)
-        if sr / x.shape[1] > 31.25:
-            raise ValueError("Input audio chunk is too short")
-
-        hidden, cell = state
-        feeds = {
-            "input": x,
-            "h": hidden,
-            "c": cell,
-            "sr": np.array(sr, dtype="int64"),
-        }
-
-        out, hidden, cell = self.session.run(None, feeds)
-        return out, (hidden, cell)