diff options
Diffstat (limited to 'Scripts/osc_ctrl.py')
| -rw-r--r-- | Scripts/osc_ctrl.py | 375 |
1 files changed, 72 insertions, 303 deletions
diff --git a/Scripts/osc_ctrl.py b/Scripts/osc_ctrl.py index d4c8bf3..3b25778 100644 --- a/Scripts/osc_ctrl.py +++ b/Scripts/osc_ctrl.py @@ -1,40 +1,20 @@ #!/usr/bin/env python3 import argparse -import random -import time -import fileinput +from generate_utils import config import generate_utils - -# python3 -m pip install python-osc -# License: public domain. +from paging import MultiLinePager from pythonosc import udp_client - -from math import ceil -from math import floor -from generate_utils import getSelectParam -from generate_utils import getEnableParam -from generate_utils import config - -import emotes +import time # Based on a couple experiments, this seems like about as fast as we can go # before players start losing events. SYNC_FREQ_HZ = 5.0 SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ -def usage(): - print("python3 -m pip install python-osc") - print("python3 ./osc_ctrl.py") - def getClient(ip = "127.0.0.1", port = 9000): return udp_client.SimpleUDPClient(ip, port) -class EvilGlobalState(): - # Mapping from ascii char to encoded byte. - encoding = {} -state = EvilGlobalState() - # The characters in the TaSTT are all numbered from top left to bottom right. # This function provides a mapping from letter ('a') to index (26). def generateEncoding(): @@ -42,304 +22,103 @@ def generateEncoding(): for i in range(0, 65535): encoding[chr(i)] = (i % 256, int(i / 256)) return encoding -state.encoding = generateEncoding() -# Encodes a list of lines into the character set used by the board. -# Pads lines with spaces and adds lines so that the total number of -# lines sent is a multiple of the number of rows in the board. -def encodeMessage(lines): - result = [] - lines_tmp = lines + [" "] * ((config.BOARD_ROWS - len(lines)) % config.BOARD_ROWS) - for line in lines_tmp: - first_word = True - for word in line.split(): - if first_word: - first_word = False - else: - result.append(state.encoding[' ']) +class OscState: + def __init__(self, chars_per_sync: int, rows: int, cols: int, + ip = "127.0.0.1", port = 9000): + self.client = getClient(ip, port) + self.pager = MultiLinePager(chars_per_sync, rows, cols) + self.encoding= generateEncoding() - emote_word, emote_word_idx = emotes.lookup(word) - if emote_word: + def reset(self): + self.pager.reset() - word_align = 0 - if len(result) > 0: - word_align = (6 - len(result) % 6) % 6 - word = ' ' * word_align + word +def encodeMessage(encoding, msg): + encoded = [] + for char in msg: + encoded.append(encoding[char]) + return encoded - for i in range(0, word_align): - result.append(state.encoding[' ']) +def lockWorld(client, lock: bool): + addr = "/avatar/parameters/" + generate_utils.getLockWorldParam() + client.send_message(addr, lock) - for i in range(0, 6): - result.append((emote_word_idx, 0xE0)) - continue +def toggleBoard(client, show: bool): + addr = "/avatar/parameters/" + generate_utils.getToggleParam() + client.send_message(addr, show) - for char in word: - if not char in state.encoding: - print("skip unrecognized char {}".format(char)) - continue - result.append(state.encoding[char]) - result += [state.encoding[' ']] * (config.BOARD_COLS - len(line)) - return result +def indicateSpeech(client, is_speaking: bool): + addr = "/avatar/parameters/" + generate_utils.getIndicator0Param() + client.send_message(addr, is_speaking) -def updateCell(client, cell_idx, letter_encoded): - for byte in range(0, generate_utils.config.BYTES_PER_CHAR): - addr="/avatar/parameters/" + generate_utils.getBlendParam(cell_idx, byte) - letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5 - client.send_message(addr, letter_remapped) +def indicatePaging(client, is_paging: bool): + addr = "/avatar/parameters/" + generate_utils.getIndicator1Param() + client.send_message(addr, is_paging) def enable(client): - addr="/avatar/parameters/" + getEnableParam() + addr="/avatar/parameters/" + generate_utils.getEnableParam() client.send_message(addr, True) def disable(client): - addr="/avatar/parameters/" + getEnableParam() + addr="/avatar/parameters/" + generate_utils.getEnableParam() client.send_message(addr, False) -# Send a cell all at once. -# `which_cell` is an integer in the range [0,NUM_REGIONS) -def sendMessageCellDiscrete(client, msg_cell, which_cell): - empty_cell = [state.encoding[' ']] * generate_utils.config.CHARS_PER_SYNC +def clear(osc_state: OscState): + disable(osc_state.client) - if msg_cell != empty_cell: - addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() - client.send_message(addr, True) - - # Really long messages just wrap back around. - which_cell = (which_cell % generate_utils.config.numRegions(config.CHARS_PER_SYNC - 1)) - - enable(client) - - # Seek to the current cell. - addr="/avatar/parameters/" + getSelectParam() - client.send_message(addr, which_cell) - - # Update each letter - for i in range(0, len(msg_cell)): - updateCell(client, i, msg_cell[i]) + addr="/avatar/parameters/" + generate_utils.getClearBoardParam() + osc_state.client.send_message(addr, True) - # Wait for sync. time.sleep(SYNC_DELAY_S) - if msg_cell != empty_cell: - addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() - client.send_message(addr, False) - -# The board is broken down into contiguous collections of characters called -# cells. Each cell contains `CHARS_PER_SYNC` characters. We can update one cell -# every ~1.0 seconds. Going faster causes the board to display garbage to -# remote players. -def splitMessage(msg): - lines = [] - line = "" - for word in msg.split(): - # Hack: if the word is an emote, we make it 6 characters wide, then - # encode it differently later on. - emote_word, emote_word_idx = emotes.lookup(word) - if emote_word: - word = word.ljust(6) - # Due to some fuckery I do in the shader, emotes have to be rendered on - # a 6-character boundary. So pad the word on the left with spaces - # to get it onto that boundary. - word_align = 0 - if len(line) > 0: - word_align = (6 - (len(line) + 1) % 6) % 6 - print("len line: {}".format(len(line))) - print("word align: {}".format(word_align)) - word = ' ' * word_align + word - - while len(word) > config.BOARD_COLS: - if len(line) != 0: - lines.append(line) - line = "" - - word_prefix = word[0:config.BOARD_COLS-1] + "-" - word_suffix = word[config.BOARD_COLS-1:] - #print("append prefix {}".format(word_prefix)) - lines.append(word_prefix) - word = word_suffix - - if len(line) == 0: - line = word - continue - - if len(line) + len(" ") + len(word) <= config.BOARD_COLS: - line += " " + word - continue - - #print("append line {}".format(line)) - lines.append(line) - line = word - - if len(line) > 0: - lines.append(line) - - return lines - -class OscTxState: - # The message last sent to the board. - last_msg_encoded = [] - empty_cells_to_send_per_call = 1 - nonempty_cells_to_send_per_call = 1 - - # 0 indicates it's closed. 1 indicates half size. 2 indicates full size. - board_size = 0 - -def resizeBoard(num_lines, tx_state, shrink_only): - - resize_params = [] - - resize_param0 = None - resize_param1 = None - - if num_lines > config.BOARD_ROWS / 2: - # Board must be expanded to full size. - if shrink_only: - return - - if tx_state.board_size == 2: - return - elif tx_state.board_size == 1: - resize_params.append((False, True)) - else: - resize_params.append((False, False)) - resize_params.append((False, True)) - tx_state.board_size = 2 - elif num_lines == 0: - if not shrink_only: - return - # Board must be shrunk to 0 size - if tx_state.board_size == 0: - return - elif tx_state.board_size == 1: - resize_params.append((True, True)) - else: - resize_params.append((True, False)) - resize_params.append((True, True)) - tx_state.board_size = 0 - else: - # Board must be expanded or shrunk to half size. - if tx_state.board_size == 0: - if shrink_only: - return - resize_params.append((False, False)) - elif tx_state.board_size == 1: - return - else: - if not shrink_only: - return - resize_params.append((True, False)) - tx_state.board_size = 1 - - for resize_param_pair in resize_params: - print("Resizing board... "), - addr="/avatar/parameters/" + generate_utils.getResize0Param() - client.send_message(addr, resize_param_pair[0]) - addr="/avatar/parameters/" + generate_utils.getResize1Param() - client.send_message(addr, resize_param_pair[1]) - - time.sleep(0.25) - - addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() - client.send_message(addr, True) - - # The animation is 0.5 seconds, with another 0.5 second buffer after. We - # want to stop in that buffer. - time.sleep(0.5) - - addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() - client.send_message(addr, False) - - # Wait a while for the animation to complete. - time.sleep(1) - print("done") - - -# Send a message to the board, but only overwrite cells that we know need to -# change. -# This may take multiple calls to complete. -# Returns 3 possible values: -# 0: Done sending. -# 1: Exhausted empty cell budget. -# 2: Exhausted nonempty cell budget. -SEND_MSG_LAZY_DONE = 0 -SEND_MSG_LAZY_SENT_EMPTY = 1 -SEND_MSG_LAZY_SENT_NON_EMPTY = 2 -def sendMessageLazy(client, msg, tx_state): - lines = splitMessage(msg) - msg_encoded = encodeMessage(lines) - msg_encoded_len = len(msg_encoded) - - empty_cells_sent = 0 - nonempty_cells_sent = 0 - n_cells = floor(msg_encoded_len / config.CHARS_PER_SYNC) - for cell in range(0, n_cells): - cell_begin = cell * config.CHARS_PER_SYNC - cell_end = (cell + 1) * config.CHARS_PER_SYNC - cell_msg = msg_encoded[cell_begin:cell_end] - last_cell_msg = [] + addr="/avatar/parameters/" + generate_utils.getClearBoardParam() + osc_state.client.send_message(addr, False) - # Skip cells we've already sent. This makes the board much more - # responsive. - if cell_end <= len(tx_state.last_msg_encoded): - last_cell_msg = tx_state.last_msg_encoded[cell_begin:cell_end] - if cell_msg == last_cell_msg: - continue + osc_state.reset() - if cell_msg == [state.encoding[' ']] * config.CHARS_PER_SYNC: - if empty_cells_sent >= tx_state.empty_cells_to_send_per_call: - return SEND_MSG_LAZY_SENT_EMPTY - empty_cells_sent += 1 - else: - if nonempty_cells_sent >= tx_state.nonempty_cells_to_send_per_call: - return SEND_MSG_LAZY_SENT_NON_EMPTY - nonempty_cells_sent += 1 +def updateRegion(client, region_idx, letter_encoded): + for byte in range(0, generate_utils.config.BYTES_PER_CHAR): + addr="/avatar/parameters/" + generate_utils.getBlendParam(region_idx, byte) + letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5 + client.send_message(addr, letter_remapped) - sendMessageCellDiscrete(client, cell_msg, cell) - # Pad last msg encoded to the end of the array - tx_state.last_msg_encoded += [state.encoding[" "]] * (cell_end - - len(tx_state.last_msg_encoded)) - tx_state.last_msg_encoded[cell_begin:cell_end] = cell_msg +# Sends one slice of `msg` to the board then returns. Slices are sent +# in FIFO order; e.g., the most recently spoken words are sent last. +# Returns True if done paging, False otherwise. +def pageMessage(osc_state: OscState, msg: str) -> bool: + msg_slice, slice_idx = osc_state.pager.getNextSlice(msg) + if slice_idx == -1: + return True + print("sending page {}: {} ({})".format(slice_idx, msg_slice, + len(msg_slice))) + + empty_slice = " " * len(msg_slice) + if msg_slice != empty_slice: + addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() + osc_state.client.send_message(addr, True) - #resizeBoard(len(lines), tx_state, shrink_only=True) - return SEND_MSG_LAZY_DONE + # Really long messages just wrap back around. + which_region = (slice_idx % generate_utils.config.numRegions(0)) + print("send to region {}".format(which_region)) -def sendRawMessage(client, msg): - n_cells = ceil(len(msg) / config.CHARS_PER_SYNC) - for cell in range(0, n_cells): - cell_begin = cell * config.CHARS_PER_SYNC - cell_end = (cell + 1) * config.CHARS_PER_SYNC - cell_msg = msg[cell_begin:cell_end] - #print("Send cell {}".format(cell)) - sendMessageCellDiscrete(client, cell_msg, cell) + enable(osc_state.client) -def clear(client, tx_state): - disable(client) + # Seek to the current region. + addr="/avatar/parameters/" + generate_utils.getSelectParam() + osc_state.client.send_message(addr, which_region) - addr="/avatar/parameters/" + generate_utils.getClearBoardParam() - client.send_message(addr, True) + # Update each letter. + encoded = encodeMessage(osc_state.encoding, msg_slice) + print("len encoded: {}".format(len(encoded))) + for i in range(0, len(msg_slice)): + updateRegion(osc_state.client, i, encoded[i]) + # Wait for parameter sync. time.sleep(SYNC_DELAY_S) - addr="/avatar/parameters/" + generate_utils.getClearBoardParam() - client.send_message(addr, False) - - tx_state.last_msg_encoded = [] - -def lockWorld(client, lock: bool): - addr = "/avatar/parameters/" + generate_utils.getLockWorldParam() - client.send_message(addr, lock) - -def toggleBoard(client, show: bool): - addr = "/avatar/parameters/" + generate_utils.getToggleParam() - client.send_message(addr, show) - -def indicateSpeech(client, is_speaking: bool): - addr = "/avatar/parameters/" + generate_utils.getIndicator0Param() - client.send_message(addr, is_speaking) - -def indicatePaging(client, is_paging: bool): - addr = "/avatar/parameters/" + generate_utils.getIndicator1Param() - client.send_message(addr, is_paging) + if msg_slice != empty_slice: + addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() + osc_state.client.send_message(addr, False) if __name__ == "__main__": parser = argparse.ArgumentParser() @@ -347,13 +126,3 @@ if __name__ == "__main__": parser.add_argument("-p", type=int, default=9000, help="OSC server port") args = parser.parse_args() - client = getClient(args.i, args.p) - - state.encoding = generateEncoding() - - tx_state = OscTxState() - for line in fileinput.input(): - while sendMessageLazy(client, line, tx_state) != SEND_MSG_LAZY_DONE: - continue - clear(client, tx_state) - |
