diff options
Diffstat (limited to 'Scripts/osc_ctrl.py')
| -rw-r--r-- | Scripts/osc_ctrl.py | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/Scripts/osc_ctrl.py b/Scripts/osc_ctrl.py new file mode 100644 index 0000000..34d1a36 --- /dev/null +++ b/Scripts/osc_ctrl.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 + +import argparse +import random +import time +import fileinput +import generate_utils + +# python3 -m pip install python-osc +# License: public domain. +from pythonosc import udp_client + +from math import ceil +from math import floor +from generate_utils import getLayerParam +from generate_utils import getSelectParam +from generate_utils import getEnableParam +from generate_utils import getBoardIndex +from generate_utils import NUM_LAYERS +from generate_utils import BOARD_ROWS +from generate_utils import BOARD_COLS + +import emotes + +# Based on a couple experiments, this seems like about as fast as we can go +# before players start losing events. +SYNC_FREQ_HZ = 5.0 +SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ + +def usage(): + print("python3 -m pip install python-osc") + print("python3 ./osc_ctrl.py") + +def getClient(ip = "127.0.0.1", port = 9000): + return udp_client.SimpleUDPClient(ip, port) + +class EvilGlobalState(): + # Mapping from ascii char to encoded byte. + encoding = {} +state = EvilGlobalState() + +# The characters in the TaSTT are all numbered from top left to bottom right. +# This function provides a mapping from letter ('a') to index (26). +def generateEncoding(): + encoding = {} + for i in range(0, 65535): + encoding[chr(i)] = (i % 256, int(i / 256)) + return encoding +state.encoding = generateEncoding() + +# Encodes a list of lines into the character set used by the board. +# Pads lines with spaces and adds lines so that the total number of +# lines sent is a multiple of the number of rows in the board. +def encodeMessage(lines): + result = [] + lines_tmp = lines + [" "] * ((BOARD_ROWS - len(lines)) % BOARD_ROWS) + for line in lines_tmp: + first_word = True + for word in line.split(): + if first_word: + first_word = False + else: + result.append(state.encoding[' ']) + + emote_word, emote_word_idx = emotes.lookup(word) + if emote_word: + + word_align = 0 + if len(result) > 0: + word_align = (6 - len(result) % 6) % 6 + word = ' ' * word_align + word + + for i in range(0, word_align): + result.append(state.encoding[' ']) + + for i in range(0, 6): + result.append((emote_word_idx, 0xE0)) + continue + + for char in word: + if not char in state.encoding: + print("skip unrecognized char {}".format(char)) + continue + result.append(state.encoding[char]) + result += [state.encoding[' ']] * (BOARD_COLS - len(line)) + return result + +def updateCell(client, cell_idx, letter_encoded): + for byte in range(0, generate_utils.BYTES_PER_CHAR): + addr="/avatar/parameters/" + generate_utils.getBlendParam(cell_idx, byte) + letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5 + client.send_message(addr, letter_remapped) + +def enable(client): + addr="/avatar/parameters/" + getEnableParam() + client.send_message(addr, True) + +def disable(client): + addr="/avatar/parameters/" + getEnableParam() + client.send_message(addr, False) + +# Send a cell all at once. +# `which_cell` is an integer in the range [0,NUM_REGIONS) +def sendMessageCellDiscrete(client, msg_cell, which_cell): + empty_cell = [state.encoding[' ']] * NUM_LAYERS + + if msg_cell != empty_cell: + addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() + client.send_message(addr, True) + + # Really long messages just wrap back around. + which_cell = (which_cell % generate_utils.NUM_REGIONS) + + enable(client) + + # Seek to the current cell. + addr="/avatar/parameters/" + getSelectParam() + client.send_message(addr, which_cell) + + # Update each letter + for i in range(0, len(msg_cell)): + updateCell(client, i, msg_cell[i]) + + # Wait for sync. + time.sleep(SYNC_DELAY_S) + + if msg_cell != empty_cell: + addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam() + client.send_message(addr, False) + +# The board is broken down into contiguous collections of characters called +# cells. Each cell contains `NUM_LAYERS` characters. We can update one cell +# every ~1.0 seconds. Going faster causes the board to display garbage to +# remote players. +def splitMessage(msg): + lines = [] + line = "" + for word in msg.split(): + # Hack: if the word is an emote, we make it 6 characters wide, then + # encode it differently later on. + emote_word, emote_word_idx = emotes.lookup(word) + if emote_word: + word = word.ljust(6) + # Due to some fuckery I do in the shader, emotes have to be rendered on + # a 6-character boundary. So pad the word on the left with spaces + # to get it onto that boundary. + word_align = 0 + if len(line) > 0: + word_align = (6 - (len(line) + 1) % 6) % 6 + print("len line: {}".format(len(line))) + print("word align: {}".format(word_align)) + word = ' ' * word_align + word + + while len(word) > BOARD_COLS: + if len(line) != 0: + lines.append(line) + line = "" + + word_prefix = word[0:BOARD_COLS-1] + "-" + word_suffix = word[BOARD_COLS-1:] + #print("append prefix {}".format(word_prefix)) + lines.append(word_prefix) + word = word_suffix + + if len(line) == 0: + line = word + continue + + if len(line) + len(" ") + len(word) <= BOARD_COLS: + line += " " + word + continue + + #print("append line {}".format(line)) + lines.append(line) + line = word + + if len(line) > 0: + lines.append(line) + + return lines + +class OscTxState: + # The message last sent to the board. + last_msg_encoded = [] + empty_cells_to_send_per_call = 1 + nonempty_cells_to_send_per_call = 1 + + # 0 indicates it's closed. 1 indicates half size. 2 indicates full size. + board_size = 0 + +def resizeBoard(num_lines, tx_state, shrink_only): + + resize_params = [] + + resize_param0 = None + resize_param1 = None + + if num_lines > BOARD_ROWS / 2: + # Board must be expanded to full size. + if shrink_only: + return + + if tx_state.board_size == 2: + return + elif tx_state.board_size == 1: + resize_params.append((False, True)) + else: + resize_params.append((False, False)) + resize_params.append((False, True)) + tx_state.board_size = 2 + elif num_lines == 0: + if not shrink_only: + return + # Board must be shrunk to 0 size + if tx_state.board_size == 0: + return + elif tx_state.board_size == 1: + resize_params.append((True, True)) + else: + resize_params.append((True, False)) + resize_params.append((True, True)) + tx_state.board_size = 0 + else: + # Board must be expanded or shrunk to half size. + if tx_state.board_size == 0: + if shrink_only: + return + resize_params.append((False, False)) + elif tx_state.board_size == 1: + return + else: + if not shrink_only: + return + resize_params.append((True, False)) + tx_state.board_size = 1 + + for resize_param_pair in resize_params: + print("Resizing board... "), + addr="/avatar/parameters/" + generate_utils.getResize0Param() + client.send_message(addr, resize_param_pair[0]) + addr="/avatar/parameters/" + generate_utils.getResize1Param() + client.send_message(addr, resize_param_pair[1]) + + time.sleep(0.25) + + addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() + client.send_message(addr, True) + + # The animation is 0.5 seconds, with another 0.5 second buffer after. We + # want to stop in that buffer. + time.sleep(0.5) + + addr="/avatar/parameters/" + generate_utils.getResizeEnableParam() + client.send_message(addr, False) + + # Wait a while for the animation to complete. + time.sleep(1) + print("done") + + +# Send a message to the board, but only overwrite cells that we know need to +# change. +# This may take multiple calls to complete. +# Returns 3 possible values: +# 0: Done sending. +# 1: Exhausted empty cell budget. +# 2: Exhausted nonempty cell budget. +SEND_MSG_LAZY_DONE = 0 +SEND_MSG_LAZY_SENT_EMPTY = 1 +SEND_MSG_LAZY_SENT_NON_EMPTY = 2 +def sendMessageLazy(client, msg, tx_state): + lines = splitMessage(msg) + msg_encoded = encodeMessage(lines) + msg_encoded_len = len(msg_encoded) + + empty_cells_sent = 0 + nonempty_cells_sent = 0 + n_cells = ceil(msg_encoded_len / NUM_LAYERS) + for cell in range(0, n_cells): + cell_begin = cell * NUM_LAYERS + cell_end = (cell + 1) * NUM_LAYERS + cell_msg = msg_encoded[cell_begin:cell_end] + last_cell_msg = [] + + # Skip cells we've already sent. This makes the board much more + # responsive. + if cell_end <= len(tx_state.last_msg_encoded): + last_cell_msg = tx_state.last_msg_encoded[cell_begin:cell_end] + if cell_msg == last_cell_msg: + continue + + if cell_msg == [state.encoding[' ']] * NUM_LAYERS: + if empty_cells_sent >= tx_state.empty_cells_to_send_per_call: + return SEND_MSG_LAZY_SENT_EMPTY + empty_cells_sent += 1 + else: + if nonempty_cells_sent >= tx_state.nonempty_cells_to_send_per_call: + return SEND_MSG_LAZY_SENT_NON_EMPTY + nonempty_cells_sent += 1 + + sendMessageCellDiscrete(client, cell_msg, cell) + # Pad last msg encoded to the end of the array + tx_state.last_msg_encoded += [state.encoding[" "]] * (cell_end - + len(tx_state.last_msg_encoded)) + tx_state.last_msg_encoded[cell_begin:cell_end] = cell_msg + + #resizeBoard(len(lines), tx_state, shrink_only=True) + return SEND_MSG_LAZY_DONE + +def sendRawMessage(client, msg): + n_cells = ceil(len(msg) / NUM_LAYERS) + for cell in range(0, n_cells): + cell_begin = cell * NUM_LAYERS + cell_end = (cell + 1) * NUM_LAYERS + cell_msg = msg[cell_begin:cell_end] + #print("Send cell {}".format(cell)) + sendMessageCellDiscrete(client, cell_msg, cell) + +def clear(client, tx_state): + disable(client) + + addr="/avatar/parameters/" + generate_utils.getClearBoardParam() + client.send_message(addr, True) + + time.sleep(SYNC_DELAY_S) + + addr="/avatar/parameters/" + generate_utils.getClearBoardParam() + client.send_message(addr, False) + + tx_state.last_msg_encoded = [] + +def indicateSpeech(client, is_speaking: bool): + addr = "/avatar/parameters/" + generate_utils.getIndicator0Param() + client.send_message(addr, is_speaking) + +def indicatePaging(client, is_paging: bool): + addr = "/avatar/parameters/" + generate_utils.getIndicator1Param() + client.send_message(addr, is_paging) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-i", default="127.0.0.1", help="OSC server IP") + parser.add_argument("-p", type=int, default=9000, help="OSC server port") + args = parser.parse_args() + + client = getClient(args.i, args.p) + + state.encoding = generateEncoding() + + tx_state = OscTxState() + for line in fileinput.input(): + while sendMessageLazy(client, line, tx_state) != SEND_MSG_LAZY_DONE: + continue + clear(client, tx_state) + |
