summaryrefslogtreecommitdiffstats
path: root/Scripts
diff options
context:
space:
mode:
Diffstat (limited to 'Scripts')
-rw-r--r--Scripts/osc_ctrl.py375
-rw-r--r--Scripts/paging.py121
-rw-r--r--Scripts/text_wrapping.py55
-rw-r--r--Scripts/transcribe.py108
4 files changed, 303 insertions, 356 deletions
diff --git a/Scripts/osc_ctrl.py b/Scripts/osc_ctrl.py
index d4c8bf3..3b25778 100644
--- a/Scripts/osc_ctrl.py
+++ b/Scripts/osc_ctrl.py
@@ -1,40 +1,20 @@
#!/usr/bin/env python3
import argparse
-import random
-import time
-import fileinput
+from generate_utils import config
import generate_utils
-
-# python3 -m pip install python-osc
-# License: public domain.
+from paging import MultiLinePager
from pythonosc import udp_client
-
-from math import ceil
-from math import floor
-from generate_utils import getSelectParam
-from generate_utils import getEnableParam
-from generate_utils import config
-
-import emotes
+import time
# Based on a couple experiments, this seems like about as fast as we can go
# before players start losing events.
SYNC_FREQ_HZ = 5.0
SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ
-def usage():
- print("python3 -m pip install python-osc")
- print("python3 ./osc_ctrl.py")
-
def getClient(ip = "127.0.0.1", port = 9000):
return udp_client.SimpleUDPClient(ip, port)
-class EvilGlobalState():
- # Mapping from ascii char to encoded byte.
- encoding = {}
-state = EvilGlobalState()
-
# The characters in the TaSTT are all numbered from top left to bottom right.
# This function provides a mapping from letter ('a') to index (26).
def generateEncoding():
@@ -42,304 +22,103 @@ def generateEncoding():
for i in range(0, 65535):
encoding[chr(i)] = (i % 256, int(i / 256))
return encoding
-state.encoding = generateEncoding()
-# Encodes a list of lines into the character set used by the board.
-# Pads lines with spaces and adds lines so that the total number of
-# lines sent is a multiple of the number of rows in the board.
-def encodeMessage(lines):
- result = []
- lines_tmp = lines + [" "] * ((config.BOARD_ROWS - len(lines)) % config.BOARD_ROWS)
- for line in lines_tmp:
- first_word = True
- for word in line.split():
- if first_word:
- first_word = False
- else:
- result.append(state.encoding[' '])
+class OscState:
+ def __init__(self, chars_per_sync: int, rows: int, cols: int,
+ ip = "127.0.0.1", port = 9000):
+ self.client = getClient(ip, port)
+ self.pager = MultiLinePager(chars_per_sync, rows, cols)
+ self.encoding= generateEncoding()
- emote_word, emote_word_idx = emotes.lookup(word)
- if emote_word:
+ def reset(self):
+ self.pager.reset()
- word_align = 0
- if len(result) > 0:
- word_align = (6 - len(result) % 6) % 6
- word = ' ' * word_align + word
+def encodeMessage(encoding, msg):
+ encoded = []
+ for char in msg:
+ encoded.append(encoding[char])
+ return encoded
- for i in range(0, word_align):
- result.append(state.encoding[' '])
+def lockWorld(client, lock: bool):
+ addr = "/avatar/parameters/" + generate_utils.getLockWorldParam()
+ client.send_message(addr, lock)
- for i in range(0, 6):
- result.append((emote_word_idx, 0xE0))
- continue
+def toggleBoard(client, show: bool):
+ addr = "/avatar/parameters/" + generate_utils.getToggleParam()
+ client.send_message(addr, show)
- for char in word:
- if not char in state.encoding:
- print("skip unrecognized char {}".format(char))
- continue
- result.append(state.encoding[char])
- result += [state.encoding[' ']] * (config.BOARD_COLS - len(line))
- return result
+def indicateSpeech(client, is_speaking: bool):
+ addr = "/avatar/parameters/" + generate_utils.getIndicator0Param()
+ client.send_message(addr, is_speaking)
-def updateCell(client, cell_idx, letter_encoded):
- for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
- addr="/avatar/parameters/" + generate_utils.getBlendParam(cell_idx, byte)
- letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5
- client.send_message(addr, letter_remapped)
+def indicatePaging(client, is_paging: bool):
+ addr = "/avatar/parameters/" + generate_utils.getIndicator1Param()
+ client.send_message(addr, is_paging)
def enable(client):
- addr="/avatar/parameters/" + getEnableParam()
+ addr="/avatar/parameters/" + generate_utils.getEnableParam()
client.send_message(addr, True)
def disable(client):
- addr="/avatar/parameters/" + getEnableParam()
+ addr="/avatar/parameters/" + generate_utils.getEnableParam()
client.send_message(addr, False)
-# Send a cell all at once.
-# `which_cell` is an integer in the range [0,NUM_REGIONS)
-def sendMessageCellDiscrete(client, msg_cell, which_cell):
- empty_cell = [state.encoding[' ']] * generate_utils.config.CHARS_PER_SYNC
+def clear(osc_state: OscState):
+ disable(osc_state.client)
- if msg_cell != empty_cell:
- addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam()
- client.send_message(addr, True)
-
- # Really long messages just wrap back around.
- which_cell = (which_cell % generate_utils.config.numRegions(config.CHARS_PER_SYNC - 1))
-
- enable(client)
-
- # Seek to the current cell.
- addr="/avatar/parameters/" + getSelectParam()
- client.send_message(addr, which_cell)
-
- # Update each letter
- for i in range(0, len(msg_cell)):
- updateCell(client, i, msg_cell[i])
+ addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
+ osc_state.client.send_message(addr, True)
- # Wait for sync.
time.sleep(SYNC_DELAY_S)
- if msg_cell != empty_cell:
- addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam()
- client.send_message(addr, False)
-
-# The board is broken down into contiguous collections of characters called
-# cells. Each cell contains `CHARS_PER_SYNC` characters. We can update one cell
-# every ~1.0 seconds. Going faster causes the board to display garbage to
-# remote players.
-def splitMessage(msg):
- lines = []
- line = ""
- for word in msg.split():
- # Hack: if the word is an emote, we make it 6 characters wide, then
- # encode it differently later on.
- emote_word, emote_word_idx = emotes.lookup(word)
- if emote_word:
- word = word.ljust(6)
- # Due to some fuckery I do in the shader, emotes have to be rendered on
- # a 6-character boundary. So pad the word on the left with spaces
- # to get it onto that boundary.
- word_align = 0
- if len(line) > 0:
- word_align = (6 - (len(line) + 1) % 6) % 6
- print("len line: {}".format(len(line)))
- print("word align: {}".format(word_align))
- word = ' ' * word_align + word
-
- while len(word) > config.BOARD_COLS:
- if len(line) != 0:
- lines.append(line)
- line = ""
-
- word_prefix = word[0:config.BOARD_COLS-1] + "-"
- word_suffix = word[config.BOARD_COLS-1:]
- #print("append prefix {}".format(word_prefix))
- lines.append(word_prefix)
- word = word_suffix
-
- if len(line) == 0:
- line = word
- continue
-
- if len(line) + len(" ") + len(word) <= config.BOARD_COLS:
- line += " " + word
- continue
-
- #print("append line {}".format(line))
- lines.append(line)
- line = word
-
- if len(line) > 0:
- lines.append(line)
-
- return lines
-
-class OscTxState:
- # The message last sent to the board.
- last_msg_encoded = []
- empty_cells_to_send_per_call = 1
- nonempty_cells_to_send_per_call = 1
-
- # 0 indicates it's closed. 1 indicates half size. 2 indicates full size.
- board_size = 0
-
-def resizeBoard(num_lines, tx_state, shrink_only):
-
- resize_params = []
-
- resize_param0 = None
- resize_param1 = None
-
- if num_lines > config.BOARD_ROWS / 2:
- # Board must be expanded to full size.
- if shrink_only:
- return
-
- if tx_state.board_size == 2:
- return
- elif tx_state.board_size == 1:
- resize_params.append((False, True))
- else:
- resize_params.append((False, False))
- resize_params.append((False, True))
- tx_state.board_size = 2
- elif num_lines == 0:
- if not shrink_only:
- return
- # Board must be shrunk to 0 size
- if tx_state.board_size == 0:
- return
- elif tx_state.board_size == 1:
- resize_params.append((True, True))
- else:
- resize_params.append((True, False))
- resize_params.append((True, True))
- tx_state.board_size = 0
- else:
- # Board must be expanded or shrunk to half size.
- if tx_state.board_size == 0:
- if shrink_only:
- return
- resize_params.append((False, False))
- elif tx_state.board_size == 1:
- return
- else:
- if not shrink_only:
- return
- resize_params.append((True, False))
- tx_state.board_size = 1
-
- for resize_param_pair in resize_params:
- print("Resizing board... "),
- addr="/avatar/parameters/" + generate_utils.getResize0Param()
- client.send_message(addr, resize_param_pair[0])
- addr="/avatar/parameters/" + generate_utils.getResize1Param()
- client.send_message(addr, resize_param_pair[1])
-
- time.sleep(0.25)
-
- addr="/avatar/parameters/" + generate_utils.getResizeEnableParam()
- client.send_message(addr, True)
-
- # The animation is 0.5 seconds, with another 0.5 second buffer after. We
- # want to stop in that buffer.
- time.sleep(0.5)
-
- addr="/avatar/parameters/" + generate_utils.getResizeEnableParam()
- client.send_message(addr, False)
-
- # Wait a while for the animation to complete.
- time.sleep(1)
- print("done")
-
-
-# Send a message to the board, but only overwrite cells that we know need to
-# change.
-# This may take multiple calls to complete.
-# Returns 3 possible values:
-# 0: Done sending.
-# 1: Exhausted empty cell budget.
-# 2: Exhausted nonempty cell budget.
-SEND_MSG_LAZY_DONE = 0
-SEND_MSG_LAZY_SENT_EMPTY = 1
-SEND_MSG_LAZY_SENT_NON_EMPTY = 2
-def sendMessageLazy(client, msg, tx_state):
- lines = splitMessage(msg)
- msg_encoded = encodeMessage(lines)
- msg_encoded_len = len(msg_encoded)
-
- empty_cells_sent = 0
- nonempty_cells_sent = 0
- n_cells = floor(msg_encoded_len / config.CHARS_PER_SYNC)
- for cell in range(0, n_cells):
- cell_begin = cell * config.CHARS_PER_SYNC
- cell_end = (cell + 1) * config.CHARS_PER_SYNC
- cell_msg = msg_encoded[cell_begin:cell_end]
- last_cell_msg = []
+ addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
+ osc_state.client.send_message(addr, False)
- # Skip cells we've already sent. This makes the board much more
- # responsive.
- if cell_end <= len(tx_state.last_msg_encoded):
- last_cell_msg = tx_state.last_msg_encoded[cell_begin:cell_end]
- if cell_msg == last_cell_msg:
- continue
+ osc_state.reset()
- if cell_msg == [state.encoding[' ']] * config.CHARS_PER_SYNC:
- if empty_cells_sent >= tx_state.empty_cells_to_send_per_call:
- return SEND_MSG_LAZY_SENT_EMPTY
- empty_cells_sent += 1
- else:
- if nonempty_cells_sent >= tx_state.nonempty_cells_to_send_per_call:
- return SEND_MSG_LAZY_SENT_NON_EMPTY
- nonempty_cells_sent += 1
+def updateRegion(client, region_idx, letter_encoded):
+ for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
+ addr="/avatar/parameters/" + generate_utils.getBlendParam(region_idx, byte)
+ letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5
+ client.send_message(addr, letter_remapped)
- sendMessageCellDiscrete(client, cell_msg, cell)
- # Pad last msg encoded to the end of the array
- tx_state.last_msg_encoded += [state.encoding[" "]] * (cell_end -
- len(tx_state.last_msg_encoded))
- tx_state.last_msg_encoded[cell_begin:cell_end] = cell_msg
+# Sends one slice of `msg` to the board then returns. Slices are sent
+# in FIFO order; e.g., the most recently spoken words are sent last.
+# Returns True if done paging, False otherwise.
+def pageMessage(osc_state: OscState, msg: str) -> bool:
+ msg_slice, slice_idx = osc_state.pager.getNextSlice(msg)
+ if slice_idx == -1:
+ return True
+ print("sending page {}: {} ({})".format(slice_idx, msg_slice,
+ len(msg_slice)))
+
+ empty_slice = " " * len(msg_slice)
+ if msg_slice != empty_slice:
+ addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam()
+ osc_state.client.send_message(addr, True)
- #resizeBoard(len(lines), tx_state, shrink_only=True)
- return SEND_MSG_LAZY_DONE
+ # Really long messages just wrap back around.
+ which_region = (slice_idx % generate_utils.config.numRegions(0))
+ print("send to region {}".format(which_region))
-def sendRawMessage(client, msg):
- n_cells = ceil(len(msg) / config.CHARS_PER_SYNC)
- for cell in range(0, n_cells):
- cell_begin = cell * config.CHARS_PER_SYNC
- cell_end = (cell + 1) * config.CHARS_PER_SYNC
- cell_msg = msg[cell_begin:cell_end]
- #print("Send cell {}".format(cell))
- sendMessageCellDiscrete(client, cell_msg, cell)
+ enable(osc_state.client)
-def clear(client, tx_state):
- disable(client)
+ # Seek to the current region.
+ addr="/avatar/parameters/" + generate_utils.getSelectParam()
+ osc_state.client.send_message(addr, which_region)
- addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
- client.send_message(addr, True)
+ # Update each letter.
+ encoded = encodeMessage(osc_state.encoding, msg_slice)
+ print("len encoded: {}".format(len(encoded)))
+ for i in range(0, len(msg_slice)):
+ updateRegion(osc_state.client, i, encoded[i])
+ # Wait for parameter sync.
time.sleep(SYNC_DELAY_S)
- addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
- client.send_message(addr, False)
-
- tx_state.last_msg_encoded = []
-
-def lockWorld(client, lock: bool):
- addr = "/avatar/parameters/" + generate_utils.getLockWorldParam()
- client.send_message(addr, lock)
-
-def toggleBoard(client, show: bool):
- addr = "/avatar/parameters/" + generate_utils.getToggleParam()
- client.send_message(addr, show)
-
-def indicateSpeech(client, is_speaking: bool):
- addr = "/avatar/parameters/" + generate_utils.getIndicator0Param()
- client.send_message(addr, is_speaking)
-
-def indicatePaging(client, is_paging: bool):
- addr = "/avatar/parameters/" + generate_utils.getIndicator1Param()
- client.send_message(addr, is_paging)
+ if msg_slice != empty_slice:
+ addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam()
+ osc_state.client.send_message(addr, False)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
@@ -347,13 +126,3 @@ if __name__ == "__main__":
parser.add_argument("-p", type=int, default=9000, help="OSC server port")
args = parser.parse_args()
- client = getClient(args.i, args.p)
-
- state.encoding = generateEncoding()
-
- tx_state = OscTxState()
- for line in fileinput.input():
- while sendMessageLazy(client, line, tx_state) != SEND_MSG_LAZY_DONE:
- continue
- clear(client, tx_state)
-
diff --git a/Scripts/paging.py b/Scripts/paging.py
new file mode 100644
index 0000000..8cde278
--- /dev/null
+++ b/Scripts/paging.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python3
+
+from math import ceil
+from text_wrapping import TextWrapper
+
+def getSlice(msg: str, idx: int, slice_len: int) -> str:
+ begin = idx * slice_len
+ end = (idx + 1) * slice_len
+ msg_len = len(msg)
+ if msg_len >= end:
+ return msg[begin:end]
+ if msg_len > begin:
+ return msg[begin:end] + (" " * (end - msg_len))
+ return None
+
+def setSlice(msg: str, idx: int, slice_len: int, msg_slice: str) -> str:
+ begin = idx * slice_len
+ end = (idx + 1) * slice_len
+ prefix = msg[0:begin]
+ prefix += " " * (begin - len(prefix))
+ suffix = msg[end:]
+ msg = prefix + msg_slice + suffix
+ return msg
+
+class SingleLinePager:
+ def __init__(self, slice_len: int):
+ self.msg = ""
+ self.slice_len = slice_len
+
+ def reset(self):
+ self.msg = ""
+
+ def getNextSlice(self, msg) -> tuple[str, int]:
+ for i in range(0, ceil(len(msg) / self.slice_len)):
+ old_slice = getSlice(self.msg, i, self.slice_len)
+ new_slice = getSlice(msg, i, self.slice_len)
+ if old_slice != new_slice:
+ self.msg = setSlice(self.msg, i, self.slice_len, new_slice)
+ return new_slice, i
+ return "", -1
+
+class MultiLinePager:
+ def __init__(self, slice_len: int, rows: int, cols: int):
+ # This is a list of lists of SingleLinePagers.
+ # It represents a list of pages, each containing a list of lines.
+ self.pages = []
+ self.slice_len = slice_len
+ self.rows = rows
+ self.cols = cols
+
+ def reset(self):
+ self.pages = []
+
+ def getNextSlice(self, msg) -> tuple[str, int]:
+ pages = TextWrapper(self.rows, self.cols).wrap(msg)
+
+ # Wrapping split the input message along line boundaries and along page
+ # boundaries. However, we're going to treat each page like a single
+ # line, so that `slice_idx` can be used as a region index. Therefore,
+ # we need exactly one SingleLinePager per page.
+ for pi in range(len(self.pages), len(pages)):
+ self.pages.append(SingleLinePager(self.slice_len))
+
+ for pi in range(0, len(pages)):
+ line = "".join(pages[pi])
+ pager = self.pages[pi]
+ msg_slice, slice_idx = pager.getNextSlice(line)
+ if slice_idx != -1:
+ return msg_slice, slice_idx
+ return "", -1
+
+if __name__ == "__main__":
+ assert(getSlice("abcdefghij", 0, 1) == "a")
+ assert(getSlice("abcdefghij", 9, 1) == "j")
+ assert(getSlice("abcdefghij", 0, 2) == "ab")
+ assert(getSlice("abcdefghij", 1, 2) == "cd")
+ assert(getSlice("abcdefghij", 3, 3) == "j ")
+ assert(getSlice("abcdefghij", 10, 1) == None)
+ assert(getSlice("abcdefghij", 11, 1) == None)
+
+ assert(setSlice("abcdefghij", 1, 2, "kl") == "abklefghij")
+ assert(setSlice("abc", 1, 2, "de") == "abde")
+ assert(setSlice("abc", 0, 2, "de") == "dec")
+
+ slice_len = 2
+ p = SingleLinePager(slice_len)
+ p.msg = "test"
+ assert(p.getNextSlice("test")[0] == "")
+ assert(p.getNextSlice("tast")[0] == "ta")
+ assert(p.getNextSlice("tast")[0] == "")
+
+ p.msg = ""
+ assert(p.getNextSlice("test")[0] == "te")
+ assert(p.msg == "te")
+ assert(p.getNextSlice("test")[0] == "st")
+ assert(p.msg == "test")
+ assert(p.getNextSlice("test")[0] == "")
+ assert(p.msg == "test")
+ assert(p.getNextSlice("tests")[0] == "s ")
+
+ slice_len = 2
+ rows = 2
+ cols = 4
+ p = MultiLinePager(slice_len, rows, cols)
+ assert(p.getNextSlice("")[0] == "")
+ assert(p.getNextSlice("yo")[0] == "yo")
+ assert(p.getNextSlice("yogi")[0] == "gi")
+ assert(p.getNextSlice("yugi")[0] == "yu")
+ assert(p.getNextSlice("yugi is a")[0] == "is")
+ assert(p.getNextSlice("yugi is a")[0] == " a")
+ assert(p.getNextSlice("yugi is a pussy")[0] == "pu")
+ assert(p.getNextSlice("yugi is a pussy")[0] == "s-")
+ assert(p.getNextSlice("yugi is a pussy")[0] == "sy")
+
+ p = MultiLinePager(slice_len, rows, cols)
+ assert(p.getNextSlice("yo")[0] == "yo")
+ assert(p.getNextSlice("yo")[0] == " ")
+ assert(p.getNextSlice("yo")[0] == " ")
+ assert(p.getNextSlice("yo")[0] == " ")
+ assert(p.getNextSlice("yo")[0] == "")
+
diff --git a/Scripts/text_wrapping.py b/Scripts/text_wrapping.py
new file mode 100644
index 0000000..7576b78
--- /dev/null
+++ b/Scripts/text_wrapping.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+
+class TextWrapper:
+ def __init__(self, rows, cols):
+ self.rows = rows
+ self.cols = cols
+
+ # Split `msg` along line boundaries. Long words tend to just go onto new
+ # lines. Words that are too long to fit on any line are hyphenated and
+ # split.
+ # Lines are padded with space (" ") characters so they're all `self.cols`
+ # characters long. Pages are padded with lines full of space characters so
+ # they're all `self.rows` lines long.
+ def wrap(self, msg: str) -> list[list[str]]:
+ pages = []
+ lines = []
+ line = ""
+ for word in msg.split():
+ if len(line) + 1 + len(word) <= self.cols:
+ if len(line):
+ line += " "
+ line += word
+ continue
+ # Word won't fit onto this line. End the line.
+ if len(line):
+ line += " " * (self.cols - len(line))
+ lines.append(line)
+ line = ""
+ while len(word) > self.cols:
+ prefix = word[0:self.cols-1] + "-"
+ lines.append(prefix)
+ suffix = word[self.cols-1:]
+ word = suffix
+ if len(word):
+ line = word
+ if len(line):
+ line += " " * (self.cols - len(line))
+ lines.append(line)
+ while len(lines):
+ pages.append(lines[0:self.rows])
+ lines = lines[self.rows:]
+ if len(pages):
+ num_extra_lines = (self.rows - (len(pages[-1]) % self.rows)) % self.rows
+ pages[-1] += [" " * self.cols] * num_extra_lines
+ return pages
+
+if __name__ == "__main__":
+ w = TextWrapper(2, 5)
+
+ assert(w.wrap("foo") == [["foo ", " "]])
+ assert(w.wrap("foo bar") == [["foo ", "bar "]])
+ assert(w.wrap("bagel") == [["bagel", " "]])
+ assert(w.wrap("bagels") == [["bage-", "ls "]])
+ assert(w.wrap("hot bagels") == [["hot ", "bage-"], ["ls ", " "]])
+
diff --git a/Scripts/transcribe.py b/Scripts/transcribe.py
index ba7d8fe..2729331 100644
--- a/Scripts/transcribe.py
+++ b/Scripts/transcribe.py
@@ -26,57 +26,58 @@ import wave
import whisper
class AudioState:
- CHUNK = 1024
- FORMAT = pyaudio.paInt16
- CHANNELS = 1
- # This matches the framerate expected by whisper.
- RATE = 16000
+ def __init__(self):
+ self.CHUNK = 1024
+ self.FORMAT = pyaudio.paInt16
+ self.CHANNELS = 1
+ # This matches the framerate expected by whisper.
+ self.RATE = 16000
- # The maximum length that recordAudio() will put into frames before it
- # starts dropping from the start.
- MAX_LENGTH_S = 10
- MAX_LENGTH_S_WHISPER = 30
- # The minimum length that recordAudio() will wait for before saving audio.
- MIN_LENGTH_S = 1
+ # The maximum length that recordAudio() will put into frames before it
+ # starts dropping from the start.
+ self.MAX_LENGTH_S = 10
+ self.MAX_LENGTH_S_WHISPER = 30
+ # The minimum length that recordAudio() will wait for before saving audio.
+ self.MIN_LENGTH_S = 1
- # PyAudio object
- p = None
+ # PyAudio object
+ self.p = None
- # PyAudio stream object
- stream = None
+ # PyAudio stream object
+ self.stream = None
- text = ""
- committed_text = ""
- frames = []
+ self.text = ""
+ self.committed_text = ""
+ self.frames = []
- # Locks access to `text`.
- transcribe_lock = threading.Lock()
+ # Locks access to `text`.
+ self.transcribe_lock = threading.Lock()
- # Locks access to `frames`, and audio stored on disk.
- audio_lock = threading.Lock()
+ # Locks access to `frames`, and audio stored on disk.
+ self.audio_lock = threading.Lock()
- # Used to tell the threads when to stop.
- run_app = True
+ # Used to tell the threads when to stop.
+ self.run_app = True
- transcribe_sleep_duration_min_s = 0.05
- transcribe_sleep_duration_max_s = 5.00
- transcribe_no_change_count = 0
- transcribe_sleep_duration = transcribe_sleep_duration_min_s
+ self.transcribe_sleep_duration_min_s = 0.05
+ self.transcribe_sleep_duration_max_s = 5.00
+ self.transcribe_no_change_count = 0
+ self.transcribe_sleep_duration = self.transcribe_sleep_duration_min_s
- tx_state = osc_ctrl.OscTxState()
+ # The transcription thread transcribes without holding locks, then
+ # blocks on it. Thus we need some way to tell the transcription
+ # thread to drop that transcription.
+ self.drop_transcription = False
- # The transcription thread transcribes without holding locks, then
- # blocks on it. Thus we need some way to tell the transcription
- # thread to drop that transcription.
- drop_transcription = False
+ # The language the user is speaking in. Default is English but user may set
+ # this to whatever they want.
+ self.language = whisper.tokenizer.TO_LANGUAGE_CODE["english"]
- # The language the user is speaking in. Default is English but user may set
- # this to whatever they want.
- language = whisper.tokenizer.TO_LANGUAGE_CODE["english"]
+ self.audio_paused = False
- audio_paused = False
-
- osc_client = osc_ctrl.getClient()
+ self.osc_state = osc_ctrl.OscState(generate_utils.config.CHARS_PER_SYNC,
+ generate_utils.config.BOARD_ROWS,
+ generate_utils.config.BOARD_COLS)
def sleepInterruptible(self, dur_s, stride_ms = 5):
dur_ms = dur_s * 1000.0
@@ -185,7 +186,7 @@ def resetAudioLocked(audio_state):
audio_state.text = ""
def resetDisplayLocked(audio_state):
- osc_ctrl.clear(audio_state.osc_client, audio_state.tx_state)
+ osc_ctrl.clear(audio_state.osc_state)
def resetAudio(audio_state):
audio_state.transcribe_lock.acquire()
@@ -292,10 +293,9 @@ def transcribeAudio(audio_state, model):
def sendAudio(audio_state):
while audio_state.run_app == True:
text = audio_state.committed_text + " " + audio_state.text
- ret = osc_ctrl.sendMessageLazy(audio_state.osc_client, text,
- audio_state.tx_state)
- is_paging = (ret == osc_ctrl.SEND_MSG_LAZY_SENT_NON_EMPTY)
- osc_ctrl.indicatePaging(audio_state.osc_client, is_paging)
+ ret = osc_ctrl.pageMessage(audio_state.osc_state, text)
+ is_paging = (ret == False)
+ osc_ctrl.indicatePaging(audio_state.osc_state.client, is_paging)
# Pace this out
time.sleep(0.01)
@@ -305,8 +305,8 @@ def readControllerInput(audio_state):
RECORD_STATE = 0
PAUSE_STATE = 1
state = PAUSE_STATE
- osc_ctrl.indicateSpeech(audio_state.osc_client, False)
- osc_ctrl.indicatePaging(audio_state.osc_client, False)
+ osc_ctrl.indicateSpeech(audio_state.osc_state.client, False)
+ osc_ctrl.indicatePaging(audio_state.osc_state.client, False)
last_rising = time.time()
while audio_state.run_app == True:
@@ -321,8 +321,8 @@ def readControllerInput(audio_state):
if now - last_rising > 0.5:
# Long hold
state = PAUSE_STATE
- osc_ctrl.indicateSpeech(audio_state.osc_client, False)
- osc_ctrl.toggleBoard(audio_state.osc_client, False)
+ osc_ctrl.indicateSpeech(audio_state.osc_state.client, False)
+ osc_ctrl.toggleBoard(audio_state.osc_state.client, False)
#playsound(os.path.abspath("../Sounds/Noise_Off.wav"))
resetAudioLocked(audio_state)
@@ -333,17 +333,17 @@ def readControllerInput(audio_state):
# Short hold
if state == RECORD_STATE:
state = PAUSE_STATE
- osc_ctrl.indicateSpeech(audio_state.osc_client, False)
- osc_ctrl.lockWorld(audio_state.osc_client, True)
+ osc_ctrl.indicateSpeech(audio_state.osc_state.client, False)
+ osc_ctrl.lockWorld(audio_state.osc_state.client, True)
audio_state.audio_paused = True
playsound(os.path.abspath("../Sounds/Noise_Off.wav"))
elif state == PAUSE_STATE:
state = RECORD_STATE
- osc_ctrl.indicateSpeech(audio_state.osc_client, True)
- osc_ctrl.toggleBoard(audio_state.osc_client, True)
- osc_ctrl.lockWorld(audio_state.osc_client, False)
+ osc_ctrl.indicateSpeech(audio_state.osc_state.client, True)
+ osc_ctrl.toggleBoard(audio_state.osc_state.client, True)
+ osc_ctrl.lockWorld(audio_state.osc_state.client, False)
resetAudioLocked(audio_state)
resetDisplayLocked(audio_state)
@@ -398,6 +398,8 @@ def transcribeLoop(mic: str, language: str, model: str):
controller_input_thd.join()
if __name__ == "__main__":
+ sys.stdout.reconfigure(encoding="utf-8")
+
# Set cwd to the directory holding the script
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)