summaryrefslogtreecommitdiffstats
path: root/Scripts/osc_ctrl.py
blob: 3b257782f54afb397f5b9cdf569e49e6dc51aea7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3

import argparse
from generate_utils import config
import generate_utils
from paging import MultiLinePager
from pythonosc import udp_client
import time

# Based on a couple experiments, this seems like about as fast as we can go
# before players start losing events.
SYNC_FREQ_HZ = 5.0
SYNC_DELAY_S = 1.0 / SYNC_FREQ_HZ

def getClient(ip = "127.0.0.1", port = 9000):
    return udp_client.SimpleUDPClient(ip, port)

# The characters in the TaSTT are all numbered from top left to bottom right.
# This function provides a mapping from letter ('a') to index (26).
def generateEncoding():
    encoding = {}
    for i in range(0, 65535):
        encoding[chr(i)] = (i % 256, int(i / 256))
    return encoding

class OscState:
    def __init__(self, chars_per_sync: int, rows: int, cols: int,
            ip = "127.0.0.1", port = 9000):
        self.client = getClient(ip, port)
        self.pager = MultiLinePager(chars_per_sync, rows, cols)
        self.encoding= generateEncoding()

    def reset(self):
        self.pager.reset()

def encodeMessage(encoding, msg):
    encoded = []
    for char in msg:
        encoded.append(encoding[char])
    return encoded

def lockWorld(client, lock: bool):
    addr = "/avatar/parameters/" + generate_utils.getLockWorldParam()
    client.send_message(addr, lock)

def toggleBoard(client, show: bool):
    addr = "/avatar/parameters/" + generate_utils.getToggleParam()
    client.send_message(addr, show)

def indicateSpeech(client, is_speaking: bool):
    addr = "/avatar/parameters/" + generate_utils.getIndicator0Param()
    client.send_message(addr, is_speaking)

def indicatePaging(client, is_paging: bool):
    addr = "/avatar/parameters/" + generate_utils.getIndicator1Param()
    client.send_message(addr, is_paging)

def enable(client):
    addr="/avatar/parameters/" + generate_utils.getEnableParam()
    client.send_message(addr, True)

def disable(client):
    addr="/avatar/parameters/" + generate_utils.getEnableParam()
    client.send_message(addr, False)

def clear(osc_state: OscState):
    disable(osc_state.client)

    addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
    osc_state.client.send_message(addr, True)

    time.sleep(SYNC_DELAY_S)

    addr="/avatar/parameters/" + generate_utils.getClearBoardParam()
    osc_state.client.send_message(addr, False)

    osc_state.reset()

def updateRegion(client, region_idx, letter_encoded):
    for byte in range(0, generate_utils.config.BYTES_PER_CHAR):
        addr="/avatar/parameters/" + generate_utils.getBlendParam(region_idx, byte)
        letter_remapped = (-127.5 + letter_encoded[byte]) / 127.5
        client.send_message(addr, letter_remapped)

# Sends one slice of `msg` to the board then returns. Slices are sent
# in FIFO order; e.g., the most recently spoken words are sent last.
# Returns True if done paging, False otherwise.
def pageMessage(osc_state: OscState, msg: str) -> bool:
    msg_slice, slice_idx = osc_state.pager.getNextSlice(msg)
    if slice_idx == -1:
        return True
    print("sending page {}: {} ({})".format(slice_idx, msg_slice,
        len(msg_slice)))

    empty_slice = " " * len(msg_slice)
    if msg_slice != empty_slice:
        addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam()
        osc_state.client.send_message(addr, True)

    # Really long messages just wrap back around.
    which_region = (slice_idx % generate_utils.config.numRegions(0))
    print("send to region {}".format(which_region))

    enable(osc_state.client)

    # Seek to the current region.
    addr="/avatar/parameters/" + generate_utils.getSelectParam()
    osc_state.client.send_message(addr, which_region)

    # Update each letter.
    encoded = encodeMessage(osc_state.encoding, msg_slice)
    print("len encoded: {}".format(len(encoded)))
    for i in range(0, len(msg_slice)):
        updateRegion(osc_state.client, i, encoded[i])

    # Wait for parameter sync.
    time.sleep(SYNC_DELAY_S)

    if msg_slice != empty_slice:
        addr="/avatar/parameters/" + generate_utils.getSpeechNoiseToggleParam()
        osc_state.client.send_message(addr, False)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", default="127.0.0.1", help="OSC server IP")
    parser.add_argument("-p", type=int, default=9000, help="OSC server port")
    args = parser.parse_args()