From ee8213d1d2c2008d2d996929500c9e87dac325a3 Mon Sep 17 00:00:00 2001 From: yum Date: Sat, 17 Dec 2022 17:51:12 -0800 Subject: Finish python virtual env GUI can now download all TaSTT dependencies and install them into a virtual environment. * Add buttons to check embedded python version & install dependencies * Add class to wrap interacting with embedded Python * Put all TaSTT python scripts into a folder --- GUI/GUI/GUI/Frame.cpp | 116 +++- GUI/GUI/GUI/Frame.h | 11 +- GUI/GUI/GUI/GUI.vcxproj | 2 + GUI/GUI/GUI/GUI.vcxproj.filters | 14 +- GUI/GUI/GUI/PythonWrapper.cpp | 54 ++ GUI/GUI/GUI/PythonWrapper.h | 28 + GUI/package.ps1 | 1 + Python/python310._pth | 5 +- Scripts/emotes.py | 130 ++++ Scripts/generate_fonts.py | 147 +++++ Scripts/generate_params.py | 91 +++ Scripts/generate_utils.py | 130 ++++ Scripts/libtastt.py | 578 +++++++++++++++++ Scripts/libunity.py | 1356 +++++++++++++++++++++++++++++++++++++++ Scripts/obfuscate.py | 92 +++ Scripts/osc_ctrl.py | 355 ++++++++++ Scripts/steamvr.py | 73 +++ Scripts/string_matcher.py | 155 +++++ Scripts/transcribe.py | 353 ++++++++++ emotes.py | 130 ---- generate_fonts.py | 147 ----- generate_params.py | 91 --- generate_utils.py | 130 ---- libtastt.py | 578 ----------------- libunity.py | 1356 --------------------------------------- obfuscate.py | 92 --- osc_ctrl.py | 355 ---------- steamvr.py | 73 --- string_matcher.py | 155 ----- transcribe.py | 353 ---------- 30 files changed, 3672 insertions(+), 3479 deletions(-) create mode 100644 GUI/GUI/GUI/PythonWrapper.cpp create mode 100644 GUI/GUI/GUI/PythonWrapper.h create mode 100644 Scripts/emotes.py create mode 100644 Scripts/generate_fonts.py create mode 100644 Scripts/generate_params.py create mode 100644 Scripts/generate_utils.py create mode 100644 Scripts/libtastt.py create mode 100644 Scripts/libunity.py create mode 100644 Scripts/obfuscate.py create mode 100644 Scripts/osc_ctrl.py create mode 100644 Scripts/steamvr.py create mode 100644 Scripts/string_matcher.py create mode 100644 Scripts/transcribe.py delete mode 100644 emotes.py delete mode 100644 generate_fonts.py delete mode 100644 generate_params.py delete mode 100644 generate_utils.py delete mode 100644 libtastt.py delete mode 100644 libunity.py delete mode 100644 obfuscate.py delete mode 100644 osc_ctrl.py delete mode 100644 steamvr.py delete mode 100644 string_matcher.py delete mode 100644 transcribe.py diff --git a/GUI/GUI/GUI/Frame.cpp b/GUI/GUI/GUI/Frame.cpp index 841bfb9..74683ca 100644 --- a/GUI/GUI/GUI/Frame.cpp +++ b/GUI/GUI/GUI/Frame.cpp @@ -1,23 +1,44 @@ #include "Frame.h" +#include "PythonWrapper.h" #include +#include +#include + +namespace { + enum FrameIds { + ID_PY_PANEL, + ID_PY_VERSION_BUTTON, + ID_PY_SETUP_BUTTON, + ID_PY_OUT, + }; +}; Frame::Frame() - : wxFrame(nullptr, wxID_ANY, "TaSTT") + : wxFrame(nullptr, wxID_ANY, "TaSTT"), + py_panel_(this, ID_PY_PANEL), + py_panel_sizer_(wxVERTICAL), + py_version_button_(&py_panel_, ID_PY_VERSION_BUTTON, "Check embedded Python version"), + py_setup_button_(&py_panel_, ID_PY_SETUP_BUTTON, "Set up Python virtual environment"), + py_out_(&py_panel_, ID_PY_OUT, wxEmptyString, wxDefaultPosition, + wxSize(/*x_px=*/480, /*y_px=*/160), wxTE_MULTILINE) { - Bind(wxEVT_MENU, &Frame::OnExit, this, wxID_EXIT); + Bind(wxEVT_MENU, &Frame::OnExit, this, wxID_EXIT); + Bind(wxEVT_BUTTON, &Frame::OnGetPythonVersion, this, ID_PY_VERSION_BUTTON); + Bind(wxEVT_BUTTON, &Frame::OnSetupPython, this, ID_PY_SETUP_BUTTON); - // wx needs this to be able to load PNGs. - wxImage::AddHandler(&png_handler_); + // wx needs this to be able to load PNGs. + wxImage::AddHandler(&png_handler_); + const std::string icon_path = "Resources/logo.png"; + LoadAndSetIcon(icon_path); - const std::string logo_path = "Resources/logo.png"; - if (!std::filesystem::exists(logo_path)) { - wxLogFatalError("Logo is missing from %s", logo_path.c_str()); - } - wxBitmap icon_img("Resources/logo.png", wxBITMAP_TYPE_PNG); - wxIcon icon; - icon.CopyFromBitmap(icon_img); - SetIcon(icon); + wxSize py_out_size(/*x=*/80, /*y=*/20); + py_out_.SetSize(py_out_size); + + py_panel_.SetSizer(&py_panel_sizer_); + py_panel_sizer_.Add(&py_version_button_); + py_panel_sizer_.Add(&py_setup_button_); + py_panel_sizer_.Add(&py_out_); } void Frame::OnExit(wxCommandEvent& event) @@ -25,3 +46,74 @@ void Frame::OnExit(wxCommandEvent& event) Close(true); } +void Frame::OnGetPythonVersion(wxCommandEvent& event) +{ + PythonWrapper py; + std::string py_version = py.GetVersion(); + py_out_.AppendText(py_version + "\n"); +} + +void Frame::OnSetupPython(wxCommandEvent& event) +{ + PythonWrapper py; + + py_out_.AppendText("Setting up Python virtual environment\n"); + py_out_.AppendText("This could take several minutes, please be patient!\n"); + py_out_.AppendText("This will download ~5GB of dependencies.\n"); + py_out_.AppendText("Dependencies are installed in the executable folder, " + "so deleting the folder is all that's needed to undo this."); + + { + std::string py_out; + std::ostringstream py_out_oss; + py_out_oss << "Installing pip" << std::endl; + py_out_.AppendText(py_out_oss.str()); + if (!py.InstallPip(&py_out)) { + std::ostringstream py_out_oss; + py_out_oss << "Failed to install pip: " << py_out; + py_out_.AppendText(py_out_oss.str()); + } + } + + const std::vector pip_deps{ + "pillow", + "pydub", + "pyaudio", + "playsound==1.2.2", + "torch --extra-index-url https://download.pytorch.org/whl/cu116", + "git+https://github.com/openai/whisper.git", + "openvr", + "editdistance", + "pydub", + "python-osc", + }; + + for (const auto& pip_dep : pip_deps) { + { + std::ostringstream py_out_oss; + py_out_oss << "Installing " << pip_dep << std::endl; + py_out_.AppendText(py_out_oss.str()); + } + std::string py_out; + bool res = py.InvokeWithArgs({ "-m", "pip", "install", pip_dep }, &py_out); + if (!res) { + std::ostringstream py_out_oss; + py_out_oss << "Failed to install " << pip_dep << ": " << py_out << std::endl; + py_out_.AppendText(py_out_oss.str()); + return; + } + } + + py_out_.AppendText("Python virtual environment successfully set up!\n"); +} + +void Frame::LoadAndSetIcon(const std::string& icon_path) { + if (!std::filesystem::exists(icon_path)) { + wxLogFatalError("Logo is missing from %s", icon_path.c_str()); + } + wxBitmap icon_img(icon_path, wxBITMAP_TYPE_PNG); + wxIcon icon; + icon.CopyFromBitmap(icon_img); + SetIcon(icon); +} + diff --git a/GUI/GUI/GUI/Frame.h b/GUI/GUI/GUI/Frame.h index 4dcfd4a..62e9169 100644 --- a/GUI/GUI/GUI/Frame.h +++ b/GUI/GUI/GUI/Frame.h @@ -13,8 +13,15 @@ public: private: wxPNGHandler png_handler_; + wxPanel py_panel_; + wxBoxSizer py_panel_sizer_; + wxButton py_version_button_; + wxButton py_setup_button_; + wxTextCtrl py_out_; - void OnHello(wxCommandEvent& event); void OnExit(wxCommandEvent& event); - void OnAbout(wxCommandEvent& event); + void OnGetPythonVersion(wxCommandEvent& event); + void OnSetupPython(wxCommandEvent& event); + + void LoadAndSetIcon(const std::string& icon_path); }; diff --git a/GUI/GUI/GUI/GUI.vcxproj b/GUI/GUI/GUI/GUI.vcxproj index 79bb220..223f47e 100644 --- a/GUI/GUI/GUI/GUI.vcxproj +++ b/GUI/GUI/GUI/GUI.vcxproj @@ -138,10 +138,12 @@ + + diff --git a/GUI/GUI/GUI/GUI.vcxproj.filters b/GUI/GUI/GUI/GUI.vcxproj.filters index c332693..74e4659 100644 --- a/GUI/GUI/GUI/GUI.vcxproj.filters +++ b/GUI/GUI/GUI/GUI.vcxproj.filters @@ -24,16 +24,22 @@ Source Files + + Source Files + - - Source Files + + Header Files - Source Files + Header Files - Source Files + Header Files + + + Header Files \ No newline at end of file diff --git a/GUI/GUI/GUI/PythonWrapper.cpp b/GUI/GUI/GUI/PythonWrapper.cpp new file mode 100644 index 0000000..27d12fd --- /dev/null +++ b/GUI/GUI/GUI/PythonWrapper.cpp @@ -0,0 +1,54 @@ +#include "PythonWrapper.h" + +#include + +#include + +bool PythonWrapper::InvokeWithArgs(std::vector&& args, std::string* out) { + std::ostringstream cmd_oss; + cmd_oss << "Resources/Python/python.exe"; + for (const auto& arg : args) { + cmd_oss << " " << arg; + } + + wxArrayString cmd_output_ary; + long result = wxExecute(cmd_oss.str(), cmd_output_ary); + std::ostringstream cmd_out_oss; + for (const auto& line : cmd_output_ary) { + if (!cmd_out_oss.str().empty()) { + cmd_out_oss << std::endl; + } + cmd_out_oss << line; + } + if (result == -1) { + std::ostringstream err_oss; + err_oss << "Error while executing python command \"" << cmd_oss.str() << "\": Failed to launch process"; + *out = err_oss.str(); + return false; + } else if (result) { + std::ostringstream err_oss; + err_oss << "Error while executing python command \"" << cmd_oss.str() << "\": Process returned " << result << ": " << cmd_out_oss.str(); + *out = err_oss.str(); + return false; + } + + *out = cmd_out_oss.str(); + return true; +} + + +std::string PythonWrapper::GetVersion() { + std::string result; + bool ok = InvokeWithArgs({ "--version" }, &result); + if (!ok) { + wxLogFatalError("Failed to get python version: %s", result.c_str()); + } + return result; +} + +bool PythonWrapper::InstallPip(std::string* out) { + std::string result; + + std::string pip_path = "Resources/Python/get-pip.py"; + return InvokeWithArgs({ pip_path }, out); +} diff --git a/GUI/GUI/GUI/PythonWrapper.h b/GUI/GUI/GUI/PythonWrapper.h new file mode 100644 index 0000000..607507d --- /dev/null +++ b/GUI/GUI/GUI/PythonWrapper.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +#ifndef WX_PRECOMP +#include +#endif + +#include +#include + +/* + * This class wraps interactions with the embedded Python interpreter. +*/ +class PythonWrapper +{ +public: + // Invoke the interpreter with arguments. + // On error, sets `out` to an error message and returns false. + bool InvokeWithArgs(std::vector&& args, std::string* out); + + // Execute python --version. + std::string GetVersion(); + + // Execute get-pip.py. + bool InstallPip(std::string* out); +}; + diff --git a/GUI/package.ps1 b/GUI/package.ps1 index 4b25f3f..0c2cec0 100644 --- a/GUI/package.ps1 +++ b/GUI/package.ps1 @@ -8,5 +8,6 @@ mkdir $install_dir > $null mkdir $install_dir/Resources > $null cp ../Images/logo.png TaSTT/Resources cp -Recurse ../Python TaSTT/Resources/Python +cp -Recurse ../Scripts TaSTT/Resources/Scripts cp GUI/x64/Release/GUI.exe TaSTT/TaSTT.exe diff --git a/Python/python310._pth b/Python/python310._pth index 2676d09..91fe233 100644 --- a/Python/python310._pth +++ b/Python/python310._pth @@ -4,7 +4,10 @@ python310.zip # Uncomment to run site.main() automatically import site +# TaSTT Python scripts +../Scripts + Lib Lib/site-packages -Scripts Dependencies/future-0.18.2 + diff --git a/Scripts/emotes.py b/Scripts/emotes.py new file mode 100644 index 0000000..b922fdf --- /dev/null +++ b/Scripts/emotes.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 + +import argparse +from math import floor +import os +# python3 -m pip install pillow +from PIL import Image +import sys + +# (row, col) +TEX_SZ = (2048, 2048) + +IMG_SZ_PX = 256 +IMG_PER_ROW = int(TEX_SZ[0] / IMG_SZ_PX) +IMG_PER_COL = int(TEX_SZ[1] / IMG_SZ_PX) + +# TODO(yum) this should live in a config file. +# Note: the name of the emote must be no longer than 6 characters. +IMG_TEX_DATA = [] +IMG_TEX_DATA.append(("Images/Emotes/xdd.png", "xdd")) +IMG_TEX_DATA.append(("Images/Emotes/pog.png", "pog")) +IMG_TEX_DATA.append(("Images/Emotes/lulw.png", "laugh")) +IMG_TEX_DATA.append(("Images/Emotes/bighardo.png", "hard")) +IMG_TEX_DATA.append(("Images/Emotes/peepoHappy.png", "happy")) +IMG_TEX_DATA.append(("Images/Emotes/peepoSad.png", "sad")) +IMG_TEX_DATA.append(("Images/Emotes/bedge.png", "bed")) +IMG_TEX_DATA.append(("Images/Emotes/reallymad.png", "mad")) +IMG_TEX_DATA.append(("Images/Emotes/clueless.png", "surely")) +IMG_TEX_DATA.append(("Images/Emotes/what.png", "what")) +IMG_TEX_DATA.append(("Images/Emotes/based.png", "based")) +IMG_TEX_DATA.append(("Images/Emotes/chad.png", "chad")) +IMG_TEX_DATA.append(("Images/Emotes/aware.png", "aware")) +IMG_TEX_DATA.append(("Images/Emotes/girl.png", "girl")) + +IMG_TEX_KEYWORD_TO_COORD = {} +for i in range(0, len(IMG_TEX_DATA)): + IMG_TEX_KEYWORD_TO_COORD[IMG_TEX_DATA[i][1]] = i + +# We treat images like words. To keep things simple, they're the same height as +# a word, and they're a fixed width. +IMG_SZ_LETTER_ROWS = 1 +IMG_SZ_LETTER_COLS = 6 + +def lookup(word: str): + word = word.lower() + word = ''.join(c for c in word.lower() if c.isalpha()) + if word in IMG_TEX_KEYWORD_TO_COORD.keys(): + return word, IMG_TEX_KEYWORD_TO_COORD[word] + return None, None + +def openTexture(tex_path: str): + if not os.path.exists(args.texture_path): + return Image.new("RGB", TEX_SZ) + tex = Image.open(args.texture_path) + if tex.size != TEX_SZ: + print("Texture at {} has mismatching size {}, creating new texture".format( + tex_path, tex.size), file=sys.stderr) + return Image.new("RGB", TEX_SZ) + return tex + +# Add an image to the texture at the coordinates (x, y). x and y should be in +# the range [0, IMG_PER_COL) and [0, IMG_PER_ROW) respectively. +def addImageToTexture(tex: Image, img_path: str, x: int, y:int): + # Transparent images will be composited on top of a black background. + img = Image.open(img_path).convert('RGBA') + img_bg = Image.new('RGBA', img.size, (0, 0, 0)) + img = Image.alpha_composite(img_bg, img).convert('RGB') + + max_px = IMG_SZ_PX + + # Scale the image up so it uses as much space as is given to it. + # I originally planned to support multiple scales, but this proved to be + # too much work - getting line wrapping to work with this would be a pain. + # So for now, all images are the same height as words. + scale = 1 + img_x, img_y = img.size + max_dim = max(img_x, img_y) + img_scale = (max_px / max_dim) * scale + new_sz = (int(floor(img.size[0] * img_scale)), + int(floor(img.size[1] * img_scale))) + print("Add image {}".format(img_path)) + print(" Original size: {}".format(img.size)) + print(" Scaled size: {}".format(new_sz)) + img = img.resize(new_sz) + + # Center the image within its new coordinate space. + padded_img_sz = (IMG_SZ_PX * scale, IMG_SZ_PX * scale) + padded_img = Image.new("RGB", padded_img_sz) + centered_x = int(floor((padded_img_sz[0] - new_sz[0]) / 2)) + centered_y = int(floor((padded_img_sz[1] - new_sz[1]) / 2)) + padded_img.paste(img, box=(centered_x, centered_y)) + img = padded_img + + # Break the image into tiles and write them into the texture. + for slot in range(0, scale * scale): + tile_x = slot % scale + tile_y = int(floor(slot / scale)) + tile_bbox = (tile_x * IMG_SZ_PX, tile_y * IMG_SZ_PX, (tile_x + 1) * IMG_SZ_PX, (tile_y + 1) * IMG_SZ_PX) + tile = img.crop(tile_bbox) + print(" tile {},{} (bbox={})".format(tile_x, tile_y, tile_bbox)) + + slot_x = x + slot % IMG_PER_ROW + slot_y = y + int(floor(slot / IMG_PER_ROW)) + slot_x_px = slot_x * IMG_SZ_PX + slot_y_px = slot_y * IMG_SZ_PX + print(" Add img at {},{} (px {},{})".format(slot_x, slot_y, slot_x_px, slot_y_px)) + + tex.paste(tile, box=(slot_x_px, slot_y_px)) + +def parseArgs(): + parser = argparse.ArgumentParser() + parser.add_argument("--texture_path", type=str, help="Path to save the generated texture.") + args = parser.parse_args() + + if not args.texture_path: + args.texture_path = "img_texture.png" + + return args + +if __name__ == "__main__": + args = parseArgs() + + tex = openTexture(args.texture_path) + for i in range(0, len(IMG_TEX_DATA)): + filename = IMG_TEX_DATA[i][0] + x = i % IMG_PER_ROW + y = int(floor(i / IMG_PER_ROW)) + addImageToTexture(tex, filename, x, y) + tex.save(args.texture_path) + diff --git a/Scripts/generate_fonts.py b/Scripts/generate_fonts.py new file mode 100644 index 0000000..ef5bfc5 --- /dev/null +++ b/Scripts/generate_fonts.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 + +# python3 -m pip install pillow +# License: HPND license. +from PIL import Image, ImageFont, ImageDraw + +import math + +# Use a power of 2 pixels per character so we can evenly divide the plane. +font_pixels = 128 +full_ratio = 0.75 +half_ratio = 0.5 +full_sz = int(font_pixels * full_ratio) +half_sz = int(font_pixels * half_ratio) +layout_engine = ImageFont.Layout.BASIC + +unifont = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", full_sz, layout_engine=layout_engine) +unifont_half = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", half_sz, layout_engine=layout_engine) + +noto_sans_mono = ImageFont.truetype("Fonts/Noto_Sans_Mono/NotoSansMono-VariableFont_wdth,wght.ttf", full_sz, layout_engine=layout_engine) + +noto_sans_sc_half = ImageFont.truetype("Fonts/Noto_Sans_Simplified_Chinese/NotoSansSC-Regular.otf", half_sz, layout_engine=layout_engine) + +noto_sans_kr_half = ImageFont.truetype("Fonts/Noto_Sans_Korean/NotoSansKR-Regular.otf", half_sz, layout_engine=layout_engine) + +n_rows = 64 +n_cols = 128 + +class FontInfo: + def __init__(self, font, dy): + self.font = font + self.dy = dy + +def allow_range(allowlist, lo_hi, font = None, dy = 0): + for i in range(lo_hi[0], lo_hi[1] + 1): + allowlist[i] = FontInfo(font, dy) +def ban_range(allowlist, lo, hi): + for i in range(lo, hi + 1): + del allowlist[i] +allowlist = {} +# ASCII +basic_latin = (32, 126) +allow_range(allowlist, basic_latin, font=noto_sans_mono, dy = -20) +# Latin-1 supplement +latin_1_supplement = (0x00A1, 0x00ff) +allow_range(allowlist, latin_1_supplement, font = noto_sans_mono) +# Latin extended-A +latin_extended_a = (0x0100, 0x017f) +allow_range(allowlist, latin_extended_a, font = noto_sans_mono) +# Latin extended-B +latin_extended_b = (0x0180, 0x024f) +allow_range(allowlist, latin_extended_b, font = noto_sans_mono) +# Spacing modifier letters +ipa_extensions = (0x0250, 0x02af) +allow_range(allowlist, ipa_extensions, font = unifont) +# Greek and Coptic +greek = (0x0370, 0x03ff) +allow_range(allowlist, greek, font = noto_sans_mono) +ban_range(allowlist, 0x0378, 0x03a2) +# Cyrillic +cyrillic = (0x0400, 0x04ff) +allow_range(allowlist, cyrillic, font = unifont) +# Currency symbols +currency_symbols = (0x20a0, 0x20c0) +allow_range(allowlist, currency_symbols, font = noto_sans_mono) + +# CJK +# +hangul_jamo = (0x1100, 0x11FF) +allow_range(allowlist, hangul_jamo, font = noto_sans_kr_half) +# +general_punctuation = (0x2000, 0x206f) +allow_range(allowlist, general_punctuation, font = noto_sans_mono) +# +kangxi_radicals = (0x2f00, 0x2fdf) +allow_range(allowlist, kangxi_radicals, font = noto_sans_sc_half) +# +cjk_symbols_and_punctuation = (0x3000, 0x303f) +allow_range(allowlist, cjk_symbols_and_punctuation, font = noto_sans_sc_half) +# +hiragana = (0x3041, 0x309f) +allow_range(allowlist, hiragana, font = noto_sans_sc_half) +ban_range(allowlist, 0x3097, 0x3098) +# +katakana = (0x30a0, 0x30ff) +allow_range(allowlist, katakana, font = noto_sans_sc_half) +# +hangul_compatibility_jamo = (0x3130, 0x318f) +allow_range(allowlist, hangul_compatibility_jamo, font = noto_sans_sc_half) +# +enclosed_cjk_letters_and_months = (0x3200, 0x32FF) +allow_range(allowlist, enclosed_cjk_letters_and_months, font = noto_sans_sc_half) +# +cjk_compatibility = (0x3300, 0x33ff) +allow_range(allowlist, cjk_compatibility, font = noto_sans_sc_half) +# +cjk_unified_extension_a = (0x3400, 0x4dbf) +allow_range(allowlist, cjk_unified_extension_a, font = noto_sans_sc_half) +# +cjk_ideographs = (0x4e00, 0x9fff) +allow_range(allowlist, cjk_ideographs, font = noto_sans_sc_half) +# +hangul_syllables = (0xAC00, 0xD7A3) +allow_range(allowlist, hangul_syllables, font = noto_sans_kr_half) +# +halfwidth_and_fullwidth = (0xff00, 0xffef) +allow_range(allowlist, halfwidth_and_fullwidth, font = noto_sans_sc_half) + +def in_range(x, range_pair) -> bool: + return x >= range_pair[0] and x <= range_pair[1] + +max_char = max(allowlist) +print("max char: {}".format(max_char)) +print("num chars: {}".format(len(allowlist))) +total_rows = math.ceil(max_char / n_cols) +print("total rows {}".format(total_rows)) +total_textures = math.ceil(total_rows / n_rows) +print("total textures {}".format(total_textures)) + +for nth_texture in range(0, total_textures): + # Create an 8K grayscale ("L") or black and white ("1") image + image = Image.new(mode="1", size=(8192,8192), color=0) + draw = ImageDraw.Draw(image) + + row_begin = nth_texture * n_rows + + for row in range(row_begin, row_begin + n_rows): + line = "" + for col in range(0, n_cols): + # Generate the unicode character for this spot. + n = row * n_cols + col + char = None + font_info = None + if n in allowlist.keys(): + char = chr(n) + font_info = allowlist[n] + else: + char = " " + font_info = FontInfo(unifont, 0) + # Hack: Chinese, Japanese, and Korean characters are all double + # width and are all on textures [1,6]. To fit them in the same + # grid, we use a half-size font. + draw.text((col * font_pixels / 2, (row - row_begin) * font_pixels + + font_info.dy), char, font=font_info.font, fill=255) + + image.save("Fonts/Bitmaps/font-%01d.png" % nth_texture) + diff --git a/Scripts/generate_params.py b/Scripts/generate_params.py new file mode 100644 index 0000000..323502c --- /dev/null +++ b/Scripts/generate_params.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +import generate_utils + +PARAM_HEADER = """ +%YAML 1.1 +%TAG !u! tag:unity3d.com,2011: +--- !u!114 &11400000 +MonoBehaviour: + m_ObjectHideFlags: 0 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_GameObject: {fileID: 0} + m_Enabled: 1 + m_EditorHideFlags: 0 + m_Script: {fileID: -1506855854, guid: 67cc4cb7839cd3741b63733d5adf0442, type: 3} + m_Name: TaSTT_params + m_EditorClassIdentifier: + parameters: +"""[1:][0:-1] + +INT_PARAM = """ + - name: %PARAM_NAME% + valueType: 0 + saved: 0 + defaultValue: 0 +"""[1:][0:-1] + +BOOL_PARAM = """ + - name: %PARAM_NAME% + valueType: 2 + saved: %SAVED% + defaultValue: 0 +"""[1:][0:-1] + +FLOAT_PARAM = """ + - name: %PARAM_NAME% + valueType: 1 + saved: 0 + defaultValue: %DEFAULT_FLOAT% +"""[1:][0:-1] + +# We're working with an 84-character board, and each FX layer is responsible +# for 8 of those characters. +params = {} +params["SAVED"] = "0" +params["DEFAULT_FLOAT"] = "0" +print(generate_utils.replaceMacros(PARAM_HEADER, params)) + +params["PARAM_NAME"] = generate_utils.getDummyParam() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getEnableParam() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getIndicator0Param() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getIndicator1Param() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getScaleParam() +params["DEFAULT_FLOAT"] = "0.2" +print(generate_utils.replaceMacros(FLOAT_PARAM, params)) +params["DEFAULT_FLOAT"] = "0" + +params["PARAM_NAME"] = generate_utils.getToggleParam() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getSpeechNoiseToggleParam() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getSpeechNoiseEnableParam() +params["SAVED"] = "1" +print(generate_utils.replaceMacros(BOOL_PARAM, params)) +params["SAVED"] = "0" + +params["PARAM_NAME"] = generate_utils.getLockWorldParam() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getClearBoardParam() +print(generate_utils.replaceMacros(BOOL_PARAM, params)) + +params["PARAM_NAME"] = generate_utils.getSelectParam() +print(generate_utils.replaceMacros(INT_PARAM, params)) + +for byte in range(0, generate_utils.BYTES_PER_CHAR): + for i in range(0, generate_utils.NUM_LAYERS): + params["PARAM_NAME"] = generate_utils.getBlendParam(i, byte) + print(generate_utils.replaceMacros(FLOAT_PARAM, params)) diff --git a/Scripts/generate_utils.py b/Scripts/generate_utils.py new file mode 100644 index 0000000..e8fcc8b --- /dev/null +++ b/Scripts/generate_utils.py @@ -0,0 +1,130 @@ +from math import ceil +from math import floor + +def replaceMacros(lines, macro_defs): + for k,v in macro_defs.items(): + lines = lines.replace("%" + k + "%", v) + return lines + +# Note, (BOARD_ROWS * BOARD_COLS % NUM_LAYERS) must equal 0. If not, writing to +# the last cell will (with the current implementation) wrap around to the front +# of the board. +BOARD_ROWS=4 +BOARD_COLS=48 +NUM_REGIONS = 24 +CHARS_PER_CELL=256 +BYTES_PER_CHAR=2 + +NUM_LAYERS=ceil((BOARD_ROWS * BOARD_COLS) / NUM_REGIONS) + +# Implementation detail. We use this parameter to return from the terminal +# state of the FX layer to the starting state. +def getDummyParam(): + return "TaSTT_Dummy" + +def getHipToggleParam(): + return "TaSTT_Hip_Toggle" + +def getHandToggleParam(): + return "TaSTT_Hand_Toggle" + +def getToggleParam(): + return "TaSTT_Toggle" + +def getScaleParam(): + return "TaSTT_Scale" + +# When this is set to true, the board will emit a soft beep sound. It's used to +# grab attention when speaking. +def getSpeechNoiseToggleParam(): + return "TaSTT_Speech_Noise_Toggle" + +# This is used to disable speaking noises. +def getSpeechNoiseEnableParam(): + return "TaSTT_Speech_Noise_Enable" + +# When this is set to true, the board clears. +def getClearBoardParam(): + return "TaSTT_Clear_Board" + +def getLockWorldParam(): + return "TaSTT_Lock_World" + +# Each layer controls a group of cells. There's only one letter per layer, thus +# this is also the name of the parameter which sets the letter for a layer. +def getLayerParam(which_layer: int, byte: int) -> str: + return "TaSTT_L%02dB%01d" % (which_layer, byte) + +def getLayerName(which_layer: int, byte: int) -> str: + return getLayerParam(which_layer, byte) + +def getBlendParam(which_layer: int, byte: int) -> str: + return "TaSTT_L%02dB%01d_Blend" % (which_layer, byte) + +def getDefaultStateName(which_layer:int , byte: int): + return "TaSTT_L%02dB%01d_Do_Nothing" % (which_layer, byte) + +def getActiveStateName(which_layer: int, byte: int): + return "TaSTT_L%02dB%01d_Active" % (which_layer, byte) + +def getSelectStateName(which_layer, select): + return "TaSTT_L%02d_S%02d_B%01d" % (which_layer, select, byte) + +def getBlendStateName(which_layer, select, byte): + return "TaSTT_L%02d_S%02d_B%01d_Blend" % (which_layer, select, byte) + +def getLetterStateName(which_layer, select, letter, byte): + return "TaSTT_L%02d_S%02d_L%03d_B%01d" % (which_layer, select, letter, byte) + +def getSelectParam() -> str: + return "TaSTT_Select" + +def getEnableParam(): + return "TaSTT_Enable" + +def getIndicator0Param(): + return "TaSTT_Indicator_0" + +def getIndicator1Param(): + return "TaSTT_Indicator_1" + +def getBoardIndex(which_layer, select): + # Because we divide the board into a multiple of 8 cells, some cells may + # describe animations which don't exist, depending on the size of the board. + # We work around this by simply wrapping those animations back to the top + # of the board, and rely on the OSC controller to simply not reference + # those cells. + return (select * NUM_LAYERS + which_layer) % (BOARD_ROWS * BOARD_COLS) + +def getShaderParamByRowColByte(row, col, byte): + return "_Letter_Row%02d_Col%02d_Byte%01d" % (row, col, byte) + +# Mapping from layer to shader param. +def getShaderParam(which_layer, select, byte): + index = getBoardIndex(which_layer, select) + + col = index % BOARD_COLS + row = floor(index / BOARD_COLS) + + return getShaderParamByRowCol(row, col, byte) + +# The name of the animation which writes `letter` at a specific position in the +# display. +def getLetterAnimationName(row, col, letter, nth_byte): + return "R%02dC%02dL%02dB%01d" % (row, col, letter, nth_byte) + +# The name of the animation which clears the entire board. +def getClearAnimationName(): + return "TaSTT_Clear_Board" + +def getAnimationNameByLayerAndIndex(which_layer, select, letter, nth_byte): + index = getBoardIndex(which_layer, select) + + col = index % BOARD_COLS + row = floor(index / BOARD_COLS) + + return "R%02dC%02dL%02dB%01d" % (row, col, letter, nth_byte) + +# Returns the path to the animation for the given shader parameter + letter. +def getAnimationPath(shader_param, letter): + return "generated/animations/%s_Letter%02d.anim" % (shader_param, letter) diff --git a/Scripts/libtastt.py b/Scripts/libtastt.py new file mode 100644 index 0000000..bee535f --- /dev/null +++ b/Scripts/libtastt.py @@ -0,0 +1,578 @@ +#!/usr/bin/env python3 + +import argparse +import generate_utils +import libunity +import os +import pickle +import sys +import typing + +# TODO(yum) we're getting the encoding scheme from here, but I think it should +# be in a different layer. +import osc_ctrl + +LETTER_ANIMATION_TEMPLATE = """ +%YAML 1.1 +%TAG !u! tag:unity3d.com,2011: +--- !u!74 &7400000 +AnimationClip: + m_ObjectHideFlags: 0 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_Name: REPLACEME_ANIMATION_NAME + serializedVersion: 6 + m_Legacy: 0 + m_Compressed: 0 + m_UseHighQualityCurve: 1 + m_RotationCurves: [] + m_CompressedRotationCurves: [] + m_EulerCurves: [] + m_PositionCurves: [] + m_ScaleCurves: [] + m_FloatCurves: + - curve: + serializedVersion: 2 + m_Curve: + - serializedVersion: 3 + time: 0 + value: REPLACEME_LETTER_VALUE + inSlope: 0 + outSlope: 0 + tangentMode: 136 + weightedMode: 0 + inWeight: 0 + outWeight: 0 + - serializedVersion: 3 + time: 0.016666668 + value: REPLACEME_LETTER_VALUE + inSlope: 0 + outSlope: 0 + tangentMode: 136 + weightedMode: 0 + inWeight: 0 + outWeight: 0 + m_PreInfinity: 2 + m_PostInfinity: 2 + m_RotationOrder: 4 + attribute: material.REPLACEME_LETTER_PARAM + path: TaSTT + classID: 137 + script: {fileID: 0} + m_PPtrCurves: [] + m_SampleRate: 60 + m_WrapMode: 0 + m_Bounds: + m_Center: {x: 0, y: 0, z: 0} + m_Extent: {x: 0, y: 0, z: 0} + m_ClipBindingConstant: + genericBindings: + - serializedVersion: 2 + path: 2794480623 + attribute: 2284639795 + script: {fileID: 0} + typeID: 137 + customType: 22 + isPPtrCurve: 0 + pptrCurveMapping: [] + m_AnimationClipSettings: + serializedVersion: 2 + m_AdditiveReferencePoseClip: {fileID: 0} + m_AdditiveReferencePoseTime: 0 + m_StartTime: 0 + m_StopTime: 0 + m_OrientationOffsetY: 0 + m_Level: 0 + m_CycleOffset: 0 + m_HasAdditiveReferencePose: 0 + m_LoopTime: 1 + m_LoopBlend: 0 + m_LoopBlendOrientation: 0 + m_LoopBlendPositionY: 0 + m_LoopBlendPositionXZ: 0 + m_KeepOriginalOrientation: 0 + m_KeepOriginalPositionY: 1 + m_KeepOriginalPositionXZ: 0 + m_HeightFromFeet: 0 + m_Mirror: 0 + m_EditorCurves: + - curve: + serializedVersion: 2 + m_Curve: + - serializedVersion: 3 + time: 0 + value: REPLACEME_LETTER_VALUE + inSlope: 0 + outSlope: 0 + tangentMode: 136 + weightedMode: 0 + inWeight: 0 + outWeight: 0 + - serializedVersion: 3 + time: 0.016666668 + value: REPLACEME_LETTER_VALUE + inSlope: 0 + outSlope: 0 + tangentMode: 136 + weightedMode: 0 + inWeight: 0 + outWeight: 0 + m_PreInfinity: 2 + m_PostInfinity: 2 + m_RotationOrder: 4 + attribute: material.REPLACEME_LETTER_PARAM + path: TaSTT + classID: 137 + script: {fileID: 0} + m_EulerEditorCurves: [] + m_HasGenericRootTransform: 0 + m_HasMotionFloatCurves: 0 + m_Events: [] +""" + +ANIMATOR_TEMPLATE = """ +--- !u!91 &9100000 +AnimatorController: + m_ObjectHideFlags: 0 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_Name: TaSTT_fx + serializedVersion: 5 + m_AnimatorParameters: [] + m_AnimatorLayers: [] +""" + +# For whatever reason, running unrelated animations s.a. +# facial expressions can have a slight effect on supposedly +# unrelated parameters, causing letter to flip. Add a +# little buffer to reduce the odds that this effect causes +# a letter to change after it has been written. +UNITY_ANIMATION_FUDGE_MARGIN = 0.1 + +def generateClearAnimation(anim_dir, guid_map): + print("Generating board clearing animation", file=sys.stderr) + + parser = libunity.UnityParser() + parser.parse(LETTER_ANIMATION_TEMPLATE) + + anim_node = parser.nodes[0] + anim_clip = anim_node.mapping['AnimationClip'] + curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] + anim_clip.mapping['m_FloatCurves'].sequence = [] + anim_clip.mapping['m_EditorCurves'].sequence = [] + + letter = 0 + + for byte in range(0, generate_utils.BYTES_PER_CHAR): + for row in range(0, generate_utils.BOARD_ROWS): + for col in range(0, generate_utils.BOARD_COLS): + curve = curve_template.copy() + for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: + keyframe.mapping['value'] = str(letter + + UNITY_ANIMATION_FUDGE_MARGIN) + curve.mapping['attribute'] = "material.{}".format(generate_utils.getShaderParamByRowColByte(row, col, byte)) + curve.mapping['path'] = "World Constraint/Container/TaSTT" + # Add curve to animation + anim_clip.mapping['m_FloatCurves'].sequence.append(curve) + anim_clip.mapping['m_EditorCurves'].sequence.append(curve) + # Serialize animation to file + anim_name = generate_utils.getClearAnimationName() + anim_path = anim_dir + anim_name + ".anim" + with open(anim_path, "w") as f: + f.write(libunity.unityYamlToString([anim_node])) + # Generate metadata + meta = libunity.Metadata() + with open(anim_path + ".meta", "w") as f: + f.write(str(meta)) + # Add metadata to guid map + guid_map[anim_path] = meta.guid + guid_map[meta.guid] = anim_path + +# Generate a toggle animation for a shader parameter. +def generateToggleAnimations(anim_dir, shader_param, guid_map): + print("Generating shader toggle animation", file=sys.stderr) + + parser = libunity.UnityParser() + parser.parse(LETTER_ANIMATION_TEMPLATE) + + # 0.0 represents false, 1.0 represents true. Don't forget that we add + # `UNITY_ANIMATION_FUDGE_MARGIN` to everything. + for shader_value in range(0, 2): + anim_node = parser.nodes[0] + anim_clip = anim_node.mapping['AnimationClip'] + curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] + anim_clip.mapping['m_FloatCurves'].sequence = [] + anim_clip.mapping['m_EditorCurves'].sequence = [] + + curve = curve_template.copy() + for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: + keyframe.mapping['value'] = str(float(shader_value) + + UNITY_ANIMATION_FUDGE_MARGIN) + curve.mapping['attribute'] = "material.{}".format(shader_param) + curve.mapping['path'] = "World Constraint/Container/TaSTT" + # Add curve to animation + anim_clip.mapping['m_FloatCurves'].sequence.append(curve) + anim_clip.mapping['m_EditorCurves'].sequence.append(curve) + + # Serialize animation to file + anim_name = generate_utils.getClearAnimationName() + anim_suffix = "_Off" + if shader_value == 1: + anim_suffix = "_On" + anim_path = anim_dir + shader_param + anim_suffix + ".anim" + with open(anim_path, "w") as f: + f.write(libunity.unityYamlToString([anim_node])) + # Generate metadata + meta = libunity.Metadata() + with open(anim_path + ".meta", "w") as f: + f.write(str(meta)) + # Add metadata to guid map + guid_map[anim_path] = meta.guid + guid_map[meta.guid] = anim_path + +# Generate a toggle animation for a shader parameter. +def generateFloatAnimation(anim_name: str, anim_dir: str, + path: str, attribute: str, + value: float, + guid_map: typing.Dict[str, str]) -> str: + print("Generating float toggle animation {}/{}".format(path,attribute), + file=sys.stderr) + + parser = libunity.UnityParser() + parser.parse(LETTER_ANIMATION_TEMPLATE) + + # 0.0 represents false, 1.0 represents true. Don't forget that we add + # `UNITY_ANIMATION_FUDGE_MARGIN` to everything. + anim_node = parser.nodes[0] + anim_clip = anim_node.mapping['AnimationClip'] + curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] + anim_clip.mapping['m_FloatCurves'].sequence = [] + anim_clip.mapping['m_EditorCurves'].sequence = [] + + curve = curve_template.copy() + for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: + keyframe.mapping['value'] = str(value) + curve.mapping['attribute'] = attribute + curve.mapping['path'] = path + # Add curve to animation + anim_clip.mapping['m_FloatCurves'].sequence.append(curve) + anim_clip.mapping['m_EditorCurves'].sequence.append(curve) + + # Serialize animation to file + anim_path = anim_dir + anim_name + ".anim" + with open(anim_path, "w") as f: + f.write(libunity.unityYamlToString([anim_node])) + # Generate metadata + meta = libunity.Metadata() + with open(anim_path + ".meta", "w") as f: + f.write(str(meta)) + # Add metadata to guid map + guid_map[anim_path] = meta.guid + guid_map[meta.guid] = anim_path + + return meta.guid + +def generateAnimations(anim_dir, guid_map): + generateClearAnimation(args.gen_anim_dir, guid_map) + + generateToggleAnimations(args.gen_anim_dir, generate_utils.getIndicator0Param(), guid_map) + generateToggleAnimations(args.gen_anim_dir, generate_utils.getIndicator1Param(), guid_map) + + print("Generating letter animations", file=sys.stderr) + + parser = libunity.UnityParser() + parser.parse(LETTER_ANIMATION_TEMPLATE) + + anim_node = parser.nodes[0] + anim_clip = anim_node.mapping['AnimationClip'] + curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] + anim_clip.mapping['m_FloatCurves'].sequence = [] + anim_clip.mapping['m_EditorCurves'].sequence = [] + + # To support more languages, we use 2 bytes per character, giving us a 64K character set. + for byte in range(0, generate_utils.BYTES_PER_CHAR): + for row in range(0, generate_utils.BOARD_ROWS): + print("Generating letter animations (row {}/{}) (byte {}/2)".format(row, + generate_utils.BOARD_ROWS, byte), file=sys.stderr) + for col in range(0, generate_utils.BOARD_COLS): + for letter in range(0, 2): + if letter == 1: + letter = generate_utils.CHARS_PER_CELL - 1 + + # Make a deep copy of the templates + node = anim_node.copy() + curve = curve_template.copy() + clip = node.mapping['AnimationClip'] + # Populate animation name + anim_name = generate_utils.getLetterAnimationName(row, col, letter, byte) + clip.mapping['m_Name'] = anim_name + # Populate letter value + for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: + keyframe.mapping['value'] = str(letter + UNITY_ANIMATION_FUDGE_MARGIN) + # Populate path to letter parameter + curve.mapping['attribute'] = "material.{}".format(generate_utils.getShaderParamByRowColByte(row, col, byte)) + curve.mapping['path'] = "World Constraint/Container/TaSTT" + # Add curve to animation + clip.mapping['m_FloatCurves'].sequence.append(curve) + clip.mapping['m_EditorCurves'].sequence.append(curve) + # Serialize animation to file + anim_path = anim_dir + anim_name + ".anim" + with open(anim_path, "w") as f: + f.write(libunity.unityYamlToString([node])) + # Generate metadata + meta = libunity.Metadata() + with open(anim_path + ".meta", "w") as f: + f.write(str(meta)) + # Add metadata to guid map + guid_map[anim_path] = meta.guid + guid_map[meta.guid] = anim_path + +def generateFXController(anim: libunity.UnityAnimator) -> typing.Dict[int, libunity.UnityDocument]: + parser = libunity.UnityParser() + parser.parse(ANIMATOR_TEMPLATE) + anim.addNodes(parser.nodes) + + anim.addParameter(generate_utils.getEnableParam(), bool) + anim.addParameter(generate_utils.getDummyParam(), bool) + anim.addParameter(generate_utils.getHipToggleParam(), bool) + anim.addParameter(generate_utils.getHandToggleParam(), bool) + anim.addParameter(generate_utils.getToggleParam(), bool) + anim.addParameter(generate_utils.getSpeechNoiseEnableParam(), bool) + anim.addParameter(generate_utils.getClearBoardParam(), bool) + anim.addParameter(generate_utils.getIndicator0Param(), bool) + anim.addParameter(generate_utils.getIndicator1Param(), bool) + anim.addParameter(generate_utils.getScaleParam(), float) + + layers = {} + for byte in range(0, generate_utils.BYTES_PER_CHAR): + layers[byte] = {} + for i in range(0, generate_utils.NUM_LAYERS): + anim.addParameter(generate_utils.getBlendParam(i, byte), float) + + layer = anim.addLayer(generate_utils.getLayerName(i, byte)) + layers[byte][i] = layer + anim.addParameter(generate_utils.getSelectParam(), int) + + return layers + +def generateFXLayer(which_layer: int, anim: libunity.UnityAnimator, layer: + libunity.UnityDocument, gen_anim_dir: str, byte: int): + is_default_state = True + default_state = anim.addAnimatorState(layer, + generate_utils.getDefaultStateName(which_layer, byte), is_default_state) + + dy = 100 + active_state = anim.addAnimatorState(layer, + generate_utils.getActiveStateName(which_layer, byte), dy = dy) + + active_state_transition = anim.addTransition(active_state) + enable_param = generate_utils.getEnableParam() + anim.addTransitionBooleanCondition(default_state, active_state_transition, + enable_param, True) + + select_states = {} + for i in range(0, generate_utils.NUM_REGIONS): + dx = i * 200 + dy = 200 + + # Create blend tree for this region. + anim_lo_path = gen_anim_dir + \ + generate_utils.getAnimationNameByLayerAndIndex( + which_layer, i, 0, byte) + \ + ".anim" + guid_lo = guid_map[anim_lo_path] + anim_hi_path = gen_anim_dir + \ + generate_utils.getAnimationNameByLayerAndIndex( + which_layer, i, generate_utils.CHARS_PER_CELL - 1, byte) + \ + ".anim" + guid_hi = guid_map[anim_hi_path] + + select_states[i] = anim.addAnimatorBlendTree(layer, + generate_utils.getBlendStateName(which_layer, i, byte), + generate_utils.getBlendParam(which_layer, byte), + guid_lo, guid_hi, dx = dx, dy = dy) + state = select_states[i] + + # Create transition to state. + select_state_transition = anim.addTransition(state) + select_param = generate_utils.getSelectParam() + anim.addTransitionIntegerEqualityCondition(active_state, + select_state_transition, select_param, i) + + # Create return-home transition. + home_state_transition = anim.addTransition(default_state) + home_state_transition.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' + dummy_param = generate_utils.getDummyParam() + anim.addTransitionBooleanCondition(state, + home_state_transition, dummy_param, False) + pass + +# Generic toggle adding utility. +# Generates the layer and parameter. +# Returns a map containing the off and on states, as well as the +# transitions between them. +def generateToggle(layer_name: str, + gen_anim_dir: str, + off_anim_basename: str, + on_anim_basename: str, + anim: libunity.UnityAnimator) -> typing.Dict[str, + libunity.UnityDocument]: + layer = anim.addLayer(layer_name) + + # For simplicity, use the layer name as the parameter name. + parameter_name = layer_name + anim.addParameter(parameter_name, bool) + + off_state = anim.addAnimatorState(layer, layer_name + "_Off", + is_default_state = True) + on_state = anim.addAnimatorState(layer, layer_name + "_On", dy=100) + + if off_anim_basename: + off_anim_path = gen_anim_dir + off_anim_basename + off_anim_meta = libunity.Metadata() + off_anim_meta.load(off_anim_path) + anim.setAnimatorStateAnimation(off_state, off_anim_meta.guid) + + if on_anim_basename: + on_anim_path = gen_anim_dir + on_anim_basename + on_anim_meta = libunity.Metadata() + on_anim_meta.load(on_anim_path) + anim.setAnimatorStateAnimation(on_state, on_anim_meta.guid) + + off_to_on_trans = anim.addTransition(on_state) + anim.addTransitionBooleanCondition(off_state, + off_to_on_trans, parameter_name, True) + + on_to_off_trans = anim.addTransition(off_state) + anim.addTransitionBooleanCondition(on_state, + on_to_off_trans, parameter_name, False) + + result = {} + result["off"] = off_state + result["on"] = on_state + result["off_to_on"] = off_to_on_trans + result["on_to_off"] = on_to_off_trans + + return result + +def generateScaleLayer(anim: libunity.UnityAnimator, + gen_anim_dir: str, + guid_map: typing.Dict[str, str]): + + scale_layer = anim.addLayer(generate_utils.getScaleParam()) + + path = "World Constraint/Container/TaSTT" + attribute = "blendShape.Scale" + + guid_lo = generateFloatAnimation("TaSTT_Scale_0", gen_anim_dir, + path, attribute, + 0.0, guid_map) + guid_hi = generateFloatAnimation("TaSTT_Scale_100", gen_anim_dir, + path, attribute, + 100.0, guid_map) + + anim.addAnimatorBlendTree(scale_layer, + generate_utils.getScaleParam(), + generate_utils.getScaleParam(), + guid_lo, guid_hi, + lo_threshold = 0.0, hi_threshold = 1.0); + + pass + +def generateFX(guid_map, gen_anim_dir): + anim = libunity.UnityAnimator() + + layers = generateFXController(anim) + + # TODO(yum) parallelize + for byte in range(0, generate_utils.BYTES_PER_CHAR): + for which_layer, layer in layers[byte].items(): + print("Generating layer {}/{}".format(which_layer, len(layers[byte].items())), file=sys.stderr) + generateFXLayer(which_layer, anim, layer, gen_anim_dir, byte) + + states = generateToggle( + generate_utils.getSpeechNoiseToggleParam(), + "Animations/", + "TaSTT_Speech_Noise_Off.anim", + "TaSTT_Speech_Noise_On.anim", + anim) + # Enable beeping only if user has turned it on. + anim.addTransitionBooleanCondition(states["off"], + states["off_to_on"], generate_utils.getSpeechNoiseEnableParam(), True) + # Enable beeping only if board is out. + anim.addTransitionBooleanCondition(states["off"], + states["off_to_on"], generate_utils.getToggleParam(), True) + + generateToggle(generate_utils.getToggleParam(), + "Animations/", + "TaSTT_Toggle_Off.anim", + "TaSTT_Toggle_On.anim", + anim) + generateToggle(generate_utils.getLockWorldParam(), + "Animations/", + "TaSTT_Lock_World_Disable.anim", + "TaSTT_Lock_World_Enable.anim", + anim) + generateToggle( + generate_utils.getClearBoardParam(), + gen_anim_dir, + None, # No animation in the `off` state. + generate_utils.getClearAnimationName() + ".anim", + anim) + generateToggle(generate_utils.getIndicator0Param(), + gen_anim_dir, + generate_utils.getIndicator0Param() + "_Off.anim", + generate_utils.getIndicator0Param() + "_On.anim", + anim) + generateToggle(generate_utils.getIndicator1Param(), + gen_anim_dir, + generate_utils.getIndicator1Param() + "_Off.anim", + generate_utils.getIndicator1Param() + "_On.anim", + anim) + generateScaleLayer(anim, gen_anim_dir, guid_map) + + return anim + +def parseArgs(): + parser = argparse.ArgumentParser() + parser.add_argument("cmd", type=str, help="") + parser.add_argument("--gen_dir", type=str, help="The directory under " + + "which all generated assets are placed") + parser.add_argument("--gen_anim_dir", type=str, help="The directory under " + + "which all generated animations are placed.") + parser.add_argument("--guid_map", type=str, help="The path to a file which will store guids") + args = parser.parse_args() + + if not args.gen_dir: + args.gen_dir = "generated/" + + if not args.gen_anim_dir: + args.gen_anim_dir = args.gen_dir + "animations/" + + if not args.guid_map: + args.guid_map = "guid.map" + + return args + +if __name__ == "__main__": + args = parseArgs() + + if args.cmd == "gen_anims": + guid_map = {} + with open(args.guid_map, 'rb') as f: + guid_map = pickle.load(f) + + os.makedirs(args.gen_anim_dir, exist_ok=True) + generateAnimations(args.gen_anim_dir, guid_map) + + with open(args.guid_map, 'wb') as f: + pickle.dump(guid_map, f) + elif args.cmd == "gen_fx": + guid_map = {} + with open(args.guid_map, 'rb') as f: + guid_map = pickle.load(f) + + print(str(generateFX(guid_map, args.gen_anim_dir))) + diff --git a/Scripts/libunity.py b/Scripts/libunity.py new file mode 100644 index 0000000..f9e9e28 --- /dev/null +++ b/Scripts/libunity.py @@ -0,0 +1,1356 @@ +#!/usr/bin/env python3 + +import argparse +import copy +import enum +import math +import os +import pickle +import random +import sys +# python3 -m pip install pyyaml +# License: MIT. +import yaml + +import multiprocessing as mp + +WRITE_DEFAULTS_ANIM_TEMPLATE = """ +%YAML 1.1 +%TAG !u! tag:unity3d.com,2011: +--- !u!74 &7400000 +AnimationClip: + m_ObjectHideFlags: 0 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_Name: TaSTT_Reset_Animations + serializedVersion: 6 + m_Legacy: 0 + m_Compressed: 0 + m_UseHighQualityCurve: 1 + m_RotationCurves: [] + m_CompressedRotationCurves: [] + m_EulerCurves: [] + m_PositionCurves: [] + m_ScaleCurves: [] + m_FloatCurves: + - curve: + serializedVersion: 2 + m_Curve: + - serializedVersion: 3 + time: 0 + value: 0 + inSlope: 0 + outSlope: 0 + tangentMode: 136 + weightedMode: 0 + inWeight: 0 + outWeight: 0 + m_PreInfinity: 2 + m_PostInfinity: 2 + m_RotationOrder: 4 + attribute: REPLACEME_ATTRIBUTE + path: REPLACEME_PATH + classID: 137 + script: {fileID: 0} + m_PPtrCurves: [] + m_SampleRate: 60 + m_WrapMode: 0 + m_Bounds: + m_Center: {x: 0, y: 0, z: 0} + m_Extent: {x: 0, y: 0, z: 0} + m_ClipBindingConstant: + genericBindings: + - serializedVersion: 2 + path: 2794480623 + attribute: 2284639795 + script: {fileID: 0} + typeID: 137 + customType: 22 + isPPtrCurve: 0 + pptrCurveMapping: [] + m_AnimationClipSettings: + serializedVersion: 2 + m_AdditiveReferencePoseClip: {fileID: 0} + m_AdditiveReferencePoseTime: 0 + m_StartTime: 0 + m_StopTime: 0 + m_OrientationOffsetY: 0 + m_Level: 0 + m_CycleOffset: 0 + m_HasAdditiveReferencePose: 0 + m_LoopTime: 1 + m_LoopBlend: 0 + m_LoopBlendOrientation: 0 + m_LoopBlendPositionY: 0 + m_LoopBlendPositionXZ: 0 + m_KeepOriginalOrientation: 0 + m_KeepOriginalPositionY: 1 + m_KeepOriginalPositionXZ: 0 + m_HeightFromFeet: 0 + m_Mirror: 0 + m_EditorCurves: + - curve: + serializedVersion: 2 + m_Curve: + - serializedVersion: 3 + time: 0 + value: 0 + inSlope: 0 + outSlope: 0 + tangentMode: 136 + weightedMode: 0 + inWeight: 0 + outWeight: 0 + m_PreInfinity: 2 + m_PostInfinity: 2 + m_RotationOrder: 4 + attribute: REPLACEME_ATTRIBUTE + path: REPLACEME_PATH + classID: 137 + script: {fileID: 0} + m_EulerEditorCurves: [] + m_HasGenericRootTransform: 0 + m_HasMotionFloatCurves: 0 + m_Events: [] +"""[1:][:-1] + +METADATA_TEMPLATE = """ +fileFormatVersion: 2 +guid: REPLACEME_GUID +NativeFormatImporter: + externalObjects: {} + mainObjectFileID: 7400000 + userData: + assetBundleName: + assetBundleVariant: +"""[1:][:-1] + +ANIMATION_STATE_TEMPLATE = """ +--- !u!1102 &110200000 +AnimatorState: + serializedVersion: 6 + m_ObjectHideFlags: 1 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_Name: REPLACEME_ANIMATION_NAME + m_Speed: 1 + m_CycleOffset: 0 + m_Transitions: [] + m_StateMachineBehaviours: [] + m_Position: {x: 50, y: 50, z: 0} + m_IKOnFeet: 0 + m_WriteDefaultValues: 0 + m_Mirror: 0 + m_SpeedParameterActive: 0 + m_MirrorParameterActive: 0 + m_CycleOffsetParameterActive: 0 + m_TimeParameterActive: 0 + m_Motion: {} + m_Tag: + m_SpeedParameter: + m_MirrorParameter: + m_CycleOffsetParameter: + m_TimeParameter: +"""[1:][:-1] + +TRANSITION_TEMPLATE = """ +--- !u!1101 &110100000 +AnimatorStateTransition: + m_ObjectHideFlags: 1 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_Name: + m_Conditions: [] + m_DstStateMachine: {fileID: 0} + m_DstState: {fileID: 0} + m_Solo: 0 + m_Mute: 0 + m_IsExit: 0 + serializedVersion: 3 + m_TransitionDuration: 0 + m_TransitionOffset: 0 + m_ExitTime: 0.0 + m_HasExitTime: 0 + m_HasFixedDuration: 1 + m_InterruptionSource: 2 + m_OrderedInterruption: 1 + m_CanTransitionToSelf: 1 +"""[1:][:-1] + +BLEND_TREE_TEMPLATE = """ +--- !u!206 &1071664566462684110 +BlendTree: + m_ObjectHideFlags: 1 + m_CorrespondingSourceObject: {fileID: 0} + m_PrefabInstance: {fileID: 0} + m_PrefabAsset: {fileID: 0} + m_Name: REPLACEME_BLEND_TREE_NAME + m_Childs: + - serializedVersion: 2 + m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_LO, type: 2} + m_Threshold: -1 + m_Position: {x: 0, y: 0} + m_TimeScale: 1 + m_CycleOffset: 0 + m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER + m_Mirror: 0 + - serializedVersion: 2 + m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_HI, type: 2} + m_Threshold: 1 + m_Position: {x: 0, y: 0} + m_TimeScale: 1 + m_CycleOffset: 0 + m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER + m_Mirror: 0 + m_BlendParameter: REPLACEME_BLEND_PARAMETER + m_BlendParameterY: REPLACEME_BLEND_PARAMETER + m_MinThreshold: -1 + m_MaxThreshold: 1 + m_UseAutomaticThresholds: 0 + m_NormalizedBlendValues: 0 + m_BlendType: 0 +"""[1:][:-1] + +class Metadata: + def __init__(self): + self.guid = "%032x" % random.randrange(16 ** 32) + + def load(self, path): + if not path.endswith(".meta"): + path = path + ".meta" + + self.guid = None + with open(path, "r") as f: + for line in f: + if line.startswith("guid"): + self.guid = line.split()[1] + + def __str__(self): + return METADATA_TEMPLATE.replace("REPLACEME_GUID", self.guid) + +class Node: + def __init__(self): + # Optional. In Unity, this is the fileID of an object. Not all YAML + # mappings have an anchor. + self.anchor = None + + # Pointer to the Node containing this one. + self.parent = None + +class Sequence(Node): + def __init__(self): + super().__init__() + self.sequence = [] + + def copy(self): + new = Sequence() + new.anchor = self.anchor + new.parent = self.parent + + for v in self.sequence: + if hasattr(v, "copy"): + new.sequence.append(v.copy()) + new.sequence[-1].parent = new + else: + new.sequence.append(v) + + return new + + def prettyPrint(self, first_indent=None, leading_newline=None): + depth = 0 + p = self.parent + while p != None: + depth += 1 + p = p.parent + indent = " " * depth + + lines = [] + first = True + for item in self.sequence: + cur_indent = indent + if first: + if first_indent != None: + cur_indent = first_indent + first = False + if hasattr(item, "prettyPrint"): + lines.append("{}- {}".format(cur_indent, item.prettyPrint(first_indent="", leading_newline=False))) + else: + lines.append("{}- {}".format(cur_indent, item)) + + if len(lines) == 0: + return "[]" + + return "\n" + '\n'.join(lines) + + def __str__(self): + return self.prettyPrint() + + def addChildMapping(self, anchor = None, add_to_head = False): + child = Mapping() + child.anchor = anchor + child.parent = self + child.sequence = [] + + if add_to_head: + self.sequence = [child] + self.sequence + else: + self.sequence.append(child) + + return child + + def addChildSequence(self, anchor = None): + child = Sequence() + child.anchor = anchor + child.parent = self + child.sequence = [] + + self.sequence.append(child) + + return child + + def forEach(self, cb): + for k in self.sequence: + cb(k) + +class Mapping(Node): + def __init__(self): + super().__init__() + self.mapping = {} + + def copy(self): + new = Mapping() + new.anchor = self.anchor + new.parent = self.parent + + for k, v in self.mapping.items(): + if hasattr(v, "copy"): + new.mapping[k] = v.copy() + new.mapping[k].parent = new + else: + new.mapping[k] = v + + return new + + def prettyPrint(self, first_indent=None, leading_newline=True): + depth = 0 + p = self.parent + while p != None: + depth += 1 + p = p.parent + indent = " " * depth + + lines = [] + first = True + for k, v in self.mapping.items(): + cur_indent = indent + if first: + if first_indent != None: + cur_indent = first_indent + first = False + lines.append("{}{}: {}".format(cur_indent, k, v)) + + result = '\n'.join(lines) + + # Inline 1-item mappings, matching Unity behavior. + if len(self.mapping.keys()) == 1 and len(result.split("\n")) == 1: + if first_indent == None: + return self.prettyPrint(first_indent="") + return "{" + lines[0] + "}" + + # Empty mappings are represented by '{}'. If we don't do this, Unity + # will assume that they are Sequences and get very sad. + if len(self.mapping.keys()) == 0: + return "{}" + + if leading_newline: + result = "\n" + result + + return result + + def __str__(self): + return self.prettyPrint() + + def addChildMapping(self, key, anchor = None): + child = Mapping() + child.anchor = anchor + child.parent = self + child.mapping = {} + + self.mapping[key] = child + + return child + + def addChildSequence(self, key, anchor = None): + child = Sequence() + child.anchor = anchor + child.parent = self + child.mapping = {} + + self.mapping[key] = child + + return child + + def forEach(self, cb): + for k, v in self.mapping.items(): + cb(v) + +class UnityDocument(Mapping): + def __init__(self): + super().__init__() + self.class_id = None + + def __str__(self): + return super().__str__() + + def copy(self): + result = super().copy() + result.class_id = self.class_id + return result + +# Class representing a Unity AnimatorController. Implements manipulations, like +# merging and reanchoring. +class UnityAnimator(): + def __init__(self): + self.nodes = [] + self.id_to_node = {} + self.next_id = 1000 * 1000 + + def __str__(self): + return unityYamlToString(self.nodes) + + def addNodes(self, nodes): + for node in nodes: + self.nodes.append(node) + anchor = node.anchor + if anchor == None: + raise Exception("Node is missing anchor: {}".format(str(node))) + if anchor in self.id_to_node: + raise Exception("Duplicate anchor: {}, node 1: {}, node 2: {}".format(anchor, str(node), str(self.id_to_node[anchor]))) + self.id_to_node[anchor] = node + + if int(anchor) > self.next_id: + self.next_id = int(anchor) + 1 + # I don't know why but this fixes a bug in the `fixWriteDefaults` + # codepath: two documents wind up with the same anchor. + self.next_id += 1 + + def allocateId(self) -> int: + result = self.next_id + self.next_id += 1 + return result + + # Checks if `old_id` is in `self.id_mapping`, and if so, returns the + # already-generated ID. Otherwise this allocates a new ID and + # records it in `self.id_mapping`. + def mapId(self, old_id: str) -> int: + new_id = None + if old_id in self.id_mapping.keys(): + new_id = self.id_mapping[old_id] + else: + new_id = self.allocateId() + self.id_mapping[old_id] = new_id + return new_id + + # Recursively iterate every mapping under `node` and assign new IDs to + # every identifier. Mappings are recorded in `self.id_mapping`. + def mergeIterator(self, node): + if hasattr(node, "mapping"): + # Don't relabel anything that's defined in an external file. + # TODO(yum) do this. + if 'fileID' in node.mapping and not 'guid' in node.mapping: + if node.mapping['fileID'] != '0': + old_id = node.mapping['fileID'] + new_id = self.mapId(old_id) + node.mapping['fileID'] = str(new_id) + if hasattr(node, "forEach"): + node.forEach(self.mergeIterator) + + def peekNodeOfClass(self, classId): + for node in self.nodes: + if node.class_id == classId: + return node + return None + + def popNodeOfClass(self, classId): + result = None + for node in self.nodes: + if node.class_id == classId: + result = node + self.nodes.remove(result) + break + if result: + del self.id_to_node[result.anchor] + return result + + def pushNode(self, node): + self.nodes.append(node) + self.id_to_node[node.anchor] = node + + # Merges two animator controllers and returns the result. Any identifiers + # in the animators are reassigned in a new namespace. The mappings from old + # identifiers to new identifiers are recorded in `self.id_mapping0` and + # `self.id_mapping1`. + def mergeAnimatorControllers(self, ctrl0, ctrl1): + ctrl0 = copy.deepcopy(ctrl0) + ctrl1 = copy.deepcopy(ctrl1) + + self.id_mapping0 = {} + self.id_mapping1 = {} + + p0 = ctrl0.mapping['AnimatorController'].mapping['m_AnimatorParameters'] + p1 = ctrl1.mapping['AnimatorController'].mapping['m_AnimatorParameters'] + + a0 = ctrl0.mapping['AnimatorController'].mapping['m_AnimatorLayers'] + a1 = ctrl1.mapping['AnimatorController'].mapping['m_AnimatorLayers'] + + self.id_mapping = self.id_mapping0 + p0.forEach(self.mergeIterator) + a0.forEach(self.mergeIterator) + + # Hack to prevent ctrl1 from getting a new ID for the animator. + # TODO(yum) delete this? + #del self.class_to_next_id['91'] + + self.id_mapping = self.id_mapping1 + p1.forEach(self.mergeIterator) + a1.forEach(self.mergeIterator) + + p0.sequence += p1.sequence + a0.sequence += a1.sequence + + for elm in p0.sequence: + elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor + for elm in a0.sequence: + elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor + + return ctrl0 + + def merge(self, other): + ctrl0 = self.popNodeOfClass('91') + ctrl1 = other.popNodeOfClass('91') + # Merge animators and populate `self.id_mapping0` and + # `self.id_mapping1. + merged_anim = self.mergeAnimatorControllers(ctrl0, ctrl1) + + # Mapping from class ID (string) to new class ID (int) + self.id_mapping = self.id_mapping0 + for node in self.nodes: + new_id = self.mapId(node.anchor) + node.anchor = str(new_id) + node.forEach(self.mergeIterator) + + self.id_mapping = self.id_mapping1 + for node in other.nodes: + new_id = self.mapId(node.anchor) + node.anchor = str(new_id) + node.forEach(self.mergeIterator) + + nodes = self.nodes + self.nodes = [] + self.id_to_node = {} + self.pushNode(merged_anim) + self.addNodes(nodes) + self.addNodes(other.nodes) + + # TODO(yum) support overwriting duplicates + def addParameter(self, param_name, param_type): + unity_type = None + if param_type == float: + unity_type = '1' + elif param_type == int: + unity_type = '3' + elif param_type == bool: + unity_type = '4' + anim = self.peekNodeOfClass('91') + params = anim.mapping['AnimatorController'].mapping['m_AnimatorParameters'] + param = params.addChildMapping() + param.mapping['m_Name'] = param_name + param.mapping['m_Type'] = unity_type + param.mapping['m_DefaultFloat'] = '0' + param.mapping['m_DefaultInt'] = '0' + param.mapping['m_DefaultBool'] = '0' + ctrl = param.addChildMapping('m_Controller') + ctrl.mapping['fileID'] = anim.anchor + + def addLayer(self, layer_name, add_to_head = False) -> UnityDocument: + # Add layer to controller + anim = self.peekNodeOfClass('91') + layers = anim.mapping['AnimatorController'].mapping['m_AnimatorLayers'] + layer = layers.addChildMapping(add_to_head = add_to_head) + layer.mapping['serializedVersion'] = '5' + layer.mapping['m_Name'] = layer_name + new_id = self.allocateId() + layer.addChildMapping('m_StateMachine').mapping['fileID'] = str(new_id) + layer.addChildMapping('m_Mask').mapping['fileID'] = '0' + layer.addChildSequence('m_Motions') + layer.addChildSequence('m_Behaviours') + layer.mapping['m_BlendingMode'] = '0' + layer.mapping['m_SyncedLayerIndex'] = '-1' + layer.mapping['m_DefaultWeight'] = '1' + layer.mapping['m_IKPass'] = '0' + layer.mapping['m_SyncedLayerAffectsTiming'] = '0' + layer.addChildMapping('m_Controller').mapping['fileID'] = anim.anchor + + # Create layer object + layer = UnityDocument() + layer.class_id = "1107" + layer.anchor = str(new_id) + mach = layer.addChildMapping('AnimatorStateMachine') + + mach.mapping['serializedVersion'] = '6' + + mach.mapping['m_ObjectHideFlags'] = '1' + mach.addChildMapping('m_CorrespondingSourceObject').mapping['fileID'] = '0' + mach.addChildMapping('m_PrefabInstance').mapping['fileID'] = '0' + mach.addChildMapping('m_PrefabAsset').mapping['fileID'] = '0' + mach.mapping['m_Name'] = layer_name + mach.addChildSequence('m_ChildStates') + mach.addChildSequence('m_ChildStateMachines') + mach.addChildSequence('m_AnyStateTransitions') + mach.addChildSequence('m_EntryTransitions') + mach.addChildMapping('m_StateMachineTransitions') + mach.addChildSequence('m_StateMachineBehaviours') + pos = mach.addChildMapping('m_AnyStatePosition') + pos.mapping['x'] = '50' + pos.mapping['y'] = '20' + pos.mapping['z'] = '0' + pos = mach.addChildMapping('m_EntryPosition') + pos.mapping['x'] = '50' + pos.mapping['y'] = '120' + pos.mapping['z'] = '0' + pos = mach.addChildMapping('m_ExitPosition') + pos.mapping['x'] = '800' + pos.mapping['y'] = '120' + pos.mapping['z'] = '0' + pos = mach.addChildMapping('m_ParentStateMachinePosition') + pos.mapping['x'] = '800' + pos.mapping['y'] = '20' + pos.mapping['z'] = '0' + mach.addChildMapping('m_DefaultState') + + self.nodes.append(layer) + return layer + + def addAnimatorState(self, layer, state_name, is_default_state = False, + dx = 0, dy = 0) -> UnityDocument: + # Create animation state + parser = UnityParser() + parser.parse(ANIMATION_STATE_TEMPLATE) + new_anim = UnityAnimator() + new_anim.addNodes(parser.nodes) + node = new_anim.nodes[0] + + new_id = self.allocateId() + node.class_id = "1102" + node.anchor = str(new_id) + state = node.mapping['AnimatorState'] + state.mapping['m_Name'] = state_name + #state.mapping['m_Motion'].mapping['guid'] = anim_guid + self.nodes.append(node) + + # Add state to layer + child_state = layer.mapping['AnimatorStateMachine'].mapping['m_ChildStates'].addChildMapping() + child_state.mapping['serializedVersion'] = '1' + child_state.addChildMapping('m_State').mapping['fileID'] = str(new_id) + state_pos = child_state.addChildMapping('m_Position') + state_pos.mapping['x'] = str(280 + dx) + state_pos.mapping['y'] = str(80 + dy) + state_pos.mapping['z'] = '0' + + if is_default_state: + layer.mapping['AnimatorStateMachine'].mapping['m_DefaultState'].mapping['fileID'] = str(new_id) + + return node + + def setAnimatorStateAnimation(self, anim_state, anim_guid): + anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['guid'] = anim_guid + anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = '7400000' + anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['type'] = '2' + + # Adds a blend tree which uses the parameter named `param_name` to blend + # between anim_lo and anim_hi. Also creates the corresponding animation + # state. + def addAnimatorBlendTree(self, layer, state_name, param_name, + anim_guid_lo, anim_guid_hi, dx = 0, dy = 0, + lo_threshold = -1.0, hi_threshold = 1.0, + is_default_state = False) -> UnityDocument: + # Create the blend tree. + parser = UnityParser() + parser.parse(BLEND_TREE_TEMPLATE) + new_anim = UnityAnimator() + new_anim.addNodes(parser.nodes) + node = new_anim.nodes[0] + + new_id = self.allocateId() + node.class_id = "206" + node.anchor = str(new_id) + tree = node.mapping['BlendTree'] + tree.mapping['m_Name'] = state_name + # Low animation + tree.mapping['m_Childs'].sequence[0].mapping['m_Motion'].mapping['guid'] = anim_guid_lo + tree.mapping['m_Childs'].sequence[0].mapping['m_DirectBlendParameter'] = param_name + tree.mapping['m_Childs'].sequence[0].mapping['m_Threshold'] = str(lo_threshold) + # High animation + tree.mapping['m_Childs'].sequence[1].mapping['m_Motion'].mapping['guid'] = anim_guid_hi + tree.mapping['m_Childs'].sequence[1].mapping['m_DirectBlendParameter'] = param_name + tree.mapping['m_Childs'].sequence[1].mapping['m_Threshold'] = str(hi_threshold) + + tree.mapping['m_BlendParameter'] = param_name + tree.mapping['m_BlendParameterY'] = param_name + + self.nodes.append(node) + + # Create the corresponding animation state. + anim_state = self.addAnimatorState(layer, state_name, is_default_state, dx = dx, dy = + dy) + anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = node.anchor + + return anim_state + + def addTransition(self, dst_state): + # Create animation state + parser = UnityParser() + parser.parse(TRANSITION_TEMPLATE) + new_transition = UnityAnimator() + new_transition.addNodes(parser.nodes) + node = new_transition.nodes[0] + + new_id = self.allocateId() + node.class_id = "1101" + node.anchor = str(new_id) + state = node.mapping['AnimatorStateTransition'] + state.mapping['m_DstState'].mapping['fileID'] = dst_state.anchor + self.nodes.append(node) + + return node + + def fixWriteDefaults(self, guid_map, generated_anim_path): + # TODO(yum) we should have an Animation class which encapsulates all + # this stuff. + parser = UnityParser() + parser.parse(WRITE_DEFAULTS_ANIM_TEMPLATE) + new_anim = UnityAnimator() + new_anim.addNodes(parser.nodes) + + new_clip = new_anim.peekNodeOfClass('74').mapping['AnimationClip'] + curve_template = new_clip.mapping['m_FloatCurves'].sequence[0] + new_clip.mapping['m_FloatCurves'].sequence = [] + new_clip.mapping['m_EditorCurves'].sequence = [] + + # Keep track of the (attribute, path) tuples we've already set to avoid + # animating the same thing twice. + attributes_set = set() + + animator_state_id = '1102' + for node in self.nodes: + if node.class_id != animator_state_id: + continue + + # Looking at an animator state. + if node.mapping['AnimatorState'].mapping['m_WriteDefaultValues'] != '1': + continue + + # Disable write defaults. + node.mapping['AnimatorState'].mapping['m_WriteDefaultValues'] = '0' + + # Looking at an animator state with write defaults. + motion = node.mapping['AnimatorState'].mapping['m_Motion'] + # Some animations have write defaults but don't trigger an + # animation. No idea what that's about. For now, just ignore. + if not 'guid' in motion.mapping: + continue + guid = motion.mapping['guid'] + + # Again, not really sure what's going on here, just ignore and + # revisit if we hit problems. + if not guid in guid_map.keys(): + continue + + # OK, we found an animation with write defaults, and we know where + # the animation lives. Crack it open and see what it's writing. + animation_path = guid_map[guid] + print("Animation has write defaults: {}".format(animation_path), file=sys.stderr) + parser = UnityParser() + parser.parseFile(animation_path) + anim = UnityAnimator() + anim.addNodes(parser.nodes) + + clip = anim.peekNodeOfClass('74') + + for curve in clip.mapping['AnimationClip'].mapping['m_FloatCurves'].sequence: + attr = curve.mapping['attribute'] + path = curve.mapping['path'] + if (attr, path) in attributes_set: + continue + #print("Fix attr/path {}/{}".format(attr, path), file=sys.stderr) + attributes_set.add((attr, path)) + + new_curve = curve_template.copy() + new_curve.mapping['attribute'] = attr + new_curve.mapping['path'] = path + + new_clip.mapping['m_FloatCurves'].sequence.append(new_curve) + new_clip.mapping['m_EditorCurves'].sequence.append(new_curve) + + #print("len float curves: {}".format(len(new_clip.mapping['m_FloatCurves'].sequence)), file=sys.stderr) + + def generateOffAnimationForGuid(self, guid_map, generated_anim_dir, guid): + # Looking at an animation. + if not guid in guid_map.keys(): + return + + animation_path = guid_map[guid] + print("Checking animation at {}".format(animation_path), file=sys.stderr) + parser = UnityParser() + parser.parseFile(animation_path) + anim = UnityAnimator() + anim.addNodes(parser.nodes) + + clip = anim.peekNodeOfClass('74') + + has_nonzero = False + curve_members = ["m_FloatCurves", "m_EditorCurves"] + for memb in curve_members: + for curve in clip.mapping['AnimationClip'].mapping[memb].sequence: + attr = curve.mapping['attribute'] + path = curve.mapping['path'] + + for m_curve in curve.mapping['curve'].mapping['m_Curve'].sequence: + if m_curve.mapping['value'] != '0': + has_nonzero = True + m_curve.mapping['value'] = '0' + + if not has_nonzero: + print("Animation does not set anything nonzero") + return + + print("Animation sets things nonzero, fixing") + + new_anim_path = "OFF_{}".format(os.path.basename(animation_path)) + new_anim_path = "{}/{}".format(generated_anim_dir, new_anim_path) + + with open(new_anim_path, "w") as f: + f.write(str(anim)) + + meta = Metadata() + with open(new_anim_path + ".meta", "w") as f: + f.write(str(meta)) + + def generateOffAnimationsAnimStates(self, guid_map, generated_anim_dir): + animator_state_id = '1102' + for node in self.nodes: + if node.class_id != animator_state_id: + continue + + # Looking at an animation state. + motion = node.mapping['AnimatorState'].mapping['m_Motion'] + if not 'guid' in motion.mapping: + continue + guid = motion.mapping['guid'] + self.generateOffAnimationForGuid(guid_map, generated_anim_dir, guid) + + + def generateOffAnimationsBlendTrees(self, guid_map, generated_anim_dir): + animator_state_id = '206' + for node in self.nodes: + if node.class_id != animator_state_id: + continue + + # Looking at an animation state. + for child in node.mapping['BlendTree'].mapping['m_Childs'].sequence: + motion = child.mapping['m_Motion'] + + if not 'guid' in motion.mapping: + continue + guid = motion.mapping['guid'] + self.generateOffAnimationForGuid(guid_map, generated_anim_dir, guid) + + def generateOffAnimations(self, guid_map, generated_anim_dir): + self.generateOffAnimationsAnimStates(guid_map, generated_anim_dir) + self.generateOffAnimationsBlendTrees(guid_map, generated_anim_dir) + + def addTransitionBooleanCondition(self, from_state, trans, param, branch): + # Populate the transition's condition logic. + cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping() + if branch: + cond.mapping['m_ConditionMode'] = '1' + else: + cond.mapping['m_ConditionMode'] = '2' + cond.mapping['m_ConditionEvent'] = param + cond.mapping['m_EventThreshold'] = '0' + # Register the transition with the `from_state`. + if from_state: + from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping() + from_state_trans.mapping['fileID'] = trans.anchor + + def addTransitionIntegerEqualityCondition(self, from_state, trans, param, param_val): + # Populate the transition's condition logic. + cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping() + cond.mapping['m_ConditionMode'] = '6' + cond.mapping['m_ConditionEvent'] = param + # Curiously, the typo ("treshold" only has 1 'h') is needed for this to + # work, but not for boolean conditions to work. + cond.mapping['m_EventTreshold'] = str(param_val) + # Register the transition with the `from_state`. + if from_state: + from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping() + from_state_trans.mapping['fileID'] = trans.anchor + + def addTransitionIntegerGreaterCondition(self, from_state, trans, param, param_val): + # Populate the transition's condition logic. + cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping() + cond.mapping['m_ConditionMode'] = '3' + cond.mapping['m_ConditionEvent'] = param + cond.mapping['m_EventThreshold'] = str(param_val) + # Register the transition with the `from_state`. + if from_state: + from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping() + from_state_trans.mapping['fileID'] = trans.anchor + + # TODO(yum) this should be factored out into generate_fx.py + def addTasttToggle(self, off_anim_path, on_anim_path, toggle_param): + self.addParameter(toggle_param, bool) + + off_anim_meta = Metadata() + off_anim_meta.load(off_anim_path) + + on_anim_meta = Metadata() + on_anim_meta.load(on_anim_path) + + layer = self.addLayer('TaSTT_Toggle') + off_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_Off', is_default_state = True) + self.setAnimatorStateAnimation(off_anim, off_anim_meta.guid) + on_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_On') + self.setAnimatorStateAnimation(on_anim, on_anim_meta.guid) + + # TODO(yum) make a Transition class with methods for adding boolean + # conditions + off_to_on = self.addTransition(on_anim) + self.addTransitionBooleanCondition(off_anim, off_to_on, toggle_param, True) + + on_to_off = self.addTransition(off_anim) + self.addTransitionBooleanCondition(on_anim, on_to_off, toggle_param, False) + + def setNoopAnimations(self, guid_map, noop_anim_path): + noop_anim_meta = Metadata() + noop_anim_meta.load(noop_anim_path) + + for node in self.nodes: + if node.class_id != "1102": + continue + motion = node.mapping['AnimatorState'].mapping['m_Motion'] + replace = False + + if "fileID" in motion.mapping.keys() and \ + motion.mapping["fileID"] != "0": + continue + + if "guid" in motion.mapping.keys() and \ + motion.mapping["guid"] in guid_map: + continue + + motion.mapping["fileID"] = "7400000" + motion.mapping["guid"] = noop_anim_meta.guid + motion.mapping["type"] = "2" + +def unityYamlToString(nodes): + lines = [] + preamble = """ +%YAML 1.1 +%TAG !u! tag:unity3d.com,2011: +"""[1:][:-1] + lines.append(preamble) + for doc in nodes: + lines.append("--- !u!" + doc.class_id + " &" + doc.anchor) + lines.append(str(doc)) + result = '\n'.join(lines) + + for i in range(0,10): + result = result.replace("\n\n", "\n") + + return result + +class UnityParser: + STREAM_START = 100 + STREAM_END = 199 + + DOCUMENT_START = 200 + DOCUMENT_END = 299 + + MAPPING_START = 300 + MAPPING_KEY = 301 + + SEQUENCE_VALUE = 400 + + def __init__(self): + self.state = self.STREAM_START + self.cur_scalar = None + self.cur_node = None + + # Simple list of parsed documents. Populated by parse(). + self.nodes = [] + self.prev_states = [] + + def __str__(self): + return unityYamlToString(self.nodes) + + def pushState(self, state): + self.prev_states.append(self.state) + self.state = state + #print("state {} ({})".format(self.state, len(self.prev_states))) + + def popState(self): + self.state = self.prev_states[-1] + self.prev_states = self.prev_states[0:len(self.prev_states) - 1] + #print("state {} ({})".format(self.state, len(self.prev_states))) + return self.state + + def cleanYaml(self, yaml_str): + lines = [] + first_document = True + got_document = False + for line in yaml_str.split("\n"): + # Add end-of-document indicators. + if line.startswith("---"): + got_document = True + if not first_document: + lines.append("...\n") + first_document = False + + # Remove class ID tag from each block. + if line.startswith("---"): + parts = line.split() + lines.append(parts[0] + " " + parts[2] + "\n") + continue + lines.append(line) + + if got_document: + lines.append("...\n") + return '\n'.join(lines) + + def getClassIds(self, yaml_str): + anchor_to_class_id = {} + for line in yaml_str.split("\n"): + if not line.startswith("---"): + continue + + parts = line.split() + class_id = parts[1][3:] + anchor = parts[2][1:] + anchor_to_class_id[anchor] = class_id + + return anchor_to_class_id + + def parseFile(self, yaml_file): + yaml_str = "" + with open(yaml_file, "r") as f: + yaml_str = f.read() + return self.parse(yaml_str) + + def parse(self, yaml_str): + anchor_to_class_id = self.getClassIds(yaml_str) + yaml_str = self.cleanYaml(yaml_str) + + for event in yaml.parse(yaml_str): + if isinstance(event, yaml.StreamStartEvent): + if len(self.prev_states) > 0: + raise Exception("Multiple StreamStartEvents received") + self.pushState(self.STREAM_START) + + elif isinstance(event, yaml.StreamEndEvent): + if self.state != self.STREAM_START: + raise Exception("Document end received after state {}".format(self.state)) + self.popState() + if len(self.prev_states) > 0: + raise Exception("Extra states at stream end") + + elif isinstance(event, yaml.DocumentStartEvent): + if self.state != self.STREAM_START and self.state != self.DOCUMENT_END: + raise Exception("Document start received after state {}".format(self.state)) + self.pushState(self.DOCUMENT_START) + + elif isinstance(event, yaml.DocumentEndEvent): + if self.state != self.DOCUMENT_START: + raise Exception("Document end received after state {}".format(self.state)) + self.popState() + self.nodes.append(self.cur_node) + self.cur_node = None + + elif isinstance(event, yaml.MappingStartEvent): + if self.cur_node == None: + self.cur_node = UnityDocument() + self.cur_node.anchor = event.anchor + self.cur_node.class_id = anchor_to_class_id[event.anchor] + else: + self.cur_node = self.cur_node.addChildMapping(self.cur_scalar) + self.pushState(self.MAPPING_START) + + elif isinstance(event, yaml.MappingEndEvent): + if self.state != self.MAPPING_START: + raise Exception("Mapping end received after state {}".format(self.state)) + self.popState() + if self.state == self.MAPPING_KEY: + self.popState() + if self.cur_node.parent != None: + self.cur_node = self.cur_node.parent + + elif isinstance(event, yaml.SequenceStartEvent): + self.cur_node = self.cur_node.addChildSequence(self.cur_scalar) + self.pushState(self.SEQUENCE_VALUE) + + elif isinstance(event, yaml.SequenceEndEvent): + if self.state != self.SEQUENCE_VALUE: + raise Exception("Sequence end received after state {}".format(self.state)) + self.popState() + if self.state == self.MAPPING_KEY: + self.popState() + self.cur_node = self.cur_node.parent + + elif isinstance(event, yaml.ScalarEvent): + if self.state == self.MAPPING_START: + self.cur_scalar = event.value + self.pushState(self.MAPPING_KEY) + elif self.state == self.MAPPING_KEY: + self.cur_node.mapping[self.cur_scalar] = event.value + self.popState() + elif self.state == self.SEQUENCE_VALUE: + self.cur_node.sequence.append(event.value) + else: + raise Exception("Scalar event received after state {}".format(self.state)) + else: + raise Exception("Unhandled event {}".format(event)) + continue + +class MulticoreUnityParser: + def parseFile(self, yaml_file): + yaml_str = "" + with open(yaml_file, "r") as f: + yaml_str = f.read() + return self.parse(yaml_str) + + def parse(self, yaml_str): + lines = [] + documents = [] + first = True + n_lines = 0 + for line in yaml_str.split("\n"): + n_lines += 1 + if line.startswith("---"): + if not first: + documents.append("\n".join(lines)) + lines = [] + first = False + lines.append(line) + if len(lines) > 0: + documents.append("\n".join(lines)) + lines = [] + print("Got {} documents out of {} lines".format(len(documents), n_lines), file=sys.stderr) + + # Divide the work evenly among the # of CPUs we have available. + n_threads = os.cpu_count() + window_size = int(math.ceil(len(documents) / n_threads)) + merge_window = [] + merged_documents = [] + for i in range(0, len(documents)): + if i > 0 and i % window_size == 0: + merged_documents.append("\n".join(merge_window)) + merge_window = [] + merge_window.append(documents[i]) + if len(merge_window) > 0: + merged_documents.append("\n".join(merge_window)) + merge_window = [] + documents = merged_documents + + mgr = mp.Manager() + + print("Spawning {} threads".format(len(documents)), file=sys.stderr) + threads = [] + for document in documents: + res = mgr.dict() + thread = mp.Process(target = self.parseOneSerial, args = (document, res,)) + threads.append((thread, res)) + thread.start() + + print("Joining threads", file=sys.stderr) + nodes = [] + for thread, res in threads: + thread.join() + nodes += res['nodes'] + + print("Creating animator", file=sys.stderr) + result = UnityAnimator() + result.addNodes(nodes) + + return result + + def parseOneSerial(self, document, res): + parser = UnityParser() + parser.parse(document) + res['nodes'] = parser.nodes + + def parseFile(self, yaml_file): + yaml_str = "" + with open(yaml_file, "r") as f: + yaml_str = f.read() + return self.parse(yaml_str) + +def getGuidMap(d): + result = {} + for f in os.scandir(d): + path = f.path + if f.is_dir(): + result.update(getGuidMap(path)) + if not f.is_file(): + continue + suffix = ".meta" + if path.endswith(suffix): + with open(path, "r") as f: + for line in f: + if line.startswith("guid"): + guid = line.split()[1] + result[guid] = path[:-len(suffix)] + return result + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("cmd", type=str, help="One of merge, guid_map, fix_write_defaults") + parser.add_argument("--fx0", type=str, help="The first animator to merge") + parser.add_argument("--fx1", type=str, help="The second animator to merge") + parser.add_argument("--project_root", type=str, help="The path to the " + + "Unity project Assets folder") + parser.add_argument("--save_to", type=str, help="The path to save the " + + "result of the computation") + parser.add_argument("--guid_map", type=str, help="Path to guid.map, " + + "generated by a previous call to `guid_map`") + parser.add_argument("--guid_map_append", type=bool, help="If set, " + + "append to GUID map instead of overwriting.") + args = parser.parse_args() + + if args.cmd == "merge": + if not args.fx0 or not args.fx1: + print("--fx0 and --fx1 required", file=sys.stderr) + parser.print_help() + parser.exit(1) + + print("Parsing {}".format(args.fx0), file=sys.stderr) + parser0 = MulticoreUnityParser() + anim0 = parser0.parseFile(args.fx0) + + arg1 = "TaSTT_fx.controller" + print("Parsing {}".format(args.fx1), file=sys.stderr) + parser1 = MulticoreUnityParser() + anim1 = parser1.parseFile(args.fx1) + + print("Merging animators", file=sys.stderr) + anim0.merge(anim1) + + print("Serializing", file=sys.stderr) + print(unityYamlToString(anim0.nodes)) + + elif args.cmd == "guid_map": + if not args.project_root or not args.save_to: + print("--project_root and --save_to required") + parser.print_help() + parser.exit(1) + + print("Looking up GUIDs under {}".format(args.project_root), + file=sys.stderr) + guid_map = getGuidMap(args.project_root) + if args.guid_map_append: + tmp_map = {} + with open(args.save_to, "rb") as f: + tmp_map = pickle.load(f) + # combine guid_map and tmp_map + guid_map = {**guid_map, **tmp_map} + print("Saving to {}".format(args.save_to), file=sys.stderr) + with open(args.save_to, 'wb') as f: + pickle.dump(guid_map, f) + elif args.cmd == "fix_write_defaults": + if not args.fx0 or not args.guid_map: + print("--fx0 and --guid_map required") + parser.print_help() + parser.exit(1) + + guid_map = {} + with open(args.guid_map, 'rb') as f: + guid_map = pickle.load(f) + + print("Parsing {}".format(args.fx0), file=sys.stderr) + parser0 = MulticoreUnityParser() + anim = parser0.parseFile(args.fx0) + + print("Fixing write defaults", file=sys.stderr) + anim_dir = "generated/animations/" + os.makedirs(anim_dir, exist_ok=True) + anim.fixWriteDefaults(guid_map, anim_dir + "TaSTT_Reset_Animation.anim") + print(str(anim)) + + elif args.cmd == "gen_off_anims": + if not args.fx0 or not args.guid_map: + print("--fx0 and --guid_map required") + parser.print_help() + parser.exit(1) + + guid_map = {} + with open(args.guid_map, 'rb') as f: + guid_map = pickle.load(f) + + print("Parsing {}".format(args.fx0), file=sys.stderr) + parser0 = MulticoreUnityParser() + anim = parser0.parseFile(args.fx0) + + print("Generating off animations", file=sys.stderr) + anim_dir = "generated/animations/" + os.makedirs(anim_dir, exist_ok=True) + anim.generateOffAnimations(guid_map, "generated/animations") + + elif args.cmd == "add_toggle": + if not args.fx0: + print("--fx0 required") + parser.print_help() + parser.exit(1) + + print("Parsing {}".format(args.fx0), file=sys.stderr) + parser0 = MulticoreUnityParser() + anim = parser0.parseFile(args.fx0) + + print("Adding toggle", file=sys.stderr) + anim.addTasttToggle("Animations/TaSTT_Toggle_Off.anim", + "Animations/TaSTT_Toggle_On.anim", "TaSTT_Toggle") + print(str(anim)) + + elif args.cmd == "fast_parse_test": + if not args.fx0: + print("--fx0 required") + parser.print_help() + parser.exit(1) + + print("Parsing {}".format(args.fx0), file=sys.stderr) + parser0 = MulticoreUnityParser() + anim = parser0.parseFile(args.fx0) + print(str(anim)) + + elif args.cmd == "set_noop_anim": + if not args.fx0 or not args.guid_map: + print("--fx0 and --guid_map required") + parser.print_help() + parser.exit(1) + + guid_map = {} + with open(args.guid_map, 'rb') as f: + guid_map = pickle.load(f) + + print("Parsing {}".format(args.fx0), file=sys.stderr) + parser = MulticoreUnityParser() + anim = parser.parseFile(args.fx0) + + anim.setNoopAnimations(guid_map, "Animations/TaSTT_Do_Nothing.anim") + + print(str(anim)) + + else: + print("Unrecognized command: {}".format(args.cmd)) + diff --git a/Scripts/obfuscate.py b/Scripts/obfuscate.py new file mode 100644 index 0000000..8d01e10 --- /dev/null +++ b/Scripts/obfuscate.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 + +# This module is used to implement obfuscation of TaSTT network +# speech data. At a high level, TaSTT is simply streaming N bits of +# arbitrary data to a shader via VRChat's parameter sync mechanism. +# +# It would be trivial to mine this data for speech information, since +# we're sending unicode (or ASCII) characters to peers. +# +# To raise the cost for the casual data collector, we can obfuscate +# this data using a one-time pad in cipher-block chaining mode. +# +# Making things interesting, encrypted data will arrive at the Unity +# animator, which processes them in 8 bit chunks. They are written +# into contiguous blocks of the animator. Thus the shader can decrypt +# the board by decrypting each block. This is thus stronger than +# applying a one-time pad to each byte of the plaintext, since the +# statistical distribution of individual letters is destroyed. +# Obviously due to the lack of an initialization vector, the +# distribution of phrases (blocks) is preserved. + +import math +import os + +def genKey(n_bits = 128) -> bytearray: + return os.urandom(int(n_bits / 8)) + +def saveKey(filename: str, key: str): + with open(filename, "wb") as f: + f.write(key) + +def loadKey(filename: str) -> bytearray: + with open(filename, "rb") as f: + return f.read() + +# Apply a symmetric cypher to `data` using cypher-block chaining. +def obfuscate(data: bytearray, key: bytearray) -> str: + n_blocks = int(math.ceil(len(data) / len(key))) + # This is a misnomer. A true IV would be randomized, but we can't + # do that since the shader doesn't have access to it. We just use + # this to implement the "chaining" aspect of CBC. + iv = bytearray(b'\x00') * len(key) + result = bytearray() + for i in range(0, n_blocks): + block_begin = i * len(key) + block_end = (i + 1) * len(key) + block_plain = data[block_begin:block_end] + block_cypher = block_plain.copy() + for i in range(0, len(block_cypher)): + block_cypher[i] ^= iv[i] + block_cypher[i] ^= key[i] + result += block_cypher + iv = block_cypher + return result + +def deobfuscate(data: bytearray, key: bytearray) -> str: + n_blocks = int(math.ceil(len(data) / len(key))) + # This is a misnomer. A true IV would be randomized, but we can't + # do that since the shader doesn't have access to it. We just use + # this to implement the "chaining" aspect of CBC. + iv = bytearray(b'\x00') * len(key) + result = bytearray() + for i in range(0, n_blocks): + block_begin = i * len(key) + block_end = (i + 1) * len(key) + block_cypher = data[block_begin:block_end] + block_plain = block_cypher.copy() + for i in range(0, len(block_plain)): + block_plain[i] ^= key[i] + block_plain[i] ^= iv[i] + result += block_plain + iv = block_cypher + return result + +def test(): + key = genKey() + saveKey("test.key", key) + new_key = loadKey("test.key") + os.remove("test.key") + assert(key == new_key) + + plaintext_original = "Lorem ipsum dolor sit amet, consectetur adipiscing elit." + plaintext_bytes = bytearray(plaintext_original, "utf-8") + cyphertext = obfuscate(plaintext_bytes, key) + assert(len(plaintext_bytes) == len(cyphertext)) + plaintext_recovered = deobfuscate(cyphertext, key).decode("utf-8") + assert(plaintext_original == plaintext_recovered) + assert(plaintext_bytes != cyphertext) + +if __name__ == "__main__": + test() + diff --git a/Scripts/osc_ctrl.py b/Scripts/osc_ctrl.py new file mode 100644 index 0000000..34d1a36 --- /dev/null +++ b/Scripts/osc_ctrl.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 + +import argparse +import random +import time +import fileinput +import generate_utils + +# python3 -m pip install python-osc +# License: public domain. +from pythonosc import udp_client + +from math import ceil +from math import floor +from generate_utils import getLayerParam +from generate_utils import getSelectParam +from generate_utils import getEnableParam +from generate_utils import getBoardIndex +from generate_utils import NUM_LAYERS +from generate_utils import BOARD_ROWS +from generate_utils import BOARD_COLS + +import emotes + +# Based on a couple experiments, this seems like about as fast as we can go +# before players start losing events. +SYNC_FREQ_HZ = 5.0 +SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ + +def usage(): + print("python3 -m pip install python-osc") + print("python3 ./osc_ctrl.py") + +def getClient(ip = "127.0.0.1", port = 9000): + return udp_client.SimpleUDPClient(ip, port) + +class EvilGlobalState(): + # Mapping from ascii char to encoded byte. + encoding = {} +state = EvilGlobalState() + +# The characters in the TaSTT are all numbered from top left to bottom right. +# This function provides a mapping from letter ('a') to index (26). +def generateEncoding(): + encoding = {} + for i in range(0, 65535): + encoding[chr(i)] = (i % 256, int(i / 256)) + return encoding +state.encoding = generateEncoding() + +# Encodes a list of lines into the character set used by the board. +# Pads lines with spaces and adds lines so that the total number of +# lines sent is a multiple of the number of rows in the board. +def encodeMessage(lines): + result = [] + lines_tmp = lines + [" "] * ((BOARD_ROWS - len(lines)) % BOARD_ROWS) + for line in lines_tmp: + first_word = True + for word in line.split(): + if first_word: + first_word = False + else: + result.append(state.encoding[' ']) + + emote_word, emote_word_idx = emotes.lookup(word) + if emote_word: + + word_align = 0 + if len(result) > 0: + word_align = (6 - len(result) % 6) % 6 + word = ' ' * word_align + word + + for i in range(0, word_align): + result.append(state.encoding[' ']) + + for i in range(0, 6): + result.append((emote_word_idx, 0xE0)) + continue + + for char in word: + if not char in state.encoding: + print("skip unrecognized char {}".format(char)) + continue + result.append(state.encoding[char]) + result += [state.encoding[' ']] * (BOARD_COLS - len(line)) + return result + +def updateCell(client, cell_idx, letter_encoded): + for byte in range(0, generate_utils.BYTES_PER_CHAR): + addr="/avatar/parameters/" + generate_utils.getBlendParam(cell_idx, byte) + letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5 + client.send_message(addr, letter_remapped) + +def enable(client): + addr="/avatar/parameters/" + getEnableParam() + client.send_message(addr, True) + +def disable(client): + addr="/avatar/parameters/" + getEnableParam() + client.send_message(addr, False) + +# Send a cell all at once. +# `which_cell` is an integer in the range [0,NUM_REGIONS) +def sendMessageCellDiscrete(client, msg_cell, which_cell): + empty_cell = [state.encoding[' ']] * NUM_LAYERS + + if msg_cell != empty_cell: + addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() + client.send_message(addr, True) + + # Really long messages just wrap back around. + which_cell = (which_cell % generate_utils.NUM_REGIONS) + + enable(client) + + # Seek to the current cell. + addr="/avatar/parameters/" + getSelectParam() + client.send_message(addr, which_cell) + + # Update each letter + for i in range(0, len(msg_cell)): + updateCell(client, i, msg_cell[i]) + + # Wait for sync. + time.sleep(SYNC_DELAY_S) + + if msg_cell != empty_cell: + addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() + client.send_message(addr, False) + +# The board is broken down into contiguous collections of characters called +# cells. Each cell contains `NUM_LAYERS` characters. We can update one cell +# every ~1.0 seconds. Going faster causes the board to display garbage to +# remote players. +def splitMessage(msg): + lines = [] + line = "" + for word in msg.split(): + # Hack: if the word is an emote, we make it 6 characters wide, then + # encode it differently later on. + emote_word, emote_word_idx = emotes.lookup(word) + if emote_word: + word = word.ljust(6) + # Due to some fuckery I do in the shader, emotes have to be rendered on + # a 6-character boundary. So pad the word on the left with spaces + # to get it onto that boundary. + word_align = 0 + if len(line) > 0: + word_align = (6 - (len(line) + 1) % 6) % 6 + print("len line: {}".format(len(line))) + print("word align: {}".format(word_align)) + word = ' ' * word_align + word + + while len(word) > BOARD_COLS: + if len(line) != 0: + lines.append(line) + line = "" + + word_prefix = word[0:BOARD_COLS-1] + "-" + word_suffix = word[BOARD_COLS-1:] + #print("append prefix {}".format(word_prefix)) + lines.append(word_prefix) + word = word_suffix + + if len(line) == 0: + line = word + continue + + if len(line) + len(" ") + len(word) <= BOARD_COLS: + line += " " + word + continue + + #print("append line {}".format(line)) + lines.append(line) + line = word + + if len(line) > 0: + lines.append(line) + + return lines + +class OscTxState: + # The message last sent to the board. + last_msg_encoded = [] + empty_cells_to_send_per_call = 1 + nonempty_cells_to_send_per_call = 1 + + # 0 indicates it's closed. 1 indicates half size. 2 indicates full size. + board_size = 0 + +def resizeBoard(num_lines, tx_state, shrink_only): + + resize_params = [] + + resize_param0 = None + resize_param1 = None + + if num_lines > BOARD_ROWS / 2: + # Board must be expanded to full size. + if shrink_only: + return + + if tx_state.board_size == 2: + return + elif tx_state.board_size == 1: + resize_params.append((False, True)) + else: + resize_params.append((False, False)) + resize_params.append((False, True)) + tx_state.board_size = 2 + elif num_lines == 0: + if not shrink_only: + return + # Board must be shrunk to 0 size + if tx_state.board_size == 0: + return + elif tx_state.board_size == 1: + resize_params.append((True, True)) + else: + resize_params.append((True, False)) + resize_params.append((True, True)) + tx_state.board_size = 0 + else: + # Board must be expanded or shrunk to half size. + if tx_state.board_size == 0: + if shrink_only: + return + resize_params.append((False, False)) + elif tx_state.board_size == 1: + return + else: + if not shrink_only: + return + resize_params.append((True, False)) + tx_state.board_size = 1 + + for resize_param_pair in resize_params: + print("Resizing board... "), + addr="/avatar/parameters/" + generate_utils.getResize0Param() + client.send_message(addr, resize_param_pair[0]) + addr="/avatar/parameters/" + generate_utils.getResize1Param() + client.send_message(addr, resize_param_pair[1]) + + time.sleep(0.25) + + addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() + client.send_message(addr, True) + + # The animation is 0.5 seconds, with another 0.5 second buffer after. We + # want to stop in that buffer. + time.sleep(0.5) + + addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() + client.send_message(addr, False) + + # Wait a while for the animation to complete. + time.sleep(1) + print("done") + + +# Send a message to the board, but only overwrite cells that we know need to +# change. +# This may take multiple calls to complete. +# Returns 3 possible values: +# 0: Done sending. +# 1: Exhausted empty cell budget. +# 2: Exhausted nonempty cell budget. +SEND_MSG_LAZY_DONE = 0 +SEND_MSG_LAZY_SENT_EMPTY = 1 +SEND_MSG_LAZY_SENT_NON_EMPTY = 2 +def sendMessageLazy(client, msg, tx_state): + lines = splitMessage(msg) + msg_encoded = encodeMessage(lines) + msg_encoded_len = len(msg_encoded) + + empty_cells_sent = 0 + nonempty_cells_sent = 0 + n_cells = ceil(msg_encoded_len / NUM_LAYERS) + for cell in range(0, n_cells): + cell_begin = cell * NUM_LAYERS + cell_end = (cell + 1) * NUM_LAYERS + cell_msg = msg_encoded[cell_begin:cell_end] + last_cell_msg = [] + + # Skip cells we've already sent. This makes the board much more + # responsive. + if cell_end <= len(tx_state.last_msg_encoded): + last_cell_msg = tx_state.last_msg_encoded[cell_begin:cell_end] + if cell_msg == last_cell_msg: + continue + + if cell_msg == [state.encoding[' ']] * NUM_LAYERS: + if empty_cells_sent >= tx_state.empty_cells_to_send_per_call: + return SEND_MSG_LAZY_SENT_EMPTY + empty_cells_sent += 1 + else: + if nonempty_cells_sent >= tx_state.nonempty_cells_to_send_per_call: + return SEND_MSG_LAZY_SENT_NON_EMPTY + nonempty_cells_sent += 1 + + sendMessageCellDiscrete(client, cell_msg, cell) + # Pad last msg encoded to the end of the array + tx_state.last_msg_encoded += [state.encoding[" "]] * (cell_end - + len(tx_state.last_msg_encoded)) + tx_state.last_msg_encoded[cell_begin:cell_end] = cell_msg + + #resizeBoard(len(lines), tx_state, shrink_only=True) + return SEND_MSG_LAZY_DONE + +def sendRawMessage(client, msg): + n_cells = ceil(len(msg) / NUM_LAYERS) + for cell in range(0, n_cells): + cell_begin = cell * NUM_LAYERS + cell_end = (cell + 1) * NUM_LAYERS + cell_msg = msg[cell_begin:cell_end] + #print("Send cell {}".format(cell)) + sendMessageCellDiscrete(client, cell_msg, cell) + +def clear(client, tx_state): + disable(client) + + addr="/avatar/parameters/" + generate_utils.getClearBoardParam() + client.send_message(addr, True) + + time.sleep(SYNC_DELAY_S) + + addr="/avatar/parameters/" + generate_utils.getClearBoardParam() + client.send_message(addr, False) + + tx_state.last_msg_encoded = [] + +def indicateSpeech(client, is_speaking: bool): + addr = "/avatar/parameters/" + generate_utils.getIndicator0Param() + client.send_message(addr, is_speaking) + +def indicatePaging(client, is_paging: bool): + addr = "/avatar/parameters/" + generate_utils.getIndicator1Param() + client.send_message(addr, is_paging) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-i", default="127.0.0.1", help="OSC server IP") + parser.add_argument("-p", type=int, default=9000, help="OSC server port") + args = parser.parse_args() + + client = getClient(args.i, args.p) + + state.encoding = generateEncoding() + + tx_state = OscTxState() + for line in fileinput.input(): + while sendMessageLazy(client, line, tx_state) != SEND_MSG_LAZY_DONE: + continue + clear(client, tx_state) + diff --git a/Scripts/steamvr.py b/Scripts/steamvr.py new file mode 100644 index 0000000..ed4150c --- /dev/null +++ b/Scripts/steamvr.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 + +# python3 -m pip install openvr +# License: BSD-3.0 (requires showing notice in binary distributions) +import openvr as vr +import time + +EVENT_NONE = 0 +EVENT_RISING_EDGE = 1 +EVENT_FALLING_EDGE = 2 + +class SessionState: + def __init__(self): + self.system = vr.init(vr.VRApplication_Background) + self.last_packet = 0 + # Whether the configured input event is high or low. + self.event_high = False + +# Checks if the given button on the given controller is pressed. +# Defaults to joystick click / left hand. +# Returns three values: +# 0 - button not pressed +# 1 - button rising edge +# 2 - button falling edge +def pollButtonPress( + session_state: SessionState, + controller: vr.ETrackedControllerRole = vr.TrackedControllerRole_LeftHand, + button: vr.EVRButtonId = vr.k_EButton_Axis0 + ) -> int: + lh_idx = session_state.system.getTrackedDeviceIndexForControllerRole(vr.TrackedControllerRole_LeftHand) + #print("Left hand device idx: {}".format(lh_idx)) + + got_state, state = session_state.system.getControllerState(lh_idx) + if not got_state: + return EVENT_NONE + + if state.unPacketNum == session_state.last_packet: + return EVENT_NONE + + # Clicking joysticks and moving joysticks fire the same events. To + # differentiate movement from clicking, we create a dead zone: if the event + # fires while the stick isn't moved far from center, we assume it's a + # click, not movement. + dead_zone_radius = 0.5 + + # This is the ID of event for the joystick being clicked. + joy_click = vr.k_EButton_Axis0 + joy_click_mask = (1 << joy_click) + ret = EVENT_NONE + if (state.ulButtonPressed & joy_click_mask) != 0 and\ + (state.rAxis[0].x**2 + state.rAxis[0].y**2 < dead_zone_radius**2): + #print("button pressed: %016x" % state.ulButtonPressed) + #for i in range(0, 5): + # print("axis {} x: {} y: {}".format(i, state.rAxis[i].x, state.rAxis[i].y)) + if not session_state.event_high: + ret = EVENT_RISING_EDGE + session_state.event_high = True + elif session_state.event_high: + session_state.event_high = False + ret = EVENT_FALLING_EDGE + return ret + +if __name__ == "__main__": + session_state = SessionState() + while True: + time.sleep(0.1) + + event = pollButtonPress(session_state) + if event == EVENT_RISING_EDGE: + print("rising edge") + elif event == EVENT_FALLING_EDGE: + print("falling edge") + diff --git a/Scripts/string_matcher.py b/Scripts/string_matcher.py new file mode 100644 index 0000000..461f180 --- /dev/null +++ b/Scripts/string_matcher.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 + +# python3 -m pip install editdistance +# License: MIT. +import editdistance + +import typing + +DEBUG = False + +# Find the window where the distance between these two transcriptions is +# minimized and use it to stitch them together. +def matchStringList(old_words: typing.List[str], + new_words: typing.List[str], window_size = 6) -> str: + if old_words == new_words: + return " ".join(old_words) + elif len(old_words) >= window_size and len(new_words) >= window_size: + # Find the window where the cumulative string distance + # between the words in that window in the old/new transcription + # is minimized. + old_slice = old_words[len(old_words) - window_size:] + + best_match_i = None + best_match_d = window_size * 1000 + + for i in range(0, 1 + len(new_words) - window_size): + new_slice = new_words[i:i + window_size] + cur_d = 0 + for j in range(0, window_size): + cur_d += editdistance.eval(old_slice[j], new_slice[j]) + if cur_d < best_match_d: + best_match_i = i + best_match_d = cur_d + + old_prefix = old_words[0:len(old_words) - window_size] + overlap = new_words[best_match_i:best_match_i + window_size] + new_suffix = new_words[best_match_i + window_size:] + + #print("Best match i: {}".format(best_match_i)) + #print("Window size: {}".format(window_size)) + #print("Old prefix: {}".format(old_prefix)) + #print("Overlap: {}".format(overlap)) + #print("New suffix: {}".format(new_suffix)) + return " ".join(old_prefix + new_words[best_match_i:]) + else: + return " ".join(new_words) + +def matchSpaceDelimitedStrings(old_text: str, new_text: str, window_size = 4) -> str: + old_words = old_text.split() + new_words = new_text.split() + return matchStringList(old_words, new_words, window_size) + +def matchStrings(old_text: str, new_text: str, window_size = 3) -> str: + if old_text == new_text: + return old_text + elif len(old_text) >= window_size and len(new_text) >= window_size: + # Find the window where the cumulative string distance + # between the text in that window in the old/new transcription + # is minimized. + + best_match_i = None + best_match_j = None + best_match_d = window_size * 1000 + + # The number of old slices to look at. Since the old text can grow + # unboundedly, it's crucial that we don't compare to every possible + # slice in the old and new transcriptions (O(N^2) time complexity). + # This is still wildly inefficient, but good enough for continuous + # transcription in a game bound by a single CPU core, like VRChat. + max_old_slices = 300 + old_n_slices = min(max_old_slices, len(old_text)) + last_old_window = len(old_text) - window_size + first_old_window = max(last_old_window - old_n_slices, 0) + + for i in range(first_old_window, last_old_window + 1): + old_slice = old_text[i:i + window_size] + + for j in range(0, 1 + len(new_text) - window_size): + new_slice = new_text[j:j + window_size] + cur_d = editdistance.eval(old_slice, new_slice) + if cur_d < best_match_d: + best_match_i = i + best_match_j = j + best_match_d = cur_d + + if DEBUG: + print("optimum at old '{}' i={} new '{}' j={} d={}".format( + old_slice, i, new_slice, j, cur_d)) + + old_prefix = old_text[0:best_match_i] + overlap = new_text[best_match_j:best_match_j + window_size] + new_suffix = new_text[best_match_j + window_size:] + + if DEBUG: + print("Best match i: {}".format(best_match_i)) + print("Best match j: {}".format(best_match_j)) + print("Window size: {}".format(window_size)) + print("Old prefix: {}".format(old_prefix)) + print("Overlap: {}".format(overlap)) + print("New suffix: {}".format(new_suffix)) + print("Input 1: {}".format(old_text)) + print("Input 2: {}".format(new_text)) + print("Output: {}".format(old_prefix + + new_text[best_match_j:])) + return old_prefix + new_text[best_match_j:] + else: + return new_text + +if __name__ == "__main__": + # Identical transcriptions should not be changed. + assert(matchSpaceDelimitedStrings("This is a test case.", "This is a test case.", window_size = 3) == "This is a test case.") + # A suffix should be detected and ignored. + assert(matchSpaceDelimitedStrings("This is a test case.", "is a test case.", window_size = 3) == "This is a test case.") + # A lengthening suffix should be correctly appended. + assert(matchSpaceDelimitedStrings("This is a test", "is a test case.", window_size = 3) == "This is a test case.") + # A strictly longer transcription should override the old prefix. + assert(matchSpaceDelimitedStrings("This is a test", "This is a test case.", window_size = 3) == "This is a test case.") + # Paranoia: repetitive text broke the older implementation, so I included + # some test cases without fully understanding what the old problem was. + assert(matchSpaceDelimitedStrings("test test test", "test test test test test test", window_size + = 3) == "test test test test test test") + assert(matchSpaceDelimitedStrings("test test test test test test", "test test test", window_size + = 3) == "test test test test test test") + + print(matchStrings("foo bar", "bar baz")) + print(matchStrings("alpha beta", "beta gamma")) + + in1 = "Okay, what about now? Looks like it sort of works. Key word being sort of." + in2 = "okay what about now looks like it sort of works key word being sort of looks" + bad_out = "Okay, what about now? Looks like it sort of works. Key word being sort of works key word being sort of looks" + good_out = "Okay what about now looks like it sort of works key word being sort of looks" + good_out = "Okay, what about now? Looks like it sort of works. Key word being sort of looks" + print(matchStrings(in1, in2)) + assert(matchStrings(in1, in2) == good_out) + + in1 = "This repository can take" + in2 = "This repository contains the code for" + bad_out = "This repository can tode for" + good_out = "This repository contains the code for" + assert(matchStrings(in1, in2) == good_out) + + in1 = "See something." + in2 = "See something. Say something." + bad_out = in1 + good_out = in2 + print(matchStrings(in1, in2)) + assert(matchStrings(in1, in2) == bad_out) + + in1 = "a" * 1000 + in2 = "b" * 10 * 1000 + # This should be fast (< 1 second) + #matchStrings(in1, in2) + + print("Tests passed.") + diff --git a/Scripts/transcribe.py b/Scripts/transcribe.py new file mode 100644 index 0000000..62e6add --- /dev/null +++ b/Scripts/transcribe.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python3 + +import argparse +import copy +from datetime import datetime +import os +import osc_ctrl +# python3 -m pip install pydub +# License: MIT. +from pydub import AudioSegment as pydub_AudioSegment +from pydub import effects as pydub_effects +# python3 -m pip install pyaudio +# License: MIT. +import pyaudio +import numpy as np +# python3 -m pip install playsound==1.2.2 +# License: MIT. +from playsound import playsound +import steamvr +import string_matcher +import sys +import threading +import time +import wave +# python3 -m pip install git+https://github.com/openai/whisper.git +# python3 -m pip install torch -f https://download.pytorch.org/whl/torch_stable.html +# License: MIT. +import whisper + +class AudioState: + CHUNK = 1024 + FORMAT = pyaudio.paInt16 + CHANNELS = 1 + # This matches the framerate expected by whisper. + RATE = 16000 + + # The maximum length that recordAudio() will put into frames before it + # starts dropping from the start. + MAX_LENGTH_S = 10 + MAX_LENGTH_S_WHISPER = 30 + # The minimum length that recordAudio() will wait for before saving audio. + MIN_LENGTH_S = 1 + + # PyAudio object + p = None + + # PyAudio stream object + stream = None + + text = "" + committed_text = "" + frames = [] + + # Locks access to `text`. + transcribe_lock = threading.Lock() + + # Locks access to `frames`, and audio stored on disk. + audio_lock = threading.Lock() + + # Used to tell the threads when to stop. + run_app = True + + transcribe_sleep_duration_min_s = 0.05 + transcribe_sleep_duration_max_s = 1.50 + transcribe_no_change_count = 0 + transcribe_sleep_duration = transcribe_sleep_duration_min_s + + tx_state = osc_ctrl.OscTxState() + + # The transcription thread transcribes without holding locks, then + # blocks on it. Thus we need some way to tell the transcription + # thread to drop that transcription. + drop_transcription = False + + # The language the user is speaking in. Default is English but user may set + # this to whatever they want. + language = whisper.tokenizer.TO_LANGUAGE_CODE["english"] + + audio_paused = False + + osc_client = osc_ctrl.getClient() + +def getMicStream(which_mic): + audio_state = AudioState() + audio_state.p = pyaudio.PyAudio() + + print("Finding index mic...") + got_match = False + device_index = -1 + focusrite_str = "Focusrite" + index_str = "Digital Audio Interface" + if which_mic == "index": + target_str = index_str + elif which_mic == "focusrite": + target_str = focusrite_str + else: + raise Exception("Unrecognized mic requested: {}".format(which_mic)) + while got_match == False: + info = audio_state.p.get_host_api_info_by_index(0) + numdevices = info.get('deviceCount') + + for i in range(0, numdevices): + if (audio_state.p.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0: + device_name = audio_state.p.get_device_info_by_host_api_device_index(0, i).get('name') + print("Input Device id ", i, " - ", device_name) + if target_str in device_name: + print("Got match: {}".format(device_name)) + device_index = i + got_match = True + break + if got_match == False: + print("No match, sleeping") + time.sleep(3) + + audio_state.stream = audio_state.p.open(format=audio_state.FORMAT, + channels=audio_state.CHANNELS, rate=audio_state.RATE, + input=True, frames_per_buffer=audio_state.CHUNK, + input_device_index=device_index) + + return audio_state + +# Continuously records audio as long as audio_state.run_app is set. +def recordAudio(audio_state): + print("Recording audio") + while audio_state.run_app: + data = audio_state.stream.read(audio_state.CHUNK) + + if audio_state.audio_paused: + time.sleep(0.1) + continue + + audio_state.frames.append(data) + max_frames = int(audio_state.RATE * audio_state.MAX_LENGTH_S / audio_state.CHUNK) + if len(audio_state.frames) > max_frames: + audio_state.frames = audio_state.frames[-1 * max_frames :] + + print("Done recording") + +def resetAudioLocked(audio_state): + audio_state.frames = [] + audio_state.transcribe_no_change_count = 0 + audio_state.transcribe_sleep_duration = \ + audio_state.transcribe_sleep_duration_min_s + + audio_state.committed_text = "" + audio_state.text = "" + +def resetDisplayLocked(audio_state): + osc_ctrl.clear(audio_state.osc_client, audio_state.tx_state) + +def resetAudio(audio_state): + audio_state.transcribe_lock.acquire() + audio_state.audio_lock.acquire() + resetAudioLocked(audio_state) + audio_state.audio_lock.release() + audio_state.transcribe_lock.release() + +# Transcribe the audio recorded in a file. +def transcribe(audio_state, model, frames): + + start_time = time.time() + + frames = audio_state.frames + # Convert from signed 16-bit int [-32768, 32767] to signed 16-bit float on + # [-1, 1]. + # We should technically acquire a lock to protect frames, but this is + # really slow and in practice it doesn't make the app crash, so who cares. + frames = np.asarray(audio_state.frames) + audio = np.frombuffer(frames, np.int16).flatten().astype(np.float32) / 32768.0 + + audio = whisper.pad_or_trim(audio, length = audio_state.RATE * + audio_state.MAX_LENGTH_S_WHISPER) + + mel = whisper.log_mel_spectrogram(audio).to(model.device) + + result = None + #for temp in (0.00, 0.05, 0.10, 0.15, 0.20): + #for temp in (0.00, 0.05): + for temp in (0.00,): + print("temp: {}".format(temp)) + options = whisper.DecodingOptions(language = audio_state.language, + beam_size = 5, temperature = temp, without_timestamps = True) + result = whisper.decode(model, mel, options) + + if result.avg_logprob < -1.0: + print("avg logprob: {}".format(result.avg_logprob)) + result = None + continue + + if result.compression_ratio > 2.4: + print("compression ratio: {}".format(result.compression_ratio)) + result = None + continue + + if result.no_speech_prob > 0.60: + print("no speech prob: {}".format(result.no_speech_prob)) + result = None + continue + + result = result.text + break + + return result + +def transcribeAudio(audio_state, model): + last_transcribe_time = time.time() + while audio_state.run_app == True: + # Pace this out + time.sleep(audio_state.transcribe_sleep_duration) + + # Increase sleep time. Code below will set sleep time back to minimum + # if a change is detected. + if audio_state.transcribe_no_change_count < 10: + audio_state.transcribe_no_change_count += 1 + longer_sleep_dur = audio_state.transcribe_sleep_duration + longer_sleep_dur += audio_state.transcribe_sleep_duration_min_s * (1.3**audio_state.transcribe_no_change_count) + audio_state.transcribe_sleep_duration = min( + audio_state.transcribe_sleep_duration_max_s, + longer_sleep_dur) + + text = transcribe(audio_state, model, audio_state.frames) + if not text: + print("no transcription, spin ({} seconds)".format(time.time() - last_transcribe_time)) + last_transcribe_time = time.time() + continue + + if audio_state.drop_transcription: + audio_state.drop_transcription = False + print("drop transcription ({} seconds)".format(time.time() - last_transcribe_time)) + last_transcribe_time = time.time() + continue + + words = ''.join(c for c in text.lower() if (c.isalpha() or c == " ")).split() + + now = time.time() + print("Transcription ({} seconds): {}".format( + now - last_transcribe_time, + audio_state.text)) + last_transcribe_time = now + + old_text = audio_state.text + + audio_state.text = string_matcher.matchStrings(audio_state.text, + text, window_size = 30) + if old_text != audio_state.text: + # We think the user said something, so reset the amount of + # time we sleep between transcriptions to the minimum. + audio_state.transcribe_no_change_count = 0 + audio_state.transcribe_sleep_duration = audio_state.transcribe_sleep_duration_min_s + +def sendAudio(audio_state): + while audio_state.run_app == True: + text = audio_state.committed_text + " " + audio_state.text + ret = osc_ctrl.sendMessageLazy(audio_state.osc_client, text, + audio_state.tx_state) + is_paging = (ret == osc_ctrl.SEND_MSG_LAZY_SENT_NON_EMPTY) + osc_ctrl.indicatePaging(audio_state.osc_client, is_paging) + + # Pace this out + time.sleep(0.01) + +def readControllerInput(audio_state): + session = steamvr.SessionState() + RECORD_STATE = 0 + PAUSE_STATE = 1 + state = PAUSE_STATE + osc_ctrl.indicateSpeech(audio_state.osc_client, False) + osc_ctrl.indicatePaging(audio_state.osc_client, False) + while audio_state.run_app == True: + time.sleep(0.05) + + event = steamvr.pollButtonPress(session) + + if event == steamvr.EVENT_RISING_EDGE: + print("event get") + if state == RECORD_STATE: + state = PAUSE_STATE + osc_ctrl.indicateSpeech(audio_state.osc_client, False) + playsound(os.path.abspath("Sounds/Noise_Off.wav")) + + audio_state.audio_paused = True + elif state == PAUSE_STATE: + state = RECORD_STATE + osc_ctrl.indicateSpeech(audio_state.osc_client, True) + playsound(os.path.abspath("Sounds/Noise_On.wav")) + + resetAudioLocked(audio_state) + resetDisplayLocked(audio_state) + audio_state.drop_transcription = True + audio_state.audio_paused = False + +def transcribeLoop(mic: str, language: str): + audio_state = getMicStream(mic) + audio_state.language = whisper.tokenizer.TO_LANGUAGE_CODE[language] + + record_audio_thd = threading.Thread(target = recordAudio, args = [audio_state]) + record_audio_thd.daemon = True + record_audio_thd.start() + + print("Safe to start talking") + + #model = whisper.load_model("tiny") + #model = whisper.load_model("base") + model = whisper.load_model("small") + #model = whisper.load_model("medium") + + transcribe_audio_thd = threading.Thread(target = transcribeAudio, args = [audio_state, model]) + transcribe_audio_thd.daemon = True + transcribe_audio_thd.start() + + send_audio_thd = threading.Thread(target = sendAudio, args = [audio_state]) + send_audio_thd.daemon = True + send_audio_thd.start() + + controller_input_thd = threading.Thread(target = readControllerInput, args = [audio_state]) + controller_input_thd.daemon = True + controller_input_thd.start() + + print("Press enter to start a new message.") + for line in sys.stdin: + audio_state.transcribe_lock.acquire() + audio_state.audio_lock.acquire() + resetAudioLocked(audio_state) + resetDisplayLocked(audio_state) + audio_state.drop_transcription = True + audio_state.audio_paused = False + audio_state.audio_lock.release() + audio_state.transcribe_lock.release() + if "exit" in line or "quit" in line: + break + + print("Joining threads") + audio_state.run_app = False + audio_state.run_app = False + record_audio_thd.join() + transcribe_audio_thd.join() + controller_input_thd.join() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--mic", type=str, help="Which mic to use. Options: index, focusrite. Default: index") + parser.add_argument("--language", type=str, help="Which language to use. Ex: english, japanese, chinese, french, german.") + args = parser.parse_args() + + if not args.mic: + args.mic = "index" + + if not args.language: + args.language = "english" + + transcribeLoop(args.mic, args.language) + diff --git a/emotes.py b/emotes.py deleted file mode 100644 index b922fdf..0000000 --- a/emotes.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -from math import floor -import os -# python3 -m pip install pillow -from PIL import Image -import sys - -# (row, col) -TEX_SZ = (2048, 2048) - -IMG_SZ_PX = 256 -IMG_PER_ROW = int(TEX_SZ[0] / IMG_SZ_PX) -IMG_PER_COL = int(TEX_SZ[1] / IMG_SZ_PX) - -# TODO(yum) this should live in a config file. -# Note: the name of the emote must be no longer than 6 characters. -IMG_TEX_DATA = [] -IMG_TEX_DATA.append(("Images/Emotes/xdd.png", "xdd")) -IMG_TEX_DATA.append(("Images/Emotes/pog.png", "pog")) -IMG_TEX_DATA.append(("Images/Emotes/lulw.png", "laugh")) -IMG_TEX_DATA.append(("Images/Emotes/bighardo.png", "hard")) -IMG_TEX_DATA.append(("Images/Emotes/peepoHappy.png", "happy")) -IMG_TEX_DATA.append(("Images/Emotes/peepoSad.png", "sad")) -IMG_TEX_DATA.append(("Images/Emotes/bedge.png", "bed")) -IMG_TEX_DATA.append(("Images/Emotes/reallymad.png", "mad")) -IMG_TEX_DATA.append(("Images/Emotes/clueless.png", "surely")) -IMG_TEX_DATA.append(("Images/Emotes/what.png", "what")) -IMG_TEX_DATA.append(("Images/Emotes/based.png", "based")) -IMG_TEX_DATA.append(("Images/Emotes/chad.png", "chad")) -IMG_TEX_DATA.append(("Images/Emotes/aware.png", "aware")) -IMG_TEX_DATA.append(("Images/Emotes/girl.png", "girl")) - -IMG_TEX_KEYWORD_TO_COORD = {} -for i in range(0, len(IMG_TEX_DATA)): - IMG_TEX_KEYWORD_TO_COORD[IMG_TEX_DATA[i][1]] = i - -# We treat images like words. To keep things simple, they're the same height as -# a word, and they're a fixed width. -IMG_SZ_LETTER_ROWS = 1 -IMG_SZ_LETTER_COLS = 6 - -def lookup(word: str): - word = word.lower() - word = ''.join(c for c in word.lower() if c.isalpha()) - if word in IMG_TEX_KEYWORD_TO_COORD.keys(): - return word, IMG_TEX_KEYWORD_TO_COORD[word] - return None, None - -def openTexture(tex_path: str): - if not os.path.exists(args.texture_path): - return Image.new("RGB", TEX_SZ) - tex = Image.open(args.texture_path) - if tex.size != TEX_SZ: - print("Texture at {} has mismatching size {}, creating new texture".format( - tex_path, tex.size), file=sys.stderr) - return Image.new("RGB", TEX_SZ) - return tex - -# Add an image to the texture at the coordinates (x, y). x and y should be in -# the range [0, IMG_PER_COL) and [0, IMG_PER_ROW) respectively. -def addImageToTexture(tex: Image, img_path: str, x: int, y:int): - # Transparent images will be composited on top of a black background. - img = Image.open(img_path).convert('RGBA') - img_bg = Image.new('RGBA', img.size, (0, 0, 0)) - img = Image.alpha_composite(img_bg, img).convert('RGB') - - max_px = IMG_SZ_PX - - # Scale the image up so it uses as much space as is given to it. - # I originally planned to support multiple scales, but this proved to be - # too much work - getting line wrapping to work with this would be a pain. - # So for now, all images are the same height as words. - scale = 1 - img_x, img_y = img.size - max_dim = max(img_x, img_y) - img_scale = (max_px / max_dim) * scale - new_sz = (int(floor(img.size[0] * img_scale)), - int(floor(img.size[1] * img_scale))) - print("Add image {}".format(img_path)) - print(" Original size: {}".format(img.size)) - print(" Scaled size: {}".format(new_sz)) - img = img.resize(new_sz) - - # Center the image within its new coordinate space. - padded_img_sz = (IMG_SZ_PX * scale, IMG_SZ_PX * scale) - padded_img = Image.new("RGB", padded_img_sz) - centered_x = int(floor((padded_img_sz[0] - new_sz[0]) / 2)) - centered_y = int(floor((padded_img_sz[1] - new_sz[1]) / 2)) - padded_img.paste(img, box=(centered_x, centered_y)) - img = padded_img - - # Break the image into tiles and write them into the texture. - for slot in range(0, scale * scale): - tile_x = slot % scale - tile_y = int(floor(slot / scale)) - tile_bbox = (tile_x * IMG_SZ_PX, tile_y * IMG_SZ_PX, (tile_x + 1) * IMG_SZ_PX, (tile_y + 1) * IMG_SZ_PX) - tile = img.crop(tile_bbox) - print(" tile {},{} (bbox={})".format(tile_x, tile_y, tile_bbox)) - - slot_x = x + slot % IMG_PER_ROW - slot_y = y + int(floor(slot / IMG_PER_ROW)) - slot_x_px = slot_x * IMG_SZ_PX - slot_y_px = slot_y * IMG_SZ_PX - print(" Add img at {},{} (px {},{})".format(slot_x, slot_y, slot_x_px, slot_y_px)) - - tex.paste(tile, box=(slot_x_px, slot_y_px)) - -def parseArgs(): - parser = argparse.ArgumentParser() - parser.add_argument("--texture_path", type=str, help="Path to save the generated texture.") - args = parser.parse_args() - - if not args.texture_path: - args.texture_path = "img_texture.png" - - return args - -if __name__ == "__main__": - args = parseArgs() - - tex = openTexture(args.texture_path) - for i in range(0, len(IMG_TEX_DATA)): - filename = IMG_TEX_DATA[i][0] - x = i % IMG_PER_ROW - y = int(floor(i / IMG_PER_ROW)) - addImageToTexture(tex, filename, x, y) - tex.save(args.texture_path) - diff --git a/generate_fonts.py b/generate_fonts.py deleted file mode 100644 index ef5bfc5..0000000 --- a/generate_fonts.py +++ /dev/null @@ -1,147 +0,0 @@ -#!/usr/bin/env python3 - -# python3 -m pip install pillow -# License: HPND license. -from PIL import Image, ImageFont, ImageDraw - -import math - -# Use a power of 2 pixels per character so we can evenly divide the plane. -font_pixels = 128 -full_ratio = 0.75 -half_ratio = 0.5 -full_sz = int(font_pixels * full_ratio) -half_sz = int(font_pixels * half_ratio) -layout_engine = ImageFont.Layout.BASIC - -unifont = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", full_sz, layout_engine=layout_engine) -unifont_half = ImageFont.truetype("Fonts/unifont-15.0.01.ttf", half_sz, layout_engine=layout_engine) - -noto_sans_mono = ImageFont.truetype("Fonts/Noto_Sans_Mono/NotoSansMono-VariableFont_wdth,wght.ttf", full_sz, layout_engine=layout_engine) - -noto_sans_sc_half = ImageFont.truetype("Fonts/Noto_Sans_Simplified_Chinese/NotoSansSC-Regular.otf", half_sz, layout_engine=layout_engine) - -noto_sans_kr_half = ImageFont.truetype("Fonts/Noto_Sans_Korean/NotoSansKR-Regular.otf", half_sz, layout_engine=layout_engine) - -n_rows = 64 -n_cols = 128 - -class FontInfo: - def __init__(self, font, dy): - self.font = font - self.dy = dy - -def allow_range(allowlist, lo_hi, font = None, dy = 0): - for i in range(lo_hi[0], lo_hi[1] + 1): - allowlist[i] = FontInfo(font, dy) -def ban_range(allowlist, lo, hi): - for i in range(lo, hi + 1): - del allowlist[i] -allowlist = {} -# ASCII -basic_latin = (32, 126) -allow_range(allowlist, basic_latin, font=noto_sans_mono, dy = -20) -# Latin-1 supplement -latin_1_supplement = (0x00A1, 0x00ff) -allow_range(allowlist, latin_1_supplement, font = noto_sans_mono) -# Latin extended-A -latin_extended_a = (0x0100, 0x017f) -allow_range(allowlist, latin_extended_a, font = noto_sans_mono) -# Latin extended-B -latin_extended_b = (0x0180, 0x024f) -allow_range(allowlist, latin_extended_b, font = noto_sans_mono) -# Spacing modifier letters -ipa_extensions = (0x0250, 0x02af) -allow_range(allowlist, ipa_extensions, font = unifont) -# Greek and Coptic -greek = (0x0370, 0x03ff) -allow_range(allowlist, greek, font = noto_sans_mono) -ban_range(allowlist, 0x0378, 0x03a2) -# Cyrillic -cyrillic = (0x0400, 0x04ff) -allow_range(allowlist, cyrillic, font = unifont) -# Currency symbols -currency_symbols = (0x20a0, 0x20c0) -allow_range(allowlist, currency_symbols, font = noto_sans_mono) - -# CJK -# -hangul_jamo = (0x1100, 0x11FF) -allow_range(allowlist, hangul_jamo, font = noto_sans_kr_half) -# -general_punctuation = (0x2000, 0x206f) -allow_range(allowlist, general_punctuation, font = noto_sans_mono) -# -kangxi_radicals = (0x2f00, 0x2fdf) -allow_range(allowlist, kangxi_radicals, font = noto_sans_sc_half) -# -cjk_symbols_and_punctuation = (0x3000, 0x303f) -allow_range(allowlist, cjk_symbols_and_punctuation, font = noto_sans_sc_half) -# -hiragana = (0x3041, 0x309f) -allow_range(allowlist, hiragana, font = noto_sans_sc_half) -ban_range(allowlist, 0x3097, 0x3098) -# -katakana = (0x30a0, 0x30ff) -allow_range(allowlist, katakana, font = noto_sans_sc_half) -# -hangul_compatibility_jamo = (0x3130, 0x318f) -allow_range(allowlist, hangul_compatibility_jamo, font = noto_sans_sc_half) -# -enclosed_cjk_letters_and_months = (0x3200, 0x32FF) -allow_range(allowlist, enclosed_cjk_letters_and_months, font = noto_sans_sc_half) -# -cjk_compatibility = (0x3300, 0x33ff) -allow_range(allowlist, cjk_compatibility, font = noto_sans_sc_half) -# -cjk_unified_extension_a = (0x3400, 0x4dbf) -allow_range(allowlist, cjk_unified_extension_a, font = noto_sans_sc_half) -# -cjk_ideographs = (0x4e00, 0x9fff) -allow_range(allowlist, cjk_ideographs, font = noto_sans_sc_half) -# -hangul_syllables = (0xAC00, 0xD7A3) -allow_range(allowlist, hangul_syllables, font = noto_sans_kr_half) -# -halfwidth_and_fullwidth = (0xff00, 0xffef) -allow_range(allowlist, halfwidth_and_fullwidth, font = noto_sans_sc_half) - -def in_range(x, range_pair) -> bool: - return x >= range_pair[0] and x <= range_pair[1] - -max_char = max(allowlist) -print("max char: {}".format(max_char)) -print("num chars: {}".format(len(allowlist))) -total_rows = math.ceil(max_char / n_cols) -print("total rows {}".format(total_rows)) -total_textures = math.ceil(total_rows / n_rows) -print("total textures {}".format(total_textures)) - -for nth_texture in range(0, total_textures): - # Create an 8K grayscale ("L") or black and white ("1") image - image = Image.new(mode="1", size=(8192,8192), color=0) - draw = ImageDraw.Draw(image) - - row_begin = nth_texture * n_rows - - for row in range(row_begin, row_begin + n_rows): - line = "" - for col in range(0, n_cols): - # Generate the unicode character for this spot. - n = row * n_cols + col - char = None - font_info = None - if n in allowlist.keys(): - char = chr(n) - font_info = allowlist[n] - else: - char = " " - font_info = FontInfo(unifont, 0) - # Hack: Chinese, Japanese, and Korean characters are all double - # width and are all on textures [1,6]. To fit them in the same - # grid, we use a half-size font. - draw.text((col * font_pixels / 2, (row - row_begin) * font_pixels + - font_info.dy), char, font=font_info.font, fill=255) - - image.save("Fonts/Bitmaps/font-%01d.png" % nth_texture) - diff --git a/generate_params.py b/generate_params.py deleted file mode 100644 index 323502c..0000000 --- a/generate_params.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python3 - -import generate_utils - -PARAM_HEADER = """ -%YAML 1.1 -%TAG !u! tag:unity3d.com,2011: ---- !u!114 &11400000 -MonoBehaviour: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_GameObject: {fileID: 0} - m_Enabled: 1 - m_EditorHideFlags: 0 - m_Script: {fileID: -1506855854, guid: 67cc4cb7839cd3741b63733d5adf0442, type: 3} - m_Name: TaSTT_params - m_EditorClassIdentifier: - parameters: -"""[1:][0:-1] - -INT_PARAM = """ - - name: %PARAM_NAME% - valueType: 0 - saved: 0 - defaultValue: 0 -"""[1:][0:-1] - -BOOL_PARAM = """ - - name: %PARAM_NAME% - valueType: 2 - saved: %SAVED% - defaultValue: 0 -"""[1:][0:-1] - -FLOAT_PARAM = """ - - name: %PARAM_NAME% - valueType: 1 - saved: 0 - defaultValue: %DEFAULT_FLOAT% -"""[1:][0:-1] - -# We're working with an 84-character board, and each FX layer is responsible -# for 8 of those characters. -params = {} -params["SAVED"] = "0" -params["DEFAULT_FLOAT"] = "0" -print(generate_utils.replaceMacros(PARAM_HEADER, params)) - -params["PARAM_NAME"] = generate_utils.getDummyParam() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getEnableParam() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getIndicator0Param() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getIndicator1Param() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getScaleParam() -params["DEFAULT_FLOAT"] = "0.2" -print(generate_utils.replaceMacros(FLOAT_PARAM, params)) -params["DEFAULT_FLOAT"] = "0" - -params["PARAM_NAME"] = generate_utils.getToggleParam() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getSpeechNoiseToggleParam() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getSpeechNoiseEnableParam() -params["SAVED"] = "1" -print(generate_utils.replaceMacros(BOOL_PARAM, params)) -params["SAVED"] = "0" - -params["PARAM_NAME"] = generate_utils.getLockWorldParam() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getClearBoardParam() -print(generate_utils.replaceMacros(BOOL_PARAM, params)) - -params["PARAM_NAME"] = generate_utils.getSelectParam() -print(generate_utils.replaceMacros(INT_PARAM, params)) - -for byte in range(0, generate_utils.BYTES_PER_CHAR): - for i in range(0, generate_utils.NUM_LAYERS): - params["PARAM_NAME"] = generate_utils.getBlendParam(i, byte) - print(generate_utils.replaceMacros(FLOAT_PARAM, params)) diff --git a/generate_utils.py b/generate_utils.py deleted file mode 100644 index e8fcc8b..0000000 --- a/generate_utils.py +++ /dev/null @@ -1,130 +0,0 @@ -from math import ceil -from math import floor - -def replaceMacros(lines, macro_defs): - for k,v in macro_defs.items(): - lines = lines.replace("%" + k + "%", v) - return lines - -# Note, (BOARD_ROWS * BOARD_COLS % NUM_LAYERS) must equal 0. If not, writing to -# the last cell will (with the current implementation) wrap around to the front -# of the board. -BOARD_ROWS=4 -BOARD_COLS=48 -NUM_REGIONS = 24 -CHARS_PER_CELL=256 -BYTES_PER_CHAR=2 - -NUM_LAYERS=ceil((BOARD_ROWS * BOARD_COLS) / NUM_REGIONS) - -# Implementation detail. We use this parameter to return from the terminal -# state of the FX layer to the starting state. -def getDummyParam(): - return "TaSTT_Dummy" - -def getHipToggleParam(): - return "TaSTT_Hip_Toggle" - -def getHandToggleParam(): - return "TaSTT_Hand_Toggle" - -def getToggleParam(): - return "TaSTT_Toggle" - -def getScaleParam(): - return "TaSTT_Scale" - -# When this is set to true, the board will emit a soft beep sound. It's used to -# grab attention when speaking. -def getSpeechNoiseToggleParam(): - return "TaSTT_Speech_Noise_Toggle" - -# This is used to disable speaking noises. -def getSpeechNoiseEnableParam(): - return "TaSTT_Speech_Noise_Enable" - -# When this is set to true, the board clears. -def getClearBoardParam(): - return "TaSTT_Clear_Board" - -def getLockWorldParam(): - return "TaSTT_Lock_World" - -# Each layer controls a group of cells. There's only one letter per layer, thus -# this is also the name of the parameter which sets the letter for a layer. -def getLayerParam(which_layer: int, byte: int) -> str: - return "TaSTT_L%02dB%01d" % (which_layer, byte) - -def getLayerName(which_layer: int, byte: int) -> str: - return getLayerParam(which_layer, byte) - -def getBlendParam(which_layer: int, byte: int) -> str: - return "TaSTT_L%02dB%01d_Blend" % (which_layer, byte) - -def getDefaultStateName(which_layer:int , byte: int): - return "TaSTT_L%02dB%01d_Do_Nothing" % (which_layer, byte) - -def getActiveStateName(which_layer: int, byte: int): - return "TaSTT_L%02dB%01d_Active" % (which_layer, byte) - -def getSelectStateName(which_layer, select): - return "TaSTT_L%02d_S%02d_B%01d" % (which_layer, select, byte) - -def getBlendStateName(which_layer, select, byte): - return "TaSTT_L%02d_S%02d_B%01d_Blend" % (which_layer, select, byte) - -def getLetterStateName(which_layer, select, letter, byte): - return "TaSTT_L%02d_S%02d_L%03d_B%01d" % (which_layer, select, letter, byte) - -def getSelectParam() -> str: - return "TaSTT_Select" - -def getEnableParam(): - return "TaSTT_Enable" - -def getIndicator0Param(): - return "TaSTT_Indicator_0" - -def getIndicator1Param(): - return "TaSTT_Indicator_1" - -def getBoardIndex(which_layer, select): - # Because we divide the board into a multiple of 8 cells, some cells may - # describe animations which don't exist, depending on the size of the board. - # We work around this by simply wrapping those animations back to the top - # of the board, and rely on the OSC controller to simply not reference - # those cells. - return (select * NUM_LAYERS + which_layer) % (BOARD_ROWS * BOARD_COLS) - -def getShaderParamByRowColByte(row, col, byte): - return "_Letter_Row%02d_Col%02d_Byte%01d" % (row, col, byte) - -# Mapping from layer to shader param. -def getShaderParam(which_layer, select, byte): - index = getBoardIndex(which_layer, select) - - col = index % BOARD_COLS - row = floor(index / BOARD_COLS) - - return getShaderParamByRowCol(row, col, byte) - -# The name of the animation which writes `letter` at a specific position in the -# display. -def getLetterAnimationName(row, col, letter, nth_byte): - return "R%02dC%02dL%02dB%01d" % (row, col, letter, nth_byte) - -# The name of the animation which clears the entire board. -def getClearAnimationName(): - return "TaSTT_Clear_Board" - -def getAnimationNameByLayerAndIndex(which_layer, select, letter, nth_byte): - index = getBoardIndex(which_layer, select) - - col = index % BOARD_COLS - row = floor(index / BOARD_COLS) - - return "R%02dC%02dL%02dB%01d" % (row, col, letter, nth_byte) - -# Returns the path to the animation for the given shader parameter + letter. -def getAnimationPath(shader_param, letter): - return "generated/animations/%s_Letter%02d.anim" % (shader_param, letter) diff --git a/libtastt.py b/libtastt.py deleted file mode 100644 index bee535f..0000000 --- a/libtastt.py +++ /dev/null @@ -1,578 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import generate_utils -import libunity -import os -import pickle -import sys -import typing - -# TODO(yum) we're getting the encoding scheme from here, but I think it should -# be in a different layer. -import osc_ctrl - -LETTER_ANIMATION_TEMPLATE = """ -%YAML 1.1 -%TAG !u! tag:unity3d.com,2011: ---- !u!74 &7400000 -AnimationClip: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: REPLACEME_ANIMATION_NAME - serializedVersion: 6 - m_Legacy: 0 - m_Compressed: 0 - m_UseHighQualityCurve: 1 - m_RotationCurves: [] - m_CompressedRotationCurves: [] - m_EulerCurves: [] - m_PositionCurves: [] - m_ScaleCurves: [] - m_FloatCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - - serializedVersion: 3 - time: 0.016666668 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: material.REPLACEME_LETTER_PARAM - path: TaSTT - classID: 137 - script: {fileID: 0} - m_PPtrCurves: [] - m_SampleRate: 60 - m_WrapMode: 0 - m_Bounds: - m_Center: {x: 0, y: 0, z: 0} - m_Extent: {x: 0, y: 0, z: 0} - m_ClipBindingConstant: - genericBindings: - - serializedVersion: 2 - path: 2794480623 - attribute: 2284639795 - script: {fileID: 0} - typeID: 137 - customType: 22 - isPPtrCurve: 0 - pptrCurveMapping: [] - m_AnimationClipSettings: - serializedVersion: 2 - m_AdditiveReferencePoseClip: {fileID: 0} - m_AdditiveReferencePoseTime: 0 - m_StartTime: 0 - m_StopTime: 0 - m_OrientationOffsetY: 0 - m_Level: 0 - m_CycleOffset: 0 - m_HasAdditiveReferencePose: 0 - m_LoopTime: 1 - m_LoopBlend: 0 - m_LoopBlendOrientation: 0 - m_LoopBlendPositionY: 0 - m_LoopBlendPositionXZ: 0 - m_KeepOriginalOrientation: 0 - m_KeepOriginalPositionY: 1 - m_KeepOriginalPositionXZ: 0 - m_HeightFromFeet: 0 - m_Mirror: 0 - m_EditorCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - - serializedVersion: 3 - time: 0.016666668 - value: REPLACEME_LETTER_VALUE - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: material.REPLACEME_LETTER_PARAM - path: TaSTT - classID: 137 - script: {fileID: 0} - m_EulerEditorCurves: [] - m_HasGenericRootTransform: 0 - m_HasMotionFloatCurves: 0 - m_Events: [] -""" - -ANIMATOR_TEMPLATE = """ ---- !u!91 &9100000 -AnimatorController: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: TaSTT_fx - serializedVersion: 5 - m_AnimatorParameters: [] - m_AnimatorLayers: [] -""" - -# For whatever reason, running unrelated animations s.a. -# facial expressions can have a slight effect on supposedly -# unrelated parameters, causing letter to flip. Add a -# little buffer to reduce the odds that this effect causes -# a letter to change after it has been written. -UNITY_ANIMATION_FUDGE_MARGIN = 0.1 - -def generateClearAnimation(anim_dir, guid_map): - print("Generating board clearing animation", file=sys.stderr) - - parser = libunity.UnityParser() - parser.parse(LETTER_ANIMATION_TEMPLATE) - - anim_node = parser.nodes[0] - anim_clip = anim_node.mapping['AnimationClip'] - curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] - anim_clip.mapping['m_FloatCurves'].sequence = [] - anim_clip.mapping['m_EditorCurves'].sequence = [] - - letter = 0 - - for byte in range(0, generate_utils.BYTES_PER_CHAR): - for row in range(0, generate_utils.BOARD_ROWS): - for col in range(0, generate_utils.BOARD_COLS): - curve = curve_template.copy() - for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: - keyframe.mapping['value'] = str(letter + - UNITY_ANIMATION_FUDGE_MARGIN) - curve.mapping['attribute'] = "material.{}".format(generate_utils.getShaderParamByRowColByte(row, col, byte)) - curve.mapping['path'] = "World Constraint/Container/TaSTT" - # Add curve to animation - anim_clip.mapping['m_FloatCurves'].sequence.append(curve) - anim_clip.mapping['m_EditorCurves'].sequence.append(curve) - # Serialize animation to file - anim_name = generate_utils.getClearAnimationName() - anim_path = anim_dir + anim_name + ".anim" - with open(anim_path, "w") as f: - f.write(libunity.unityYamlToString([anim_node])) - # Generate metadata - meta = libunity.Metadata() - with open(anim_path + ".meta", "w") as f: - f.write(str(meta)) - # Add metadata to guid map - guid_map[anim_path] = meta.guid - guid_map[meta.guid] = anim_path - -# Generate a toggle animation for a shader parameter. -def generateToggleAnimations(anim_dir, shader_param, guid_map): - print("Generating shader toggle animation", file=sys.stderr) - - parser = libunity.UnityParser() - parser.parse(LETTER_ANIMATION_TEMPLATE) - - # 0.0 represents false, 1.0 represents true. Don't forget that we add - # `UNITY_ANIMATION_FUDGE_MARGIN` to everything. - for shader_value in range(0, 2): - anim_node = parser.nodes[0] - anim_clip = anim_node.mapping['AnimationClip'] - curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] - anim_clip.mapping['m_FloatCurves'].sequence = [] - anim_clip.mapping['m_EditorCurves'].sequence = [] - - curve = curve_template.copy() - for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: - keyframe.mapping['value'] = str(float(shader_value) + - UNITY_ANIMATION_FUDGE_MARGIN) - curve.mapping['attribute'] = "material.{}".format(shader_param) - curve.mapping['path'] = "World Constraint/Container/TaSTT" - # Add curve to animation - anim_clip.mapping['m_FloatCurves'].sequence.append(curve) - anim_clip.mapping['m_EditorCurves'].sequence.append(curve) - - # Serialize animation to file - anim_name = generate_utils.getClearAnimationName() - anim_suffix = "_Off" - if shader_value == 1: - anim_suffix = "_On" - anim_path = anim_dir + shader_param + anim_suffix + ".anim" - with open(anim_path, "w") as f: - f.write(libunity.unityYamlToString([anim_node])) - # Generate metadata - meta = libunity.Metadata() - with open(anim_path + ".meta", "w") as f: - f.write(str(meta)) - # Add metadata to guid map - guid_map[anim_path] = meta.guid - guid_map[meta.guid] = anim_path - -# Generate a toggle animation for a shader parameter. -def generateFloatAnimation(anim_name: str, anim_dir: str, - path: str, attribute: str, - value: float, - guid_map: typing.Dict[str, str]) -> str: - print("Generating float toggle animation {}/{}".format(path,attribute), - file=sys.stderr) - - parser = libunity.UnityParser() - parser.parse(LETTER_ANIMATION_TEMPLATE) - - # 0.0 represents false, 1.0 represents true. Don't forget that we add - # `UNITY_ANIMATION_FUDGE_MARGIN` to everything. - anim_node = parser.nodes[0] - anim_clip = anim_node.mapping['AnimationClip'] - curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] - anim_clip.mapping['m_FloatCurves'].sequence = [] - anim_clip.mapping['m_EditorCurves'].sequence = [] - - curve = curve_template.copy() - for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: - keyframe.mapping['value'] = str(value) - curve.mapping['attribute'] = attribute - curve.mapping['path'] = path - # Add curve to animation - anim_clip.mapping['m_FloatCurves'].sequence.append(curve) - anim_clip.mapping['m_EditorCurves'].sequence.append(curve) - - # Serialize animation to file - anim_path = anim_dir + anim_name + ".anim" - with open(anim_path, "w") as f: - f.write(libunity.unityYamlToString([anim_node])) - # Generate metadata - meta = libunity.Metadata() - with open(anim_path + ".meta", "w") as f: - f.write(str(meta)) - # Add metadata to guid map - guid_map[anim_path] = meta.guid - guid_map[meta.guid] = anim_path - - return meta.guid - -def generateAnimations(anim_dir, guid_map): - generateClearAnimation(args.gen_anim_dir, guid_map) - - generateToggleAnimations(args.gen_anim_dir, generate_utils.getIndicator0Param(), guid_map) - generateToggleAnimations(args.gen_anim_dir, generate_utils.getIndicator1Param(), guid_map) - - print("Generating letter animations", file=sys.stderr) - - parser = libunity.UnityParser() - parser.parse(LETTER_ANIMATION_TEMPLATE) - - anim_node = parser.nodes[0] - anim_clip = anim_node.mapping['AnimationClip'] - curve_template = anim_clip.mapping['m_FloatCurves'].sequence[0] - anim_clip.mapping['m_FloatCurves'].sequence = [] - anim_clip.mapping['m_EditorCurves'].sequence = [] - - # To support more languages, we use 2 bytes per character, giving us a 64K character set. - for byte in range(0, generate_utils.BYTES_PER_CHAR): - for row in range(0, generate_utils.BOARD_ROWS): - print("Generating letter animations (row {}/{}) (byte {}/2)".format(row, - generate_utils.BOARD_ROWS, byte), file=sys.stderr) - for col in range(0, generate_utils.BOARD_COLS): - for letter in range(0, 2): - if letter == 1: - letter = generate_utils.CHARS_PER_CELL - 1 - - # Make a deep copy of the templates - node = anim_node.copy() - curve = curve_template.copy() - clip = node.mapping['AnimationClip'] - # Populate animation name - anim_name = generate_utils.getLetterAnimationName(row, col, letter, byte) - clip.mapping['m_Name'] = anim_name - # Populate letter value - for keyframe in curve.mapping['curve'].mapping['m_Curve'].sequence: - keyframe.mapping['value'] = str(letter + UNITY_ANIMATION_FUDGE_MARGIN) - # Populate path to letter parameter - curve.mapping['attribute'] = "material.{}".format(generate_utils.getShaderParamByRowColByte(row, col, byte)) - curve.mapping['path'] = "World Constraint/Container/TaSTT" - # Add curve to animation - clip.mapping['m_FloatCurves'].sequence.append(curve) - clip.mapping['m_EditorCurves'].sequence.append(curve) - # Serialize animation to file - anim_path = anim_dir + anim_name + ".anim" - with open(anim_path, "w") as f: - f.write(libunity.unityYamlToString([node])) - # Generate metadata - meta = libunity.Metadata() - with open(anim_path + ".meta", "w") as f: - f.write(str(meta)) - # Add metadata to guid map - guid_map[anim_path] = meta.guid - guid_map[meta.guid] = anim_path - -def generateFXController(anim: libunity.UnityAnimator) -> typing.Dict[int, libunity.UnityDocument]: - parser = libunity.UnityParser() - parser.parse(ANIMATOR_TEMPLATE) - anim.addNodes(parser.nodes) - - anim.addParameter(generate_utils.getEnableParam(), bool) - anim.addParameter(generate_utils.getDummyParam(), bool) - anim.addParameter(generate_utils.getHipToggleParam(), bool) - anim.addParameter(generate_utils.getHandToggleParam(), bool) - anim.addParameter(generate_utils.getToggleParam(), bool) - anim.addParameter(generate_utils.getSpeechNoiseEnableParam(), bool) - anim.addParameter(generate_utils.getClearBoardParam(), bool) - anim.addParameter(generate_utils.getIndicator0Param(), bool) - anim.addParameter(generate_utils.getIndicator1Param(), bool) - anim.addParameter(generate_utils.getScaleParam(), float) - - layers = {} - for byte in range(0, generate_utils.BYTES_PER_CHAR): - layers[byte] = {} - for i in range(0, generate_utils.NUM_LAYERS): - anim.addParameter(generate_utils.getBlendParam(i, byte), float) - - layer = anim.addLayer(generate_utils.getLayerName(i, byte)) - layers[byte][i] = layer - anim.addParameter(generate_utils.getSelectParam(), int) - - return layers - -def generateFXLayer(which_layer: int, anim: libunity.UnityAnimator, layer: - libunity.UnityDocument, gen_anim_dir: str, byte: int): - is_default_state = True - default_state = anim.addAnimatorState(layer, - generate_utils.getDefaultStateName(which_layer, byte), is_default_state) - - dy = 100 - active_state = anim.addAnimatorState(layer, - generate_utils.getActiveStateName(which_layer, byte), dy = dy) - - active_state_transition = anim.addTransition(active_state) - enable_param = generate_utils.getEnableParam() - anim.addTransitionBooleanCondition(default_state, active_state_transition, - enable_param, True) - - select_states = {} - for i in range(0, generate_utils.NUM_REGIONS): - dx = i * 200 - dy = 200 - - # Create blend tree for this region. - anim_lo_path = gen_anim_dir + \ - generate_utils.getAnimationNameByLayerAndIndex( - which_layer, i, 0, byte) + \ - ".anim" - guid_lo = guid_map[anim_lo_path] - anim_hi_path = gen_anim_dir + \ - generate_utils.getAnimationNameByLayerAndIndex( - which_layer, i, generate_utils.CHARS_PER_CELL - 1, byte) + \ - ".anim" - guid_hi = guid_map[anim_hi_path] - - select_states[i] = anim.addAnimatorBlendTree(layer, - generate_utils.getBlendStateName(which_layer, i, byte), - generate_utils.getBlendParam(which_layer, byte), - guid_lo, guid_hi, dx = dx, dy = dy) - state = select_states[i] - - # Create transition to state. - select_state_transition = anim.addTransition(state) - select_param = generate_utils.getSelectParam() - anim.addTransitionIntegerEqualityCondition(active_state, - select_state_transition, select_param, i) - - # Create return-home transition. - home_state_transition = anim.addTransition(default_state) - home_state_transition.mapping['AnimatorStateTransition'].mapping['m_InterruptionSource'] = '0' - dummy_param = generate_utils.getDummyParam() - anim.addTransitionBooleanCondition(state, - home_state_transition, dummy_param, False) - pass - -# Generic toggle adding utility. -# Generates the layer and parameter. -# Returns a map containing the off and on states, as well as the -# transitions between them. -def generateToggle(layer_name: str, - gen_anim_dir: str, - off_anim_basename: str, - on_anim_basename: str, - anim: libunity.UnityAnimator) -> typing.Dict[str, - libunity.UnityDocument]: - layer = anim.addLayer(layer_name) - - # For simplicity, use the layer name as the parameter name. - parameter_name = layer_name - anim.addParameter(parameter_name, bool) - - off_state = anim.addAnimatorState(layer, layer_name + "_Off", - is_default_state = True) - on_state = anim.addAnimatorState(layer, layer_name + "_On", dy=100) - - if off_anim_basename: - off_anim_path = gen_anim_dir + off_anim_basename - off_anim_meta = libunity.Metadata() - off_anim_meta.load(off_anim_path) - anim.setAnimatorStateAnimation(off_state, off_anim_meta.guid) - - if on_anim_basename: - on_anim_path = gen_anim_dir + on_anim_basename - on_anim_meta = libunity.Metadata() - on_anim_meta.load(on_anim_path) - anim.setAnimatorStateAnimation(on_state, on_anim_meta.guid) - - off_to_on_trans = anim.addTransition(on_state) - anim.addTransitionBooleanCondition(off_state, - off_to_on_trans, parameter_name, True) - - on_to_off_trans = anim.addTransition(off_state) - anim.addTransitionBooleanCondition(on_state, - on_to_off_trans, parameter_name, False) - - result = {} - result["off"] = off_state - result["on"] = on_state - result["off_to_on"] = off_to_on_trans - result["on_to_off"] = on_to_off_trans - - return result - -def generateScaleLayer(anim: libunity.UnityAnimator, - gen_anim_dir: str, - guid_map: typing.Dict[str, str]): - - scale_layer = anim.addLayer(generate_utils.getScaleParam()) - - path = "World Constraint/Container/TaSTT" - attribute = "blendShape.Scale" - - guid_lo = generateFloatAnimation("TaSTT_Scale_0", gen_anim_dir, - path, attribute, - 0.0, guid_map) - guid_hi = generateFloatAnimation("TaSTT_Scale_100", gen_anim_dir, - path, attribute, - 100.0, guid_map) - - anim.addAnimatorBlendTree(scale_layer, - generate_utils.getScaleParam(), - generate_utils.getScaleParam(), - guid_lo, guid_hi, - lo_threshold = 0.0, hi_threshold = 1.0); - - pass - -def generateFX(guid_map, gen_anim_dir): - anim = libunity.UnityAnimator() - - layers = generateFXController(anim) - - # TODO(yum) parallelize - for byte in range(0, generate_utils.BYTES_PER_CHAR): - for which_layer, layer in layers[byte].items(): - print("Generating layer {}/{}".format(which_layer, len(layers[byte].items())), file=sys.stderr) - generateFXLayer(which_layer, anim, layer, gen_anim_dir, byte) - - states = generateToggle( - generate_utils.getSpeechNoiseToggleParam(), - "Animations/", - "TaSTT_Speech_Noise_Off.anim", - "TaSTT_Speech_Noise_On.anim", - anim) - # Enable beeping only if user has turned it on. - anim.addTransitionBooleanCondition(states["off"], - states["off_to_on"], generate_utils.getSpeechNoiseEnableParam(), True) - # Enable beeping only if board is out. - anim.addTransitionBooleanCondition(states["off"], - states["off_to_on"], generate_utils.getToggleParam(), True) - - generateToggle(generate_utils.getToggleParam(), - "Animations/", - "TaSTT_Toggle_Off.anim", - "TaSTT_Toggle_On.anim", - anim) - generateToggle(generate_utils.getLockWorldParam(), - "Animations/", - "TaSTT_Lock_World_Disable.anim", - "TaSTT_Lock_World_Enable.anim", - anim) - generateToggle( - generate_utils.getClearBoardParam(), - gen_anim_dir, - None, # No animation in the `off` state. - generate_utils.getClearAnimationName() + ".anim", - anim) - generateToggle(generate_utils.getIndicator0Param(), - gen_anim_dir, - generate_utils.getIndicator0Param() + "_Off.anim", - generate_utils.getIndicator0Param() + "_On.anim", - anim) - generateToggle(generate_utils.getIndicator1Param(), - gen_anim_dir, - generate_utils.getIndicator1Param() + "_Off.anim", - generate_utils.getIndicator1Param() + "_On.anim", - anim) - generateScaleLayer(anim, gen_anim_dir, guid_map) - - return anim - -def parseArgs(): - parser = argparse.ArgumentParser() - parser.add_argument("cmd", type=str, help="") - parser.add_argument("--gen_dir", type=str, help="The directory under " + - "which all generated assets are placed") - parser.add_argument("--gen_anim_dir", type=str, help="The directory under " + - "which all generated animations are placed.") - parser.add_argument("--guid_map", type=str, help="The path to a file which will store guids") - args = parser.parse_args() - - if not args.gen_dir: - args.gen_dir = "generated/" - - if not args.gen_anim_dir: - args.gen_anim_dir = args.gen_dir + "animations/" - - if not args.guid_map: - args.guid_map = "guid.map" - - return args - -if __name__ == "__main__": - args = parseArgs() - - if args.cmd == "gen_anims": - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - - os.makedirs(args.gen_anim_dir, exist_ok=True) - generateAnimations(args.gen_anim_dir, guid_map) - - with open(args.guid_map, 'wb') as f: - pickle.dump(guid_map, f) - elif args.cmd == "gen_fx": - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - - print(str(generateFX(guid_map, args.gen_anim_dir))) - diff --git a/libunity.py b/libunity.py deleted file mode 100644 index f9e9e28..0000000 --- a/libunity.py +++ /dev/null @@ -1,1356 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import copy -import enum -import math -import os -import pickle -import random -import sys -# python3 -m pip install pyyaml -# License: MIT. -import yaml - -import multiprocessing as mp - -WRITE_DEFAULTS_ANIM_TEMPLATE = """ -%YAML 1.1 -%TAG !u! tag:unity3d.com,2011: ---- !u!74 &7400000 -AnimationClip: - m_ObjectHideFlags: 0 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: TaSTT_Reset_Animations - serializedVersion: 6 - m_Legacy: 0 - m_Compressed: 0 - m_UseHighQualityCurve: 1 - m_RotationCurves: [] - m_CompressedRotationCurves: [] - m_EulerCurves: [] - m_PositionCurves: [] - m_ScaleCurves: [] - m_FloatCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 0 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: REPLACEME_ATTRIBUTE - path: REPLACEME_PATH - classID: 137 - script: {fileID: 0} - m_PPtrCurves: [] - m_SampleRate: 60 - m_WrapMode: 0 - m_Bounds: - m_Center: {x: 0, y: 0, z: 0} - m_Extent: {x: 0, y: 0, z: 0} - m_ClipBindingConstant: - genericBindings: - - serializedVersion: 2 - path: 2794480623 - attribute: 2284639795 - script: {fileID: 0} - typeID: 137 - customType: 22 - isPPtrCurve: 0 - pptrCurveMapping: [] - m_AnimationClipSettings: - serializedVersion: 2 - m_AdditiveReferencePoseClip: {fileID: 0} - m_AdditiveReferencePoseTime: 0 - m_StartTime: 0 - m_StopTime: 0 - m_OrientationOffsetY: 0 - m_Level: 0 - m_CycleOffset: 0 - m_HasAdditiveReferencePose: 0 - m_LoopTime: 1 - m_LoopBlend: 0 - m_LoopBlendOrientation: 0 - m_LoopBlendPositionY: 0 - m_LoopBlendPositionXZ: 0 - m_KeepOriginalOrientation: 0 - m_KeepOriginalPositionY: 1 - m_KeepOriginalPositionXZ: 0 - m_HeightFromFeet: 0 - m_Mirror: 0 - m_EditorCurves: - - curve: - serializedVersion: 2 - m_Curve: - - serializedVersion: 3 - time: 0 - value: 0 - inSlope: 0 - outSlope: 0 - tangentMode: 136 - weightedMode: 0 - inWeight: 0 - outWeight: 0 - m_PreInfinity: 2 - m_PostInfinity: 2 - m_RotationOrder: 4 - attribute: REPLACEME_ATTRIBUTE - path: REPLACEME_PATH - classID: 137 - script: {fileID: 0} - m_EulerEditorCurves: [] - m_HasGenericRootTransform: 0 - m_HasMotionFloatCurves: 0 - m_Events: [] -"""[1:][:-1] - -METADATA_TEMPLATE = """ -fileFormatVersion: 2 -guid: REPLACEME_GUID -NativeFormatImporter: - externalObjects: {} - mainObjectFileID: 7400000 - userData: - assetBundleName: - assetBundleVariant: -"""[1:][:-1] - -ANIMATION_STATE_TEMPLATE = """ ---- !u!1102 &110200000 -AnimatorState: - serializedVersion: 6 - m_ObjectHideFlags: 1 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: REPLACEME_ANIMATION_NAME - m_Speed: 1 - m_CycleOffset: 0 - m_Transitions: [] - m_StateMachineBehaviours: [] - m_Position: {x: 50, y: 50, z: 0} - m_IKOnFeet: 0 - m_WriteDefaultValues: 0 - m_Mirror: 0 - m_SpeedParameterActive: 0 - m_MirrorParameterActive: 0 - m_CycleOffsetParameterActive: 0 - m_TimeParameterActive: 0 - m_Motion: {} - m_Tag: - m_SpeedParameter: - m_MirrorParameter: - m_CycleOffsetParameter: - m_TimeParameter: -"""[1:][:-1] - -TRANSITION_TEMPLATE = """ ---- !u!1101 &110100000 -AnimatorStateTransition: - m_ObjectHideFlags: 1 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: - m_Conditions: [] - m_DstStateMachine: {fileID: 0} - m_DstState: {fileID: 0} - m_Solo: 0 - m_Mute: 0 - m_IsExit: 0 - serializedVersion: 3 - m_TransitionDuration: 0 - m_TransitionOffset: 0 - m_ExitTime: 0.0 - m_HasExitTime: 0 - m_HasFixedDuration: 1 - m_InterruptionSource: 2 - m_OrderedInterruption: 1 - m_CanTransitionToSelf: 1 -"""[1:][:-1] - -BLEND_TREE_TEMPLATE = """ ---- !u!206 &1071664566462684110 -BlendTree: - m_ObjectHideFlags: 1 - m_CorrespondingSourceObject: {fileID: 0} - m_PrefabInstance: {fileID: 0} - m_PrefabAsset: {fileID: 0} - m_Name: REPLACEME_BLEND_TREE_NAME - m_Childs: - - serializedVersion: 2 - m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_LO, type: 2} - m_Threshold: -1 - m_Position: {x: 0, y: 0} - m_TimeScale: 1 - m_CycleOffset: 0 - m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER - m_Mirror: 0 - - serializedVersion: 2 - m_Motion: {fileID: 7400000, guid: REPLACEME_GUID_HI, type: 2} - m_Threshold: 1 - m_Position: {x: 0, y: 0} - m_TimeScale: 1 - m_CycleOffset: 0 - m_DirectBlendParameter: REPLACEME_BLEND_PARAMETER - m_Mirror: 0 - m_BlendParameter: REPLACEME_BLEND_PARAMETER - m_BlendParameterY: REPLACEME_BLEND_PARAMETER - m_MinThreshold: -1 - m_MaxThreshold: 1 - m_UseAutomaticThresholds: 0 - m_NormalizedBlendValues: 0 - m_BlendType: 0 -"""[1:][:-1] - -class Metadata: - def __init__(self): - self.guid = "%032x" % random.randrange(16 ** 32) - - def load(self, path): - if not path.endswith(".meta"): - path = path + ".meta" - - self.guid = None - with open(path, "r") as f: - for line in f: - if line.startswith("guid"): - self.guid = line.split()[1] - - def __str__(self): - return METADATA_TEMPLATE.replace("REPLACEME_GUID", self.guid) - -class Node: - def __init__(self): - # Optional. In Unity, this is the fileID of an object. Not all YAML - # mappings have an anchor. - self.anchor = None - - # Pointer to the Node containing this one. - self.parent = None - -class Sequence(Node): - def __init__(self): - super().__init__() - self.sequence = [] - - def copy(self): - new = Sequence() - new.anchor = self.anchor - new.parent = self.parent - - for v in self.sequence: - if hasattr(v, "copy"): - new.sequence.append(v.copy()) - new.sequence[-1].parent = new - else: - new.sequence.append(v) - - return new - - def prettyPrint(self, first_indent=None, leading_newline=None): - depth = 0 - p = self.parent - while p != None: - depth += 1 - p = p.parent - indent = " " * depth - - lines = [] - first = True - for item in self.sequence: - cur_indent = indent - if first: - if first_indent != None: - cur_indent = first_indent - first = False - if hasattr(item, "prettyPrint"): - lines.append("{}- {}".format(cur_indent, item.prettyPrint(first_indent="", leading_newline=False))) - else: - lines.append("{}- {}".format(cur_indent, item)) - - if len(lines) == 0: - return "[]" - - return "\n" + '\n'.join(lines) - - def __str__(self): - return self.prettyPrint() - - def addChildMapping(self, anchor = None, add_to_head = False): - child = Mapping() - child.anchor = anchor - child.parent = self - child.sequence = [] - - if add_to_head: - self.sequence = [child] + self.sequence - else: - self.sequence.append(child) - - return child - - def addChildSequence(self, anchor = None): - child = Sequence() - child.anchor = anchor - child.parent = self - child.sequence = [] - - self.sequence.append(child) - - return child - - def forEach(self, cb): - for k in self.sequence: - cb(k) - -class Mapping(Node): - def __init__(self): - super().__init__() - self.mapping = {} - - def copy(self): - new = Mapping() - new.anchor = self.anchor - new.parent = self.parent - - for k, v in self.mapping.items(): - if hasattr(v, "copy"): - new.mapping[k] = v.copy() - new.mapping[k].parent = new - else: - new.mapping[k] = v - - return new - - def prettyPrint(self, first_indent=None, leading_newline=True): - depth = 0 - p = self.parent - while p != None: - depth += 1 - p = p.parent - indent = " " * depth - - lines = [] - first = True - for k, v in self.mapping.items(): - cur_indent = indent - if first: - if first_indent != None: - cur_indent = first_indent - first = False - lines.append("{}{}: {}".format(cur_indent, k, v)) - - result = '\n'.join(lines) - - # Inline 1-item mappings, matching Unity behavior. - if len(self.mapping.keys()) == 1 and len(result.split("\n")) == 1: - if first_indent == None: - return self.prettyPrint(first_indent="") - return "{" + lines[0] + "}" - - # Empty mappings are represented by '{}'. If we don't do this, Unity - # will assume that they are Sequences and get very sad. - if len(self.mapping.keys()) == 0: - return "{}" - - if leading_newline: - result = "\n" + result - - return result - - def __str__(self): - return self.prettyPrint() - - def addChildMapping(self, key, anchor = None): - child = Mapping() - child.anchor = anchor - child.parent = self - child.mapping = {} - - self.mapping[key] = child - - return child - - def addChildSequence(self, key, anchor = None): - child = Sequence() - child.anchor = anchor - child.parent = self - child.mapping = {} - - self.mapping[key] = child - - return child - - def forEach(self, cb): - for k, v in self.mapping.items(): - cb(v) - -class UnityDocument(Mapping): - def __init__(self): - super().__init__() - self.class_id = None - - def __str__(self): - return super().__str__() - - def copy(self): - result = super().copy() - result.class_id = self.class_id - return result - -# Class representing a Unity AnimatorController. Implements manipulations, like -# merging and reanchoring. -class UnityAnimator(): - def __init__(self): - self.nodes = [] - self.id_to_node = {} - self.next_id = 1000 * 1000 - - def __str__(self): - return unityYamlToString(self.nodes) - - def addNodes(self, nodes): - for node in nodes: - self.nodes.append(node) - anchor = node.anchor - if anchor == None: - raise Exception("Node is missing anchor: {}".format(str(node))) - if anchor in self.id_to_node: - raise Exception("Duplicate anchor: {}, node 1: {}, node 2: {}".format(anchor, str(node), str(self.id_to_node[anchor]))) - self.id_to_node[anchor] = node - - if int(anchor) > self.next_id: - self.next_id = int(anchor) + 1 - # I don't know why but this fixes a bug in the `fixWriteDefaults` - # codepath: two documents wind up with the same anchor. - self.next_id += 1 - - def allocateId(self) -> int: - result = self.next_id - self.next_id += 1 - return result - - # Checks if `old_id` is in `self.id_mapping`, and if so, returns the - # already-generated ID. Otherwise this allocates a new ID and - # records it in `self.id_mapping`. - def mapId(self, old_id: str) -> int: - new_id = None - if old_id in self.id_mapping.keys(): - new_id = self.id_mapping[old_id] - else: - new_id = self.allocateId() - self.id_mapping[old_id] = new_id - return new_id - - # Recursively iterate every mapping under `node` and assign new IDs to - # every identifier. Mappings are recorded in `self.id_mapping`. - def mergeIterator(self, node): - if hasattr(node, "mapping"): - # Don't relabel anything that's defined in an external file. - # TODO(yum) do this. - if 'fileID' in node.mapping and not 'guid' in node.mapping: - if node.mapping['fileID'] != '0': - old_id = node.mapping['fileID'] - new_id = self.mapId(old_id) - node.mapping['fileID'] = str(new_id) - if hasattr(node, "forEach"): - node.forEach(self.mergeIterator) - - def peekNodeOfClass(self, classId): - for node in self.nodes: - if node.class_id == classId: - return node - return None - - def popNodeOfClass(self, classId): - result = None - for node in self.nodes: - if node.class_id == classId: - result = node - self.nodes.remove(result) - break - if result: - del self.id_to_node[result.anchor] - return result - - def pushNode(self, node): - self.nodes.append(node) - self.id_to_node[node.anchor] = node - - # Merges two animator controllers and returns the result. Any identifiers - # in the animators are reassigned in a new namespace. The mappings from old - # identifiers to new identifiers are recorded in `self.id_mapping0` and - # `self.id_mapping1`. - def mergeAnimatorControllers(self, ctrl0, ctrl1): - ctrl0 = copy.deepcopy(ctrl0) - ctrl1 = copy.deepcopy(ctrl1) - - self.id_mapping0 = {} - self.id_mapping1 = {} - - p0 = ctrl0.mapping['AnimatorController'].mapping['m_AnimatorParameters'] - p1 = ctrl1.mapping['AnimatorController'].mapping['m_AnimatorParameters'] - - a0 = ctrl0.mapping['AnimatorController'].mapping['m_AnimatorLayers'] - a1 = ctrl1.mapping['AnimatorController'].mapping['m_AnimatorLayers'] - - self.id_mapping = self.id_mapping0 - p0.forEach(self.mergeIterator) - a0.forEach(self.mergeIterator) - - # Hack to prevent ctrl1 from getting a new ID for the animator. - # TODO(yum) delete this? - #del self.class_to_next_id['91'] - - self.id_mapping = self.id_mapping1 - p1.forEach(self.mergeIterator) - a1.forEach(self.mergeIterator) - - p0.sequence += p1.sequence - a0.sequence += a1.sequence - - for elm in p0.sequence: - elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor - for elm in a0.sequence: - elm.mapping['m_Controller'].mapping['fileID'] = ctrl0.anchor - - return ctrl0 - - def merge(self, other): - ctrl0 = self.popNodeOfClass('91') - ctrl1 = other.popNodeOfClass('91') - # Merge animators and populate `self.id_mapping0` and - # `self.id_mapping1. - merged_anim = self.mergeAnimatorControllers(ctrl0, ctrl1) - - # Mapping from class ID (string) to new class ID (int) - self.id_mapping = self.id_mapping0 - for node in self.nodes: - new_id = self.mapId(node.anchor) - node.anchor = str(new_id) - node.forEach(self.mergeIterator) - - self.id_mapping = self.id_mapping1 - for node in other.nodes: - new_id = self.mapId(node.anchor) - node.anchor = str(new_id) - node.forEach(self.mergeIterator) - - nodes = self.nodes - self.nodes = [] - self.id_to_node = {} - self.pushNode(merged_anim) - self.addNodes(nodes) - self.addNodes(other.nodes) - - # TODO(yum) support overwriting duplicates - def addParameter(self, param_name, param_type): - unity_type = None - if param_type == float: - unity_type = '1' - elif param_type == int: - unity_type = '3' - elif param_type == bool: - unity_type = '4' - anim = self.peekNodeOfClass('91') - params = anim.mapping['AnimatorController'].mapping['m_AnimatorParameters'] - param = params.addChildMapping() - param.mapping['m_Name'] = param_name - param.mapping['m_Type'] = unity_type - param.mapping['m_DefaultFloat'] = '0' - param.mapping['m_DefaultInt'] = '0' - param.mapping['m_DefaultBool'] = '0' - ctrl = param.addChildMapping('m_Controller') - ctrl.mapping['fileID'] = anim.anchor - - def addLayer(self, layer_name, add_to_head = False) -> UnityDocument: - # Add layer to controller - anim = self.peekNodeOfClass('91') - layers = anim.mapping['AnimatorController'].mapping['m_AnimatorLayers'] - layer = layers.addChildMapping(add_to_head = add_to_head) - layer.mapping['serializedVersion'] = '5' - layer.mapping['m_Name'] = layer_name - new_id = self.allocateId() - layer.addChildMapping('m_StateMachine').mapping['fileID'] = str(new_id) - layer.addChildMapping('m_Mask').mapping['fileID'] = '0' - layer.addChildSequence('m_Motions') - layer.addChildSequence('m_Behaviours') - layer.mapping['m_BlendingMode'] = '0' - layer.mapping['m_SyncedLayerIndex'] = '-1' - layer.mapping['m_DefaultWeight'] = '1' - layer.mapping['m_IKPass'] = '0' - layer.mapping['m_SyncedLayerAffectsTiming'] = '0' - layer.addChildMapping('m_Controller').mapping['fileID'] = anim.anchor - - # Create layer object - layer = UnityDocument() - layer.class_id = "1107" - layer.anchor = str(new_id) - mach = layer.addChildMapping('AnimatorStateMachine') - - mach.mapping['serializedVersion'] = '6' - - mach.mapping['m_ObjectHideFlags'] = '1' - mach.addChildMapping('m_CorrespondingSourceObject').mapping['fileID'] = '0' - mach.addChildMapping('m_PrefabInstance').mapping['fileID'] = '0' - mach.addChildMapping('m_PrefabAsset').mapping['fileID'] = '0' - mach.mapping['m_Name'] = layer_name - mach.addChildSequence('m_ChildStates') - mach.addChildSequence('m_ChildStateMachines') - mach.addChildSequence('m_AnyStateTransitions') - mach.addChildSequence('m_EntryTransitions') - mach.addChildMapping('m_StateMachineTransitions') - mach.addChildSequence('m_StateMachineBehaviours') - pos = mach.addChildMapping('m_AnyStatePosition') - pos.mapping['x'] = '50' - pos.mapping['y'] = '20' - pos.mapping['z'] = '0' - pos = mach.addChildMapping('m_EntryPosition') - pos.mapping['x'] = '50' - pos.mapping['y'] = '120' - pos.mapping['z'] = '0' - pos = mach.addChildMapping('m_ExitPosition') - pos.mapping['x'] = '800' - pos.mapping['y'] = '120' - pos.mapping['z'] = '0' - pos = mach.addChildMapping('m_ParentStateMachinePosition') - pos.mapping['x'] = '800' - pos.mapping['y'] = '20' - pos.mapping['z'] = '0' - mach.addChildMapping('m_DefaultState') - - self.nodes.append(layer) - return layer - - def addAnimatorState(self, layer, state_name, is_default_state = False, - dx = 0, dy = 0) -> UnityDocument: - # Create animation state - parser = UnityParser() - parser.parse(ANIMATION_STATE_TEMPLATE) - new_anim = UnityAnimator() - new_anim.addNodes(parser.nodes) - node = new_anim.nodes[0] - - new_id = self.allocateId() - node.class_id = "1102" - node.anchor = str(new_id) - state = node.mapping['AnimatorState'] - state.mapping['m_Name'] = state_name - #state.mapping['m_Motion'].mapping['guid'] = anim_guid - self.nodes.append(node) - - # Add state to layer - child_state = layer.mapping['AnimatorStateMachine'].mapping['m_ChildStates'].addChildMapping() - child_state.mapping['serializedVersion'] = '1' - child_state.addChildMapping('m_State').mapping['fileID'] = str(new_id) - state_pos = child_state.addChildMapping('m_Position') - state_pos.mapping['x'] = str(280 + dx) - state_pos.mapping['y'] = str(80 + dy) - state_pos.mapping['z'] = '0' - - if is_default_state: - layer.mapping['AnimatorStateMachine'].mapping['m_DefaultState'].mapping['fileID'] = str(new_id) - - return node - - def setAnimatorStateAnimation(self, anim_state, anim_guid): - anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['guid'] = anim_guid - anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = '7400000' - anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['type'] = '2' - - # Adds a blend tree which uses the parameter named `param_name` to blend - # between anim_lo and anim_hi. Also creates the corresponding animation - # state. - def addAnimatorBlendTree(self, layer, state_name, param_name, - anim_guid_lo, anim_guid_hi, dx = 0, dy = 0, - lo_threshold = -1.0, hi_threshold = 1.0, - is_default_state = False) -> UnityDocument: - # Create the blend tree. - parser = UnityParser() - parser.parse(BLEND_TREE_TEMPLATE) - new_anim = UnityAnimator() - new_anim.addNodes(parser.nodes) - node = new_anim.nodes[0] - - new_id = self.allocateId() - node.class_id = "206" - node.anchor = str(new_id) - tree = node.mapping['BlendTree'] - tree.mapping['m_Name'] = state_name - # Low animation - tree.mapping['m_Childs'].sequence[0].mapping['m_Motion'].mapping['guid'] = anim_guid_lo - tree.mapping['m_Childs'].sequence[0].mapping['m_DirectBlendParameter'] = param_name - tree.mapping['m_Childs'].sequence[0].mapping['m_Threshold'] = str(lo_threshold) - # High animation - tree.mapping['m_Childs'].sequence[1].mapping['m_Motion'].mapping['guid'] = anim_guid_hi - tree.mapping['m_Childs'].sequence[1].mapping['m_DirectBlendParameter'] = param_name - tree.mapping['m_Childs'].sequence[1].mapping['m_Threshold'] = str(hi_threshold) - - tree.mapping['m_BlendParameter'] = param_name - tree.mapping['m_BlendParameterY'] = param_name - - self.nodes.append(node) - - # Create the corresponding animation state. - anim_state = self.addAnimatorState(layer, state_name, is_default_state, dx = dx, dy = - dy) - anim_state.mapping['AnimatorState'].mapping['m_Motion'].mapping['fileID'] = node.anchor - - return anim_state - - def addTransition(self, dst_state): - # Create animation state - parser = UnityParser() - parser.parse(TRANSITION_TEMPLATE) - new_transition = UnityAnimator() - new_transition.addNodes(parser.nodes) - node = new_transition.nodes[0] - - new_id = self.allocateId() - node.class_id = "1101" - node.anchor = str(new_id) - state = node.mapping['AnimatorStateTransition'] - state.mapping['m_DstState'].mapping['fileID'] = dst_state.anchor - self.nodes.append(node) - - return node - - def fixWriteDefaults(self, guid_map, generated_anim_path): - # TODO(yum) we should have an Animation class which encapsulates all - # this stuff. - parser = UnityParser() - parser.parse(WRITE_DEFAULTS_ANIM_TEMPLATE) - new_anim = UnityAnimator() - new_anim.addNodes(parser.nodes) - - new_clip = new_anim.peekNodeOfClass('74').mapping['AnimationClip'] - curve_template = new_clip.mapping['m_FloatCurves'].sequence[0] - new_clip.mapping['m_FloatCurves'].sequence = [] - new_clip.mapping['m_EditorCurves'].sequence = [] - - # Keep track of the (attribute, path) tuples we've already set to avoid - # animating the same thing twice. - attributes_set = set() - - animator_state_id = '1102' - for node in self.nodes: - if node.class_id != animator_state_id: - continue - - # Looking at an animator state. - if node.mapping['AnimatorState'].mapping['m_WriteDefaultValues'] != '1': - continue - - # Disable write defaults. - node.mapping['AnimatorState'].mapping['m_WriteDefaultValues'] = '0' - - # Looking at an animator state with write defaults. - motion = node.mapping['AnimatorState'].mapping['m_Motion'] - # Some animations have write defaults but don't trigger an - # animation. No idea what that's about. For now, just ignore. - if not 'guid' in motion.mapping: - continue - guid = motion.mapping['guid'] - - # Again, not really sure what's going on here, just ignore and - # revisit if we hit problems. - if not guid in guid_map.keys(): - continue - - # OK, we found an animation with write defaults, and we know where - # the animation lives. Crack it open and see what it's writing. - animation_path = guid_map[guid] - print("Animation has write defaults: {}".format(animation_path), file=sys.stderr) - parser = UnityParser() - parser.parseFile(animation_path) - anim = UnityAnimator() - anim.addNodes(parser.nodes) - - clip = anim.peekNodeOfClass('74') - - for curve in clip.mapping['AnimationClip'].mapping['m_FloatCurves'].sequence: - attr = curve.mapping['attribute'] - path = curve.mapping['path'] - if (attr, path) in attributes_set: - continue - #print("Fix attr/path {}/{}".format(attr, path), file=sys.stderr) - attributes_set.add((attr, path)) - - new_curve = curve_template.copy() - new_curve.mapping['attribute'] = attr - new_curve.mapping['path'] = path - - new_clip.mapping['m_FloatCurves'].sequence.append(new_curve) - new_clip.mapping['m_EditorCurves'].sequence.append(new_curve) - - #print("len float curves: {}".format(len(new_clip.mapping['m_FloatCurves'].sequence)), file=sys.stderr) - - def generateOffAnimationForGuid(self, guid_map, generated_anim_dir, guid): - # Looking at an animation. - if not guid in guid_map.keys(): - return - - animation_path = guid_map[guid] - print("Checking animation at {}".format(animation_path), file=sys.stderr) - parser = UnityParser() - parser.parseFile(animation_path) - anim = UnityAnimator() - anim.addNodes(parser.nodes) - - clip = anim.peekNodeOfClass('74') - - has_nonzero = False - curve_members = ["m_FloatCurves", "m_EditorCurves"] - for memb in curve_members: - for curve in clip.mapping['AnimationClip'].mapping[memb].sequence: - attr = curve.mapping['attribute'] - path = curve.mapping['path'] - - for m_curve in curve.mapping['curve'].mapping['m_Curve'].sequence: - if m_curve.mapping['value'] != '0': - has_nonzero = True - m_curve.mapping['value'] = '0' - - if not has_nonzero: - print("Animation does not set anything nonzero") - return - - print("Animation sets things nonzero, fixing") - - new_anim_path = "OFF_{}".format(os.path.basename(animation_path)) - new_anim_path = "{}/{}".format(generated_anim_dir, new_anim_path) - - with open(new_anim_path, "w") as f: - f.write(str(anim)) - - meta = Metadata() - with open(new_anim_path + ".meta", "w") as f: - f.write(str(meta)) - - def generateOffAnimationsAnimStates(self, guid_map, generated_anim_dir): - animator_state_id = '1102' - for node in self.nodes: - if node.class_id != animator_state_id: - continue - - # Looking at an animation state. - motion = node.mapping['AnimatorState'].mapping['m_Motion'] - if not 'guid' in motion.mapping: - continue - guid = motion.mapping['guid'] - self.generateOffAnimationForGuid(guid_map, generated_anim_dir, guid) - - - def generateOffAnimationsBlendTrees(self, guid_map, generated_anim_dir): - animator_state_id = '206' - for node in self.nodes: - if node.class_id != animator_state_id: - continue - - # Looking at an animation state. - for child in node.mapping['BlendTree'].mapping['m_Childs'].sequence: - motion = child.mapping['m_Motion'] - - if not 'guid' in motion.mapping: - continue - guid = motion.mapping['guid'] - self.generateOffAnimationForGuid(guid_map, generated_anim_dir, guid) - - def generateOffAnimations(self, guid_map, generated_anim_dir): - self.generateOffAnimationsAnimStates(guid_map, generated_anim_dir) - self.generateOffAnimationsBlendTrees(guid_map, generated_anim_dir) - - def addTransitionBooleanCondition(self, from_state, trans, param, branch): - # Populate the transition's condition logic. - cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping() - if branch: - cond.mapping['m_ConditionMode'] = '1' - else: - cond.mapping['m_ConditionMode'] = '2' - cond.mapping['m_ConditionEvent'] = param - cond.mapping['m_EventThreshold'] = '0' - # Register the transition with the `from_state`. - if from_state: - from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping() - from_state_trans.mapping['fileID'] = trans.anchor - - def addTransitionIntegerEqualityCondition(self, from_state, trans, param, param_val): - # Populate the transition's condition logic. - cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping() - cond.mapping['m_ConditionMode'] = '6' - cond.mapping['m_ConditionEvent'] = param - # Curiously, the typo ("treshold" only has 1 'h') is needed for this to - # work, but not for boolean conditions to work. - cond.mapping['m_EventTreshold'] = str(param_val) - # Register the transition with the `from_state`. - if from_state: - from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping() - from_state_trans.mapping['fileID'] = trans.anchor - - def addTransitionIntegerGreaterCondition(self, from_state, trans, param, param_val): - # Populate the transition's condition logic. - cond = trans.mapping['AnimatorStateTransition'].mapping['m_Conditions'].addChildMapping() - cond.mapping['m_ConditionMode'] = '3' - cond.mapping['m_ConditionEvent'] = param - cond.mapping['m_EventThreshold'] = str(param_val) - # Register the transition with the `from_state`. - if from_state: - from_state_trans = from_state.mapping['AnimatorState'].mapping['m_Transitions'].addChildMapping() - from_state_trans.mapping['fileID'] = trans.anchor - - # TODO(yum) this should be factored out into generate_fx.py - def addTasttToggle(self, off_anim_path, on_anim_path, toggle_param): - self.addParameter(toggle_param, bool) - - off_anim_meta = Metadata() - off_anim_meta.load(off_anim_path) - - on_anim_meta = Metadata() - on_anim_meta.load(on_anim_path) - - layer = self.addLayer('TaSTT_Toggle') - off_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_Off', is_default_state = True) - self.setAnimatorStateAnimation(off_anim, off_anim_meta.guid) - on_anim = self.addAnimatorState(layer, 'TaSTT_Toggle_On') - self.setAnimatorStateAnimation(on_anim, on_anim_meta.guid) - - # TODO(yum) make a Transition class with methods for adding boolean - # conditions - off_to_on = self.addTransition(on_anim) - self.addTransitionBooleanCondition(off_anim, off_to_on, toggle_param, True) - - on_to_off = self.addTransition(off_anim) - self.addTransitionBooleanCondition(on_anim, on_to_off, toggle_param, False) - - def setNoopAnimations(self, guid_map, noop_anim_path): - noop_anim_meta = Metadata() - noop_anim_meta.load(noop_anim_path) - - for node in self.nodes: - if node.class_id != "1102": - continue - motion = node.mapping['AnimatorState'].mapping['m_Motion'] - replace = False - - if "fileID" in motion.mapping.keys() and \ - motion.mapping["fileID"] != "0": - continue - - if "guid" in motion.mapping.keys() and \ - motion.mapping["guid"] in guid_map: - continue - - motion.mapping["fileID"] = "7400000" - motion.mapping["guid"] = noop_anim_meta.guid - motion.mapping["type"] = "2" - -def unityYamlToString(nodes): - lines = [] - preamble = """ -%YAML 1.1 -%TAG !u! tag:unity3d.com,2011: -"""[1:][:-1] - lines.append(preamble) - for doc in nodes: - lines.append("--- !u!" + doc.class_id + " &" + doc.anchor) - lines.append(str(doc)) - result = '\n'.join(lines) - - for i in range(0,10): - result = result.replace("\n\n", "\n") - - return result - -class UnityParser: - STREAM_START = 100 - STREAM_END = 199 - - DOCUMENT_START = 200 - DOCUMENT_END = 299 - - MAPPING_START = 300 - MAPPING_KEY = 301 - - SEQUENCE_VALUE = 400 - - def __init__(self): - self.state = self.STREAM_START - self.cur_scalar = None - self.cur_node = None - - # Simple list of parsed documents. Populated by parse(). - self.nodes = [] - self.prev_states = [] - - def __str__(self): - return unityYamlToString(self.nodes) - - def pushState(self, state): - self.prev_states.append(self.state) - self.state = state - #print("state {} ({})".format(self.state, len(self.prev_states))) - - def popState(self): - self.state = self.prev_states[-1] - self.prev_states = self.prev_states[0:len(self.prev_states) - 1] - #print("state {} ({})".format(self.state, len(self.prev_states))) - return self.state - - def cleanYaml(self, yaml_str): - lines = [] - first_document = True - got_document = False - for line in yaml_str.split("\n"): - # Add end-of-document indicators. - if line.startswith("---"): - got_document = True - if not first_document: - lines.append("...\n") - first_document = False - - # Remove class ID tag from each block. - if line.startswith("---"): - parts = line.split() - lines.append(parts[0] + " " + parts[2] + "\n") - continue - lines.append(line) - - if got_document: - lines.append("...\n") - return '\n'.join(lines) - - def getClassIds(self, yaml_str): - anchor_to_class_id = {} - for line in yaml_str.split("\n"): - if not line.startswith("---"): - continue - - parts = line.split() - class_id = parts[1][3:] - anchor = parts[2][1:] - anchor_to_class_id[anchor] = class_id - - return anchor_to_class_id - - def parseFile(self, yaml_file): - yaml_str = "" - with open(yaml_file, "r") as f: - yaml_str = f.read() - return self.parse(yaml_str) - - def parse(self, yaml_str): - anchor_to_class_id = self.getClassIds(yaml_str) - yaml_str = self.cleanYaml(yaml_str) - - for event in yaml.parse(yaml_str): - if isinstance(event, yaml.StreamStartEvent): - if len(self.prev_states) > 0: - raise Exception("Multiple StreamStartEvents received") - self.pushState(self.STREAM_START) - - elif isinstance(event, yaml.StreamEndEvent): - if self.state != self.STREAM_START: - raise Exception("Document end received after state {}".format(self.state)) - self.popState() - if len(self.prev_states) > 0: - raise Exception("Extra states at stream end") - - elif isinstance(event, yaml.DocumentStartEvent): - if self.state != self.STREAM_START and self.state != self.DOCUMENT_END: - raise Exception("Document start received after state {}".format(self.state)) - self.pushState(self.DOCUMENT_START) - - elif isinstance(event, yaml.DocumentEndEvent): - if self.state != self.DOCUMENT_START: - raise Exception("Document end received after state {}".format(self.state)) - self.popState() - self.nodes.append(self.cur_node) - self.cur_node = None - - elif isinstance(event, yaml.MappingStartEvent): - if self.cur_node == None: - self.cur_node = UnityDocument() - self.cur_node.anchor = event.anchor - self.cur_node.class_id = anchor_to_class_id[event.anchor] - else: - self.cur_node = self.cur_node.addChildMapping(self.cur_scalar) - self.pushState(self.MAPPING_START) - - elif isinstance(event, yaml.MappingEndEvent): - if self.state != self.MAPPING_START: - raise Exception("Mapping end received after state {}".format(self.state)) - self.popState() - if self.state == self.MAPPING_KEY: - self.popState() - if self.cur_node.parent != None: - self.cur_node = self.cur_node.parent - - elif isinstance(event, yaml.SequenceStartEvent): - self.cur_node = self.cur_node.addChildSequence(self.cur_scalar) - self.pushState(self.SEQUENCE_VALUE) - - elif isinstance(event, yaml.SequenceEndEvent): - if self.state != self.SEQUENCE_VALUE: - raise Exception("Sequence end received after state {}".format(self.state)) - self.popState() - if self.state == self.MAPPING_KEY: - self.popState() - self.cur_node = self.cur_node.parent - - elif isinstance(event, yaml.ScalarEvent): - if self.state == self.MAPPING_START: - self.cur_scalar = event.value - self.pushState(self.MAPPING_KEY) - elif self.state == self.MAPPING_KEY: - self.cur_node.mapping[self.cur_scalar] = event.value - self.popState() - elif self.state == self.SEQUENCE_VALUE: - self.cur_node.sequence.append(event.value) - else: - raise Exception("Scalar event received after state {}".format(self.state)) - else: - raise Exception("Unhandled event {}".format(event)) - continue - -class MulticoreUnityParser: - def parseFile(self, yaml_file): - yaml_str = "" - with open(yaml_file, "r") as f: - yaml_str = f.read() - return self.parse(yaml_str) - - def parse(self, yaml_str): - lines = [] - documents = [] - first = True - n_lines = 0 - for line in yaml_str.split("\n"): - n_lines += 1 - if line.startswith("---"): - if not first: - documents.append("\n".join(lines)) - lines = [] - first = False - lines.append(line) - if len(lines) > 0: - documents.append("\n".join(lines)) - lines = [] - print("Got {} documents out of {} lines".format(len(documents), n_lines), file=sys.stderr) - - # Divide the work evenly among the # of CPUs we have available. - n_threads = os.cpu_count() - window_size = int(math.ceil(len(documents) / n_threads)) - merge_window = [] - merged_documents = [] - for i in range(0, len(documents)): - if i > 0 and i % window_size == 0: - merged_documents.append("\n".join(merge_window)) - merge_window = [] - merge_window.append(documents[i]) - if len(merge_window) > 0: - merged_documents.append("\n".join(merge_window)) - merge_window = [] - documents = merged_documents - - mgr = mp.Manager() - - print("Spawning {} threads".format(len(documents)), file=sys.stderr) - threads = [] - for document in documents: - res = mgr.dict() - thread = mp.Process(target = self.parseOneSerial, args = (document, res,)) - threads.append((thread, res)) - thread.start() - - print("Joining threads", file=sys.stderr) - nodes = [] - for thread, res in threads: - thread.join() - nodes += res['nodes'] - - print("Creating animator", file=sys.stderr) - result = UnityAnimator() - result.addNodes(nodes) - - return result - - def parseOneSerial(self, document, res): - parser = UnityParser() - parser.parse(document) - res['nodes'] = parser.nodes - - def parseFile(self, yaml_file): - yaml_str = "" - with open(yaml_file, "r") as f: - yaml_str = f.read() - return self.parse(yaml_str) - -def getGuidMap(d): - result = {} - for f in os.scandir(d): - path = f.path - if f.is_dir(): - result.update(getGuidMap(path)) - if not f.is_file(): - continue - suffix = ".meta" - if path.endswith(suffix): - with open(path, "r") as f: - for line in f: - if line.startswith("guid"): - guid = line.split()[1] - result[guid] = path[:-len(suffix)] - return result - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("cmd", type=str, help="One of merge, guid_map, fix_write_defaults") - parser.add_argument("--fx0", type=str, help="The first animator to merge") - parser.add_argument("--fx1", type=str, help="The second animator to merge") - parser.add_argument("--project_root", type=str, help="The path to the " + - "Unity project Assets folder") - parser.add_argument("--save_to", type=str, help="The path to save the " + - "result of the computation") - parser.add_argument("--guid_map", type=str, help="Path to guid.map, " + - "generated by a previous call to `guid_map`") - parser.add_argument("--guid_map_append", type=bool, help="If set, " + - "append to GUID map instead of overwriting.") - args = parser.parse_args() - - if args.cmd == "merge": - if not args.fx0 or not args.fx1: - print("--fx0 and --fx1 required", file=sys.stderr) - parser.print_help() - parser.exit(1) - - print("Parsing {}".format(args.fx0), file=sys.stderr) - parser0 = MulticoreUnityParser() - anim0 = parser0.parseFile(args.fx0) - - arg1 = "TaSTT_fx.controller" - print("Parsing {}".format(args.fx1), file=sys.stderr) - parser1 = MulticoreUnityParser() - anim1 = parser1.parseFile(args.fx1) - - print("Merging animators", file=sys.stderr) - anim0.merge(anim1) - - print("Serializing", file=sys.stderr) - print(unityYamlToString(anim0.nodes)) - - elif args.cmd == "guid_map": - if not args.project_root or not args.save_to: - print("--project_root and --save_to required") - parser.print_help() - parser.exit(1) - - print("Looking up GUIDs under {}".format(args.project_root), - file=sys.stderr) - guid_map = getGuidMap(args.project_root) - if args.guid_map_append: - tmp_map = {} - with open(args.save_to, "rb") as f: - tmp_map = pickle.load(f) - # combine guid_map and tmp_map - guid_map = {**guid_map, **tmp_map} - print("Saving to {}".format(args.save_to), file=sys.stderr) - with open(args.save_to, 'wb') as f: - pickle.dump(guid_map, f) - elif args.cmd == "fix_write_defaults": - if not args.fx0 or not args.guid_map: - print("--fx0 and --guid_map required") - parser.print_help() - parser.exit(1) - - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - - print("Parsing {}".format(args.fx0), file=sys.stderr) - parser0 = MulticoreUnityParser() - anim = parser0.parseFile(args.fx0) - - print("Fixing write defaults", file=sys.stderr) - anim_dir = "generated/animations/" - os.makedirs(anim_dir, exist_ok=True) - anim.fixWriteDefaults(guid_map, anim_dir + "TaSTT_Reset_Animation.anim") - print(str(anim)) - - elif args.cmd == "gen_off_anims": - if not args.fx0 or not args.guid_map: - print("--fx0 and --guid_map required") - parser.print_help() - parser.exit(1) - - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - - print("Parsing {}".format(args.fx0), file=sys.stderr) - parser0 = MulticoreUnityParser() - anim = parser0.parseFile(args.fx0) - - print("Generating off animations", file=sys.stderr) - anim_dir = "generated/animations/" - os.makedirs(anim_dir, exist_ok=True) - anim.generateOffAnimations(guid_map, "generated/animations") - - elif args.cmd == "add_toggle": - if not args.fx0: - print("--fx0 required") - parser.print_help() - parser.exit(1) - - print("Parsing {}".format(args.fx0), file=sys.stderr) - parser0 = MulticoreUnityParser() - anim = parser0.parseFile(args.fx0) - - print("Adding toggle", file=sys.stderr) - anim.addTasttToggle("Animations/TaSTT_Toggle_Off.anim", - "Animations/TaSTT_Toggle_On.anim", "TaSTT_Toggle") - print(str(anim)) - - elif args.cmd == "fast_parse_test": - if not args.fx0: - print("--fx0 required") - parser.print_help() - parser.exit(1) - - print("Parsing {}".format(args.fx0), file=sys.stderr) - parser0 = MulticoreUnityParser() - anim = parser0.parseFile(args.fx0) - print(str(anim)) - - elif args.cmd == "set_noop_anim": - if not args.fx0 or not args.guid_map: - print("--fx0 and --guid_map required") - parser.print_help() - parser.exit(1) - - guid_map = {} - with open(args.guid_map, 'rb') as f: - guid_map = pickle.load(f) - - print("Parsing {}".format(args.fx0), file=sys.stderr) - parser = MulticoreUnityParser() - anim = parser.parseFile(args.fx0) - - anim.setNoopAnimations(guid_map, "Animations/TaSTT_Do_Nothing.anim") - - print(str(anim)) - - else: - print("Unrecognized command: {}".format(args.cmd)) - diff --git a/obfuscate.py b/obfuscate.py deleted file mode 100644 index 8d01e10..0000000 --- a/obfuscate.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 - -# This module is used to implement obfuscation of TaSTT network -# speech data. At a high level, TaSTT is simply streaming N bits of -# arbitrary data to a shader via VRChat's parameter sync mechanism. -# -# It would be trivial to mine this data for speech information, since -# we're sending unicode (or ASCII) characters to peers. -# -# To raise the cost for the casual data collector, we can obfuscate -# this data using a one-time pad in cipher-block chaining mode. -# -# Making things interesting, encrypted data will arrive at the Unity -# animator, which processes them in 8 bit chunks. They are written -# into contiguous blocks of the animator. Thus the shader can decrypt -# the board by decrypting each block. This is thus stronger than -# applying a one-time pad to each byte of the plaintext, since the -# statistical distribution of individual letters is destroyed. -# Obviously due to the lack of an initialization vector, the -# distribution of phrases (blocks) is preserved. - -import math -import os - -def genKey(n_bits = 128) -> bytearray: - return os.urandom(int(n_bits / 8)) - -def saveKey(filename: str, key: str): - with open(filename, "wb") as f: - f.write(key) - -def loadKey(filename: str) -> bytearray: - with open(filename, "rb") as f: - return f.read() - -# Apply a symmetric cypher to `data` using cypher-block chaining. -def obfuscate(data: bytearray, key: bytearray) -> str: - n_blocks = int(math.ceil(len(data) / len(key))) - # This is a misnomer. A true IV would be randomized, but we can't - # do that since the shader doesn't have access to it. We just use - # this to implement the "chaining" aspect of CBC. - iv = bytearray(b'\x00') * len(key) - result = bytearray() - for i in range(0, n_blocks): - block_begin = i * len(key) - block_end = (i + 1) * len(key) - block_plain = data[block_begin:block_end] - block_cypher = block_plain.copy() - for i in range(0, len(block_cypher)): - block_cypher[i] ^= iv[i] - block_cypher[i] ^= key[i] - result += block_cypher - iv = block_cypher - return result - -def deobfuscate(data: bytearray, key: bytearray) -> str: - n_blocks = int(math.ceil(len(data) / len(key))) - # This is a misnomer. A true IV would be randomized, but we can't - # do that since the shader doesn't have access to it. We just use - # this to implement the "chaining" aspect of CBC. - iv = bytearray(b'\x00') * len(key) - result = bytearray() - for i in range(0, n_blocks): - block_begin = i * len(key) - block_end = (i + 1) * len(key) - block_cypher = data[block_begin:block_end] - block_plain = block_cypher.copy() - for i in range(0, len(block_plain)): - block_plain[i] ^= key[i] - block_plain[i] ^= iv[i] - result += block_plain - iv = block_cypher - return result - -def test(): - key = genKey() - saveKey("test.key", key) - new_key = loadKey("test.key") - os.remove("test.key") - assert(key == new_key) - - plaintext_original = "Lorem ipsum dolor sit amet, consectetur adipiscing elit." - plaintext_bytes = bytearray(plaintext_original, "utf-8") - cyphertext = obfuscate(plaintext_bytes, key) - assert(len(plaintext_bytes) == len(cyphertext)) - plaintext_recovered = deobfuscate(cyphertext, key).decode("utf-8") - assert(plaintext_original == plaintext_recovered) - assert(plaintext_bytes != cyphertext) - -if __name__ == "__main__": - test() - diff --git a/osc_ctrl.py b/osc_ctrl.py deleted file mode 100644 index 34d1a36..0000000 --- a/osc_ctrl.py +++ /dev/null @@ -1,355 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import random -import time -import fileinput -import generate_utils - -# python3 -m pip install python-osc -# License: public domain. -from pythonosc import udp_client - -from math import ceil -from math import floor -from generate_utils import getLayerParam -from generate_utils import getSelectParam -from generate_utils import getEnableParam -from generate_utils import getBoardIndex -from generate_utils import NUM_LAYERS -from generate_utils import BOARD_ROWS -from generate_utils import BOARD_COLS - -import emotes - -# Based on a couple experiments, this seems like about as fast as we can go -# before players start losing events. -SYNC_FREQ_HZ = 5.0 -SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ - -def usage(): - print("python3 -m pip install python-osc") - print("python3 ./osc_ctrl.py") - -def getClient(ip = "127.0.0.1", port = 9000): - return udp_client.SimpleUDPClient(ip, port) - -class EvilGlobalState(): - # Mapping from ascii char to encoded byte. - encoding = {} -state = EvilGlobalState() - -# The characters in the TaSTT are all numbered from top left to bottom right. -# This function provides a mapping from letter ('a') to index (26). -def generateEncoding(): - encoding = {} - for i in range(0, 65535): - encoding[chr(i)] = (i % 256, int(i / 256)) - return encoding -state.encoding = generateEncoding() - -# Encodes a list of lines into the character set used by the board. -# Pads lines with spaces and adds lines so that the total number of -# lines sent is a multiple of the number of rows in the board. -def encodeMessage(lines): - result = [] - lines_tmp = lines + [" "] * ((BOARD_ROWS - len(lines)) % BOARD_ROWS) - for line in lines_tmp: - first_word = True - for word in line.split(): - if first_word: - first_word = False - else: - result.append(state.encoding[' ']) - - emote_word, emote_word_idx = emotes.lookup(word) - if emote_word: - - word_align = 0 - if len(result) > 0: - word_align = (6 - len(result) % 6) % 6 - word = ' ' * word_align + word - - for i in range(0, word_align): - result.append(state.encoding[' ']) - - for i in range(0, 6): - result.append((emote_word_idx, 0xE0)) - continue - - for char in word: - if not char in state.encoding: - print("skip unrecognized char {}".format(char)) - continue - result.append(state.encoding[char]) - result += [state.encoding[' ']] * (BOARD_COLS - len(line)) - return result - -def updateCell(client, cell_idx, letter_encoded): - for byte in range(0, generate_utils.BYTES_PER_CHAR): - addr="/avatar/parameters/" + generate_utils.getBlendParam(cell_idx, byte) - letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5 - client.send_message(addr, letter_remapped) - -def enable(client): - addr="/avatar/parameters/" + getEnableParam() - client.send_message(addr, True) - -def disable(client): - addr="/avatar/parameters/" + getEnableParam() - client.send_message(addr, False) - -# Send a cell all at once. -# `which_cell` is an integer in the range [0,NUM_REGIONS) -def sendMessageCellDiscrete(client, msg_cell, which_cell): - empty_cell = [state.encoding[' ']] * NUM_LAYERS - - if msg_cell != empty_cell: - addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() - client.send_message(addr, True) - - # Really long messages just wrap back around. - which_cell = (which_cell % generate_utils.NUM_REGIONS) - - enable(client) - - # Seek to the current cell. - addr="/avatar/parameters/" + getSelectParam() - client.send_message(addr, which_cell) - - # Update each letter - for i in range(0, len(msg_cell)): - updateCell(client, i, msg_cell[i]) - - # Wait for sync. - time.sleep(SYNC_DELAY_S) - - if msg_cell != empty_cell: - addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() - client.send_message(addr, False) - -# The board is broken down into contiguous collections of characters called -# cells. Each cell contains `NUM_LAYERS` characters. We can update one cell -# every ~1.0 seconds. Going faster causes the board to display garbage to -# remote players. -def splitMessage(msg): - lines = [] - line = "" - for word in msg.split(): - # Hack: if the word is an emote, we make it 6 characters wide, then - # encode it differently later on. - emote_word, emote_word_idx = emotes.lookup(word) - if emote_word: - word = word.ljust(6) - # Due to some fuckery I do in the shader, emotes have to be rendered on - # a 6-character boundary. So pad the word on the left with spaces - # to get it onto that boundary. - word_align = 0 - if len(line) > 0: - word_align = (6 - (len(line) + 1) % 6) % 6 - print("len line: {}".format(len(line))) - print("word align: {}".format(word_align)) - word = ' ' * word_align + word - - while len(word) > BOARD_COLS: - if len(line) != 0: - lines.append(line) - line = "" - - word_prefix = word[0:BOARD_COLS-1] + "-" - word_suffix = word[BOARD_COLS-1:] - #print("append prefix {}".format(word_prefix)) - lines.append(word_prefix) - word = word_suffix - - if len(line) == 0: - line = word - continue - - if len(line) + len(" ") + len(word) <= BOARD_COLS: - line += " " + word - continue - - #print("append line {}".format(line)) - lines.append(line) - line = word - - if len(line) > 0: - lines.append(line) - - return lines - -class OscTxState: - # The message last sent to the board. - last_msg_encoded = [] - empty_cells_to_send_per_call = 1 - nonempty_cells_to_send_per_call = 1 - - # 0 indicates it's closed. 1 indicates half size. 2 indicates full size. - board_size = 0 - -def resizeBoard(num_lines, tx_state, shrink_only): - - resize_params = [] - - resize_param0 = None - resize_param1 = None - - if num_lines > BOARD_ROWS / 2: - # Board must be expanded to full size. - if shrink_only: - return - - if tx_state.board_size == 2: - return - elif tx_state.board_size == 1: - resize_params.append((False, True)) - else: - resize_params.append((False, False)) - resize_params.append((False, True)) - tx_state.board_size = 2 - elif num_lines == 0: - if not shrink_only: - return - # Board must be shrunk to 0 size - if tx_state.board_size == 0: - return - elif tx_state.board_size == 1: - resize_params.append((True, True)) - else: - resize_params.append((True, False)) - resize_params.append((True, True)) - tx_state.board_size = 0 - else: - # Board must be expanded or shrunk to half size. - if tx_state.board_size == 0: - if shrink_only: - return - resize_params.append((False, False)) - elif tx_state.board_size == 1: - return - else: - if not shrink_only: - return - resize_params.append((True, False)) - tx_state.board_size = 1 - - for resize_param_pair in resize_params: - print("Resizing board... "), - addr="/avatar/parameters/" + generate_utils.getResize0Param() - client.send_message(addr, resize_param_pair[0]) - addr="/avatar/parameters/" + generate_utils.getResize1Param() - client.send_message(addr, resize_param_pair[1]) - - time.sleep(0.25) - - addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() - client.send_message(addr, True) - - # The animation is 0.5 seconds, with another 0.5 second buffer after. We - # want to stop in that buffer. - time.sleep(0.5) - - addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() - client.send_message(addr, False) - - # Wait a while for the animation to complete. - time.sleep(1) - print("done") - - -# Send a message to the board, but only overwrite cells that we know need to -# change. -# This may take multiple calls to complete. -# Returns 3 possible values: -# 0: Done sending. -# 1: Exhausted empty cell budget. -# 2: Exhausted nonempty cell budget. -SEND_MSG_LAZY_DONE = 0 -SEND_MSG_LAZY_SENT_EMPTY = 1 -SEND_MSG_LAZY_SENT_NON_EMPTY = 2 -def sendMessageLazy(client, msg, tx_state): - lines = splitMessage(msg) - msg_encoded = encodeMessage(lines) - msg_encoded_len = len(msg_encoded) - - empty_cells_sent = 0 - nonempty_cells_sent = 0 - n_cells = ceil(msg_encoded_len / NUM_LAYERS) - for cell in range(0, n_cells): - cell_begin = cell * NUM_LAYERS - cell_end = (cell + 1) * NUM_LAYERS - cell_msg = msg_encoded[cell_begin:cell_end] - last_cell_msg = [] - - # Skip cells we've already sent. This makes the board much more - # responsive. - if cell_end <= len(tx_state.last_msg_encoded): - last_cell_msg = tx_state.last_msg_encoded[cell_begin:cell_end] - if cell_msg == last_cell_msg: - continue - - if cell_msg == [state.encoding[' ']] * NUM_LAYERS: - if empty_cells_sent >= tx_state.empty_cells_to_send_per_call: - return SEND_MSG_LAZY_SENT_EMPTY - empty_cells_sent += 1 - else: - if nonempty_cells_sent >= tx_state.nonempty_cells_to_send_per_call: - return SEND_MSG_LAZY_SENT_NON_EMPTY - nonempty_cells_sent += 1 - - sendMessageCellDiscrete(client, cell_msg, cell) - # Pad last msg encoded to the end of the array - tx_state.last_msg_encoded += [state.encoding[" "]] * (cell_end - - len(tx_state.last_msg_encoded)) - tx_state.last_msg_encoded[cell_begin:cell_end] = cell_msg - - #resizeBoard(len(lines), tx_state, shrink_only=True) - return SEND_MSG_LAZY_DONE - -def sendRawMessage(client, msg): - n_cells = ceil(len(msg) / NUM_LAYERS) - for cell in range(0, n_cells): - cell_begin = cell * NUM_LAYERS - cell_end = (cell + 1) * NUM_LAYERS - cell_msg = msg[cell_begin:cell_end] - #print("Send cell {}".format(cell)) - sendMessageCellDiscrete(client, cell_msg, cell) - -def clear(client, tx_state): - disable(client) - - addr="/avatar/parameters/" + generate_utils.getClearBoardParam() - client.send_message(addr, True) - - time.sleep(SYNC_DELAY_S) - - addr="/avatar/parameters/" + generate_utils.getClearBoardParam() - client.send_message(addr, False) - - tx_state.last_msg_encoded = [] - -def indicateSpeech(client, is_speaking: bool): - addr = "/avatar/parameters/" + generate_utils.getIndicator0Param() - client.send_message(addr, is_speaking) - -def indicatePaging(client, is_paging: bool): - addr = "/avatar/parameters/" + generate_utils.getIndicator1Param() - client.send_message(addr, is_paging) - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("-i", default="127.0.0.1", help="OSC server IP") - parser.add_argument("-p", type=int, default=9000, help="OSC server port") - args = parser.parse_args() - - client = getClient(args.i, args.p) - - state.encoding = generateEncoding() - - tx_state = OscTxState() - for line in fileinput.input(): - while sendMessageLazy(client, line, tx_state) != SEND_MSG_LAZY_DONE: - continue - clear(client, tx_state) - diff --git a/steamvr.py b/steamvr.py deleted file mode 100644 index ed4150c..0000000 --- a/steamvr.py +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env python3 - -# python3 -m pip install openvr -# License: BSD-3.0 (requires showing notice in binary distributions) -import openvr as vr -import time - -EVENT_NONE = 0 -EVENT_RISING_EDGE = 1 -EVENT_FALLING_EDGE = 2 - -class SessionState: - def __init__(self): - self.system = vr.init(vr.VRApplication_Background) - self.last_packet = 0 - # Whether the configured input event is high or low. - self.event_high = False - -# Checks if the given button on the given controller is pressed. -# Defaults to joystick click / left hand. -# Returns three values: -# 0 - button not pressed -# 1 - button rising edge -# 2 - button falling edge -def pollButtonPress( - session_state: SessionState, - controller: vr.ETrackedControllerRole = vr.TrackedControllerRole_LeftHand, - button: vr.EVRButtonId = vr.k_EButton_Axis0 - ) -> int: - lh_idx = session_state.system.getTrackedDeviceIndexForControllerRole(vr.TrackedControllerRole_LeftHand) - #print("Left hand device idx: {}".format(lh_idx)) - - got_state, state = session_state.system.getControllerState(lh_idx) - if not got_state: - return EVENT_NONE - - if state.unPacketNum == session_state.last_packet: - return EVENT_NONE - - # Clicking joysticks and moving joysticks fire the same events. To - # differentiate movement from clicking, we create a dead zone: if the event - # fires while the stick isn't moved far from center, we assume it's a - # click, not movement. - dead_zone_radius = 0.5 - - # This is the ID of event for the joystick being clicked. - joy_click = vr.k_EButton_Axis0 - joy_click_mask = (1 << joy_click) - ret = EVENT_NONE - if (state.ulButtonPressed & joy_click_mask) != 0 and\ - (state.rAxis[0].x**2 + state.rAxis[0].y**2 < dead_zone_radius**2): - #print("button pressed: %016x" % state.ulButtonPressed) - #for i in range(0, 5): - # print("axis {} x: {} y: {}".format(i, state.rAxis[i].x, state.rAxis[i].y)) - if not session_state.event_high: - ret = EVENT_RISING_EDGE - session_state.event_high = True - elif session_state.event_high: - session_state.event_high = False - ret = EVENT_FALLING_EDGE - return ret - -if __name__ == "__main__": - session_state = SessionState() - while True: - time.sleep(0.1) - - event = pollButtonPress(session_state) - if event == EVENT_RISING_EDGE: - print("rising edge") - elif event == EVENT_FALLING_EDGE: - print("falling edge") - diff --git a/string_matcher.py b/string_matcher.py deleted file mode 100644 index 461f180..0000000 --- a/string_matcher.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 - -# python3 -m pip install editdistance -# License: MIT. -import editdistance - -import typing - -DEBUG = False - -# Find the window where the distance between these two transcriptions is -# minimized and use it to stitch them together. -def matchStringList(old_words: typing.List[str], - new_words: typing.List[str], window_size = 6) -> str: - if old_words == new_words: - return " ".join(old_words) - elif len(old_words) >= window_size and len(new_words) >= window_size: - # Find the window where the cumulative string distance - # between the words in that window in the old/new transcription - # is minimized. - old_slice = old_words[len(old_words) - window_size:] - - best_match_i = None - best_match_d = window_size * 1000 - - for i in range(0, 1 + len(new_words) - window_size): - new_slice = new_words[i:i + window_size] - cur_d = 0 - for j in range(0, window_size): - cur_d += editdistance.eval(old_slice[j], new_slice[j]) - if cur_d < best_match_d: - best_match_i = i - best_match_d = cur_d - - old_prefix = old_words[0:len(old_words) - window_size] - overlap = new_words[best_match_i:best_match_i + window_size] - new_suffix = new_words[best_match_i + window_size:] - - #print("Best match i: {}".format(best_match_i)) - #print("Window size: {}".format(window_size)) - #print("Old prefix: {}".format(old_prefix)) - #print("Overlap: {}".format(overlap)) - #print("New suffix: {}".format(new_suffix)) - return " ".join(old_prefix + new_words[best_match_i:]) - else: - return " ".join(new_words) - -def matchSpaceDelimitedStrings(old_text: str, new_text: str, window_size = 4) -> str: - old_words = old_text.split() - new_words = new_text.split() - return matchStringList(old_words, new_words, window_size) - -def matchStrings(old_text: str, new_text: str, window_size = 3) -> str: - if old_text == new_text: - return old_text - elif len(old_text) >= window_size and len(new_text) >= window_size: - # Find the window where the cumulative string distance - # between the text in that window in the old/new transcription - # is minimized. - - best_match_i = None - best_match_j = None - best_match_d = window_size * 1000 - - # The number of old slices to look at. Since the old text can grow - # unboundedly, it's crucial that we don't compare to every possible - # slice in the old and new transcriptions (O(N^2) time complexity). - # This is still wildly inefficient, but good enough for continuous - # transcription in a game bound by a single CPU core, like VRChat. - max_old_slices = 300 - old_n_slices = min(max_old_slices, len(old_text)) - last_old_window = len(old_text) - window_size - first_old_window = max(last_old_window - old_n_slices, 0) - - for i in range(first_old_window, last_old_window + 1): - old_slice = old_text[i:i + window_size] - - for j in range(0, 1 + len(new_text) - window_size): - new_slice = new_text[j:j + window_size] - cur_d = editdistance.eval(old_slice, new_slice) - if cur_d < best_match_d: - best_match_i = i - best_match_j = j - best_match_d = cur_d - - if DEBUG: - print("optimum at old '{}' i={} new '{}' j={} d={}".format( - old_slice, i, new_slice, j, cur_d)) - - old_prefix = old_text[0:best_match_i] - overlap = new_text[best_match_j:best_match_j + window_size] - new_suffix = new_text[best_match_j + window_size:] - - if DEBUG: - print("Best match i: {}".format(best_match_i)) - print("Best match j: {}".format(best_match_j)) - print("Window size: {}".format(window_size)) - print("Old prefix: {}".format(old_prefix)) - print("Overlap: {}".format(overlap)) - print("New suffix: {}".format(new_suffix)) - print("Input 1: {}".format(old_text)) - print("Input 2: {}".format(new_text)) - print("Output: {}".format(old_prefix + - new_text[best_match_j:])) - return old_prefix + new_text[best_match_j:] - else: - return new_text - -if __name__ == "__main__": - # Identical transcriptions should not be changed. - assert(matchSpaceDelimitedStrings("This is a test case.", "This is a test case.", window_size = 3) == "This is a test case.") - # A suffix should be detected and ignored. - assert(matchSpaceDelimitedStrings("This is a test case.", "is a test case.", window_size = 3) == "This is a test case.") - # A lengthening suffix should be correctly appended. - assert(matchSpaceDelimitedStrings("This is a test", "is a test case.", window_size = 3) == "This is a test case.") - # A strictly longer transcription should override the old prefix. - assert(matchSpaceDelimitedStrings("This is a test", "This is a test case.", window_size = 3) == "This is a test case.") - # Paranoia: repetitive text broke the older implementation, so I included - # some test cases without fully understanding what the old problem was. - assert(matchSpaceDelimitedStrings("test test test", "test test test test test test", window_size - = 3) == "test test test test test test") - assert(matchSpaceDelimitedStrings("test test test test test test", "test test test", window_size - = 3) == "test test test test test test") - - print(matchStrings("foo bar", "bar baz")) - print(matchStrings("alpha beta", "beta gamma")) - - in1 = "Okay, what about now? Looks like it sort of works. Key word being sort of." - in2 = "okay what about now looks like it sort of works key word being sort of looks" - bad_out = "Okay, what about now? Looks like it sort of works. Key word being sort of works key word being sort of looks" - good_out = "Okay what about now looks like it sort of works key word being sort of looks" - good_out = "Okay, what about now? Looks like it sort of works. Key word being sort of looks" - print(matchStrings(in1, in2)) - assert(matchStrings(in1, in2) == good_out) - - in1 = "This repository can take" - in2 = "This repository contains the code for" - bad_out = "This repository can tode for" - good_out = "This repository contains the code for" - assert(matchStrings(in1, in2) == good_out) - - in1 = "See something." - in2 = "See something. Say something." - bad_out = in1 - good_out = in2 - print(matchStrings(in1, in2)) - assert(matchStrings(in1, in2) == bad_out) - - in1 = "a" * 1000 - in2 = "b" * 10 * 1000 - # This should be fast (< 1 second) - #matchStrings(in1, in2) - - print("Tests passed.") - diff --git a/transcribe.py b/transcribe.py deleted file mode 100644 index 62e6add..0000000 --- a/transcribe.py +++ /dev/null @@ -1,353 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import copy -from datetime import datetime -import os -import osc_ctrl -# python3 -m pip install pydub -# License: MIT. -from pydub import AudioSegment as pydub_AudioSegment -from pydub import effects as pydub_effects -# python3 -m pip install pyaudio -# License: MIT. -import pyaudio -import numpy as np -# python3 -m pip install playsound==1.2.2 -# License: MIT. -from playsound import playsound -import steamvr -import string_matcher -import sys -import threading -import time -import wave -# python3 -m pip install git+https://github.com/openai/whisper.git -# python3 -m pip install torch -f https://download.pytorch.org/whl/torch_stable.html -# License: MIT. -import whisper - -class AudioState: - CHUNK = 1024 - FORMAT = pyaudio.paInt16 - CHANNELS = 1 - # This matches the framerate expected by whisper. - RATE = 16000 - - # The maximum length that recordAudio() will put into frames before it - # starts dropping from the start. - MAX_LENGTH_S = 10 - MAX_LENGTH_S_WHISPER = 30 - # The minimum length that recordAudio() will wait for before saving audio. - MIN_LENGTH_S = 1 - - # PyAudio object - p = None - - # PyAudio stream object - stream = None - - text = "" - committed_text = "" - frames = [] - - # Locks access to `text`. - transcribe_lock = threading.Lock() - - # Locks access to `frames`, and audio stored on disk. - audio_lock = threading.Lock() - - # Used to tell the threads when to stop. - run_app = True - - transcribe_sleep_duration_min_s = 0.05 - transcribe_sleep_duration_max_s = 1.50 - transcribe_no_change_count = 0 - transcribe_sleep_duration = transcribe_sleep_duration_min_s - - tx_state = osc_ctrl.OscTxState() - - # The transcription thread transcribes without holding locks, then - # blocks on it. Thus we need some way to tell the transcription - # thread to drop that transcription. - drop_transcription = False - - # The language the user is speaking in. Default is English but user may set - # this to whatever they want. - language = whisper.tokenizer.TO_LANGUAGE_CODE["english"] - - audio_paused = False - - osc_client = osc_ctrl.getClient() - -def getMicStream(which_mic): - audio_state = AudioState() - audio_state.p = pyaudio.PyAudio() - - print("Finding index mic...") - got_match = False - device_index = -1 - focusrite_str = "Focusrite" - index_str = "Digital Audio Interface" - if which_mic == "index": - target_str = index_str - elif which_mic == "focusrite": - target_str = focusrite_str - else: - raise Exception("Unrecognized mic requested: {}".format(which_mic)) - while got_match == False: - info = audio_state.p.get_host_api_info_by_index(0) - numdevices = info.get('deviceCount') - - for i in range(0, numdevices): - if (audio_state.p.get_device_info_by_host_api_device_index(0, i).get('maxInputChannels')) > 0: - device_name = audio_state.p.get_device_info_by_host_api_device_index(0, i).get('name') - print("Input Device id ", i, " - ", device_name) - if target_str in device_name: - print("Got match: {}".format(device_name)) - device_index = i - got_match = True - break - if got_match == False: - print("No match, sleeping") - time.sleep(3) - - audio_state.stream = audio_state.p.open(format=audio_state.FORMAT, - channels=audio_state.CHANNELS, rate=audio_state.RATE, - input=True, frames_per_buffer=audio_state.CHUNK, - input_device_index=device_index) - - return audio_state - -# Continuously records audio as long as audio_state.run_app is set. -def recordAudio(audio_state): - print("Recording audio") - while audio_state.run_app: - data = audio_state.stream.read(audio_state.CHUNK) - - if audio_state.audio_paused: - time.sleep(0.1) - continue - - audio_state.frames.append(data) - max_frames = int(audio_state.RATE * audio_state.MAX_LENGTH_S / audio_state.CHUNK) - if len(audio_state.frames) > max_frames: - audio_state.frames = audio_state.frames[-1 * max_frames :] - - print("Done recording") - -def resetAudioLocked(audio_state): - audio_state.frames = [] - audio_state.transcribe_no_change_count = 0 - audio_state.transcribe_sleep_duration = \ - audio_state.transcribe_sleep_duration_min_s - - audio_state.committed_text = "" - audio_state.text = "" - -def resetDisplayLocked(audio_state): - osc_ctrl.clear(audio_state.osc_client, audio_state.tx_state) - -def resetAudio(audio_state): - audio_state.transcribe_lock.acquire() - audio_state.audio_lock.acquire() - resetAudioLocked(audio_state) - audio_state.audio_lock.release() - audio_state.transcribe_lock.release() - -# Transcribe the audio recorded in a file. -def transcribe(audio_state, model, frames): - - start_time = time.time() - - frames = audio_state.frames - # Convert from signed 16-bit int [-32768, 32767] to signed 16-bit float on - # [-1, 1]. - # We should technically acquire a lock to protect frames, but this is - # really slow and in practice it doesn't make the app crash, so who cares. - frames = np.asarray(audio_state.frames) - audio = np.frombuffer(frames, np.int16).flatten().astype(np.float32) / 32768.0 - - audio = whisper.pad_or_trim(audio, length = audio_state.RATE * - audio_state.MAX_LENGTH_S_WHISPER) - - mel = whisper.log_mel_spectrogram(audio).to(model.device) - - result = None - #for temp in (0.00, 0.05, 0.10, 0.15, 0.20): - #for temp in (0.00, 0.05): - for temp in (0.00,): - print("temp: {}".format(temp)) - options = whisper.DecodingOptions(language = audio_state.language, - beam_size = 5, temperature = temp, without_timestamps = True) - result = whisper.decode(model, mel, options) - - if result.avg_logprob < -1.0: - print("avg logprob: {}".format(result.avg_logprob)) - result = None - continue - - if result.compression_ratio > 2.4: - print("compression ratio: {}".format(result.compression_ratio)) - result = None - continue - - if result.no_speech_prob > 0.60: - print("no speech prob: {}".format(result.no_speech_prob)) - result = None - continue - - result = result.text - break - - return result - -def transcribeAudio(audio_state, model): - last_transcribe_time = time.time() - while audio_state.run_app == True: - # Pace this out - time.sleep(audio_state.transcribe_sleep_duration) - - # Increase sleep time. Code below will set sleep time back to minimum - # if a change is detected. - if audio_state.transcribe_no_change_count < 10: - audio_state.transcribe_no_change_count += 1 - longer_sleep_dur = audio_state.transcribe_sleep_duration - longer_sleep_dur += audio_state.transcribe_sleep_duration_min_s * (1.3**audio_state.transcribe_no_change_count) - audio_state.transcribe_sleep_duration = min( - audio_state.transcribe_sleep_duration_max_s, - longer_sleep_dur) - - text = transcribe(audio_state, model, audio_state.frames) - if not text: - print("no transcription, spin ({} seconds)".format(time.time() - last_transcribe_time)) - last_transcribe_time = time.time() - continue - - if audio_state.drop_transcription: - audio_state.drop_transcription = False - print("drop transcription ({} seconds)".format(time.time() - last_transcribe_time)) - last_transcribe_time = time.time() - continue - - words = ''.join(c for c in text.lower() if (c.isalpha() or c == " ")).split() - - now = time.time() - print("Transcription ({} seconds): {}".format( - now - last_transcribe_time, - audio_state.text)) - last_transcribe_time = now - - old_text = audio_state.text - - audio_state.text = string_matcher.matchStrings(audio_state.text, - text, window_size = 30) - if old_text != audio_state.text: - # We think the user said something, so reset the amount of - # time we sleep between transcriptions to the minimum. - audio_state.transcribe_no_change_count = 0 - audio_state.transcribe_sleep_duration = audio_state.transcribe_sleep_duration_min_s - -def sendAudio(audio_state): - while audio_state.run_app == True: - text = audio_state.committed_text + " " + audio_state.text - ret = osc_ctrl.sendMessageLazy(audio_state.osc_client, text, - audio_state.tx_state) - is_paging = (ret == osc_ctrl.SEND_MSG_LAZY_SENT_NON_EMPTY) - osc_ctrl.indicatePaging(audio_state.osc_client, is_paging) - - # Pace this out - time.sleep(0.01) - -def readControllerInput(audio_state): - session = steamvr.SessionState() - RECORD_STATE = 0 - PAUSE_STATE = 1 - state = PAUSE_STATE - osc_ctrl.indicateSpeech(audio_state.osc_client, False) - osc_ctrl.indicatePaging(audio_state.osc_client, False) - while audio_state.run_app == True: - time.sleep(0.05) - - event = steamvr.pollButtonPress(session) - - if event == steamvr.EVENT_RISING_EDGE: - print("event get") - if state == RECORD_STATE: - state = PAUSE_STATE - osc_ctrl.indicateSpeech(audio_state.osc_client, False) - playsound(os.path.abspath("Sounds/Noise_Off.wav")) - - audio_state.audio_paused = True - elif state == PAUSE_STATE: - state = RECORD_STATE - osc_ctrl.indicateSpeech(audio_state.osc_client, True) - playsound(os.path.abspath("Sounds/Noise_On.wav")) - - resetAudioLocked(audio_state) - resetDisplayLocked(audio_state) - audio_state.drop_transcription = True - audio_state.audio_paused = False - -def transcribeLoop(mic: str, language: str): - audio_state = getMicStream(mic) - audio_state.language = whisper.tokenizer.TO_LANGUAGE_CODE[language] - - record_audio_thd = threading.Thread(target = recordAudio, args = [audio_state]) - record_audio_thd.daemon = True - record_audio_thd.start() - - print("Safe to start talking") - - #model = whisper.load_model("tiny") - #model = whisper.load_model("base") - model = whisper.load_model("small") - #model = whisper.load_model("medium") - - transcribe_audio_thd = threading.Thread(target = transcribeAudio, args = [audio_state, model]) - transcribe_audio_thd.daemon = True - transcribe_audio_thd.start() - - send_audio_thd = threading.Thread(target = sendAudio, args = [audio_state]) - send_audio_thd.daemon = True - send_audio_thd.start() - - controller_input_thd = threading.Thread(target = readControllerInput, args = [audio_state]) - controller_input_thd.daemon = True - controller_input_thd.start() - - print("Press enter to start a new message.") - for line in sys.stdin: - audio_state.transcribe_lock.acquire() - audio_state.audio_lock.acquire() - resetAudioLocked(audio_state) - resetDisplayLocked(audio_state) - audio_state.drop_transcription = True - audio_state.audio_paused = False - audio_state.audio_lock.release() - audio_state.transcribe_lock.release() - if "exit" in line or "quit" in line: - break - - print("Joining threads") - audio_state.run_app = False - audio_state.run_app = False - record_audio_thd.join() - transcribe_audio_thd.join() - controller_input_thd.join() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--mic", type=str, help="Which mic to use. Options: index, focusrite. Default: index") - parser.add_argument("--language", type=str, help="Which language to use. Ex: english, japanese, chinese, french, german.") - args = parser.parse_args() - - if not args.mic: - args.mic = "index" - - if not args.language: - args.language = "english" - - transcribeLoop(args.mic, args.language) - -- cgit v1.2.3