1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
#!/usr/bin/env python3
import argparse
from generate_utils import config
import generate_utils
from paging import MultiLinePager
from pythonosc import udp_client
import time
# Based on a couple of experiments, this seems like about as fast as we can go
# before players start losing events.
SYNC_FREQ_HZ = 5.0
# Period (in seconds) of one sync at SYNC_FREQ_HZ; used in this file as the
# sleep duration after sending a batch of OSC messages.
SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ
def getClient(ip: str = "127.0.0.1", port: int = 9000):
    """Build a python-osc UDP client that sends to `ip`:`port`.

    Args:
        ip: Address of the OSC server (defaults to 127.0.0.1).
        port: UDP port of the OSC server (defaults to 9000).

    Returns:
        A `udp_client.SimpleUDPClient` bound to the given destination.
    """
    osc_client = udp_client.SimpleUDPClient(ip, port)
    return osc_client
# The characters in the TaSTT are all numbered from top left to bottom right.
# This function provides a mapping from a character to the two bytes of its
# code point, as a (low byte, high byte) tuple; e.g. 'a' (97) -> (97, 0).
def generateEncoding():
    """Build the character -> (low_byte, high_byte) lookup table.

    Every code point in the 16-bit range 0x0000..0xFFFF (inclusive) gets an
    entry. The upper bound is 0x10000 so that U+FFFF, the last value that
    fits in two bytes, is included as well.

    Returns:
        A dict keyed by single-character strings whose values are
        `(code % 256, code // 256)` tuples.
    """
    return {
        chr(code): (code % 256, code // 256)
        for code in range(0x10000)
    }
class OscState:
    """Holds the OSC client, the message pager and the encoding table.

    Attributes are created in `__init__`:
      client   -- result of `getClient(ip, port)`
      pager    -- `MultiLinePager(chars_per_sync, rows, cols)`
      encoding -- result of `generateEncoding()`
    """

    def __init__(
        self,
        chars_per_sync: int,
        rows: int,
        cols: int,
        ip="127.0.0.1",
        port=9000,
    ):
        """Create the client, pager and encoding table.

        Args:
            chars_per_sync: Forwarded to `MultiLinePager`.
            rows: Forwarded to `MultiLinePager`.
            cols: Forwarded to `MultiLinePager`.
            ip: OSC server address, forwarded to `getClient`.
            port: OSC server port, forwarded to `getClient`.
        """
        self.client = getClient(ip=ip, port=port)
        self.pager = MultiLinePager(chars_per_sync, rows, cols)
        self.encoding = generateEncoding()

    def reset(self):
        """Reset the pager; the client and encoding table are left as-is."""
        self.pager.reset()
def encodeMessage(encoding, msg):
    """Translate each character of `msg` through the `encoding` table.

    Args:
        encoding: Mapping from a single character to its encoded value.
        msg: The text to encode.

    Returns:
        A list with one encoded value per character, in message order.

    Raises:
        KeyError: If `msg` contains a character missing from `encoding`.
    """
    return [encoding[symbol] for symbol in msg]
def lockWorld(client, lock: bool):
    """Send `lock` to the avatar parameter named by getLockWorldParam().

    Args:
        client: OSC client exposing `send_message(address, value)`.
        lock: Value to write to the parameter.
    """
    param_name = generate_utils.getLockWorldParam()
    client.send_message(f"/avatar/parameters/{param_name}", lock)
def toggleBoard(client, show: bool):
    """Send `show` to the avatar parameter named by getToggleParam().

    Args:
        client: OSC client exposing `send_message(address, value)`.
        show: Value to write to the parameter.
    """
    param_name = generate_utils.getToggleParam()
    client.send_message(f"/avatar/parameters/{param_name}", show)
def indicateSpeech(client, is_speaking: bool):
    """Send `is_speaking` to the parameter named by getIndicator0Param().

    Args:
        client: OSC client exposing `send_message(address, value)`.
        is_speaking: Value to write to the parameter.
    """
    param_name = generate_utils.getIndicator0Param()
    client.send_message(f"/avatar/parameters/{param_name}", is_speaking)
def indicatePaging(client, is_paging: bool):
    """Send `is_paging` to the parameter named by getIndicator1Param().

    Args:
        client: OSC client exposing `send_message(address, value)`.
        is_paging: Value to write to the parameter.
    """
    param_name = generate_utils.getIndicator1Param()
    client.send_message(f"/avatar/parameters/{param_name}", is_paging)
def enable(client):
    """Write True to the avatar parameter named by getEnableParam().

    Args:
        client: OSC client exposing `send_message(address, value)`.
    """
    param_name = generate_utils.getEnableParam()
    client.send_message(f"/avatar/parameters/{param_name}", True)
def disable(client):
    """Write False to the avatar parameter named by getEnableParam().

    Args:
        client: OSC client exposing `send_message(address, value)`.
    """
    param_name = generate_utils.getEnableParam()
    client.send_message(f"/avatar/parameters/{param_name}", False)
def clear(osc_state: OscState):
    """Clear the board and reset the local paging state.

    Sequence: disable the board, set the clear-board parameter to True,
    sleep for SYNC_DELAY_S, set it back to False, then call
    `osc_state.reset()`.

    Args:
        osc_state: State object providing the OSC client and `reset()`.
    """
    client = osc_state.client
    disable(client)

    clear_addr = f"/avatar/parameters/{generate_utils.getClearBoardParam()}"
    client.send_message(clear_addr, True)
    # Hold the flag for one sync period before releasing it.
    time.sleep(SYNC_DELAY_S)
    client.send_message(clear_addr, False)

    osc_state.reset()
def updateRegion(client, region_idx, letter_encoded):
    """Send each byte of one encoded character to its blend parameter.

    Args:
        client: OSC client exposing `send_message(address, value)`.
        region_idx: Index passed to `generate_utils.getBlendParam`.
        letter_encoded: Indexable sequence holding at least
            `generate_utils.config.BYTES_PER_CHAR` byte values.
    """
    byte_count = generate_utils.config.BYTES_PER_CHAR
    for byte_idx in range(byte_count):
        blend_param = generate_utils.getBlendParam(region_idx, byte_idx)
        # Rescale the byte so that 0 maps to -1.0 and 255 maps to 1.0.
        scaled = (letter_encoded[byte_idx] - 127.5) / 127.5
        client.send_message(f"/avatar/parameters/{blend_param}", scaled)
# Sends one slice of `msg` to the board then returns. Slices are sent
# in FIFO order; e.g., the most recently spoken words are sent last.
# Returns True if done paging, False otherwise.
def pageMessage(osc_state: OscState, msg: str) -> bool:
    """Send the next slice of `msg` to the board.

    Args:
        osc_state: Provides the OSC client, the pager that chooses the next
            slice, and the character encoding table.
        msg: The full message being paged.

    Returns:
        True when the pager reports no slice to send (slice index -1);
        False after a slice has been sent. False is returned explicitly so
        the result always matches the `bool` annotation.
    """
    msg_slice, slice_idx = osc_state.pager.getNextSlice(msg)
    # A slice index of -1 is treated as "nothing left to send".
    if slice_idx == -1:
        return True

    print(f"sending page {slice_idx}: {msg_slice} ({len(msg_slice)})")

    client = osc_state.client

    # The speech-noise parameter is only toggled for slices that contain
    # something other than spaces.
    has_text = msg_slice != " " * len(msg_slice)
    noise_addr = ("/avatar/parameters/" +
            generate_utils.getSpeechNoiseToggleParam())
    if has_text:
        client.send_message(noise_addr, True)

    # Really long messages just wrap back around.
    which_region = slice_idx % generate_utils.config.numRegions(0)
    print(f"send to region {which_region}")

    enable(client)

    # Seek to the current region.
    select_addr = "/avatar/parameters/" + generate_utils.getSelectParam()
    client.send_message(select_addr, which_region)

    # Update each letter.
    encoded = encodeMessage(osc_state.encoding, msg_slice)
    print(f"len encoded: {len(encoded)}")
    for letter_idx, letter_encoded in enumerate(encoded):
        updateRegion(client, letter_idx, letter_encoded)

    # Wait for parameter sync.
    time.sleep(SYNC_DELAY_S)

    if has_text:
        client.send_message(noise_addr, False)

    # More slices may remain; the caller should keep paging.
    return False
if __name__ == "__main__":
    # Script entry point: parse the OSC destination from the command line.
    # NOTE(review): the parsed `args` are not used anywhere in the visible
    # file -- confirm whether further setup is expected here.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-i",
        default="127.0.0.1",
        help="OSC server IP",
    )
    cli.add_argument(
        "-p",
        type=int,
        default=9000,
        help="OSC server port",
    )
    args = cli.parse_args()
|