diff options
Diffstat (limited to 'Scripts')
| -rw-r--r-- | Scripts/string_matcher.py | 165 | ||||
| -rw-r--r-- | Scripts/transcribe.py | 66 |
2 files changed, 55 insertions, 176 deletions
diff --git a/Scripts/string_matcher.py b/Scripts/string_matcher.py deleted file mode 100644 index a56308a..0000000 --- a/Scripts/string_matcher.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python3 - -# python3 -m pip install editdistance -# License: MIT. -import editdistance - -import typing - -DEBUG = False - -# Find the window where the distance between these two transcriptions is -# minimized and use it to stitch them together. -def matchStringList(old_words: typing.List[str], - new_words: typing.List[str], window_size = 6) -> str: - if old_words == new_words: - return " ".join(old_words) - elif len(old_words) >= window_size and len(new_words) >= window_size: - # Find the window where the cumulative string distance - # between the words in that window in the old/new transcription - # is minimized. - old_slice = old_words[len(old_words) - window_size:] - - best_match_i = None - best_match_d = window_size * 1000 - - for i in range(0, 1 + len(new_words) - window_size): - new_slice = new_words[i:i + window_size] - cur_d = 0 - for j in range(0, window_size): - cur_d += editdistance.eval(old_slice[j], new_slice[j]) - if cur_d < best_match_d: - best_match_i = i - best_match_d = cur_d - - old_prefix = old_words[0:len(old_words) - window_size] - overlap = new_words[best_match_i:best_match_i + window_size] - new_suffix = new_words[best_match_i + window_size:] - - #print("Best match i: {}".format(best_match_i)) - #print("Window size: {}".format(window_size)) - #print("Old prefix: {}".format(old_prefix)) - #print("Overlap: {}".format(overlap)) - #print("New suffix: {}".format(new_suffix)) - return " ".join(old_prefix + new_words[best_match_i:]) - else: - return " ".join(new_words) - -def matchSpaceDelimitedStrings(old_text: str, new_text: str, window_size = 4) -> str: - old_words = old_text.split() - new_words = new_text.split() - return matchStringList(old_words, new_words, window_size) - -def matchStrings(old_text: str, new_text: str, window_size = 3) -> str: - if old_text == new_text: - if DEBUG: - print("STRING MATCH exception path 1") - return old_text - elif len(new_text) == 0: - return old_text - elif len(old_text) == 0: - return new_text - elif len(old_text) >= window_size and len(new_text) >= window_size: - # Find the window where the cumulative string distance - # between the text in that window in the old/new transcription - # is minimized. - - best_match_i = None - best_match_j = None - best_match_d = window_size * 1000 - - # The number of old slices to look at. Since the old text can grow - # unboundedly, it's crucial that we don't compare to every possible - # slice in the old and new transcriptions (O(N^2) time complexity). - # This is still wildly inefficient, but good enough for continuous - # transcription in a game bound by a single CPU core, like VRChat. - max_old_slices = 150 - old_n_slices = min(max_old_slices, len(old_text)) - last_old_window = len(old_text) - window_size - first_old_window = max(last_old_window - old_n_slices, 0) - - for i in range(first_old_window, last_old_window + 1): - old_slice = old_text[i:i + window_size] - - for j in range(0, 1 + len(new_text) - window_size): - new_slice = new_text[j:j + window_size] - cur_d = editdistance.eval(old_slice, new_slice) - if cur_d < best_match_d: - best_match_i = i - best_match_j = j - best_match_d = cur_d - - if DEBUG: - print("optimum at old '{}' i={} new '{}' j={} d={}".format( - old_slice, i, new_slice, j, cur_d)) - - old_prefix = old_text[0:best_match_i] - overlap = new_text[best_match_j:best_match_j + window_size] - new_suffix = new_text[best_match_j + window_size:] - - if DEBUG: - print("Best match i: {}".format(best_match_i)) - print("Best match j: {}".format(best_match_j)) - print("Window size: {}".format(window_size)) - print("Old prefix: {}".format(old_prefix)) - print("Overlap: {}".format(overlap)) - print("New suffix: {}".format(new_suffix)) - print("Input 1: {}".format(old_text)) - print("Input 2: {}".format(new_text)) - print("Output: {}".format(old_prefix + - new_text[best_match_j:])) - return old_prefix + new_text[best_match_j:] - else: - if DEBUG: - print("STRING MATCH exception path 2") - print(" OLD: {}".format(old_text)) - print(" NEW: {}".format(new_text)) - return new_text - -if __name__ == "__main__": - # Identical transcriptions should not be changed. - assert(matchSpaceDelimitedStrings("This is a test case.", "This is a test case.", window_size = 3) == "This is a test case.") - # A suffix should be detected and ignored. - assert(matchSpaceDelimitedStrings("This is a test case.", "is a test case.", window_size = 3) == "This is a test case.") - # A lengthening suffix should be correctly appended. - assert(matchSpaceDelimitedStrings("This is a test", "is a test case.", window_size = 3) == "This is a test case.") - # A strictly longer transcription should override the old prefix. - assert(matchSpaceDelimitedStrings("This is a test", "This is a test case.", window_size = 3) == "This is a test case.") - # Paranoia: repetitive text broke the older implementation, so I included - # some test cases without fully understanding what the old problem was. - assert(matchSpaceDelimitedStrings("test test test", "test test test test test test", window_size - = 3) == "test test test test test test") - assert(matchSpaceDelimitedStrings("test test test test test test", "test test test", window_size - = 3) == "test test test test test test") - - print(matchStrings("foo bar", "bar baz")) - print(matchStrings("alpha beta", "beta gamma")) - - in1 = "Okay, what about now? Looks like it sort of works. Key word being sort of." - in2 = "okay what about now looks like it sort of works key word being sort of looks" - bad_out = "Okay, what about now? Looks like it sort of works. Key word being sort of works key word being sort of looks" - good_out = "Okay what about now looks like it sort of works key word being sort of looks" - good_out = "Okay, what about now? Looks like it sort of works. Key word being sort of looks" - print(matchStrings(in1, in2)) - assert(matchStrings(in1, in2) == good_out) - - in1 = "This repository can take" - in2 = "This repository contains the code for" - bad_out = "This repository can tode for" - good_out = "This repository contains the code for" - assert(matchStrings(in1, in2) == good_out) - - in1 = "See something." - in2 = "See something. Say something." - bad_out = in1 - good_out = in2 - print(matchStrings(in1, in2)) - assert(matchStrings(in1, in2) == bad_out) - - in1 = "a" * 1000 - in2 = "b" * 10 * 1000 - # This should be fast (< 1 second) - #matchStrings(in1, in2) - - print("Tests passed.") - diff --git a/Scripts/transcribe.py b/Scripts/transcribe.py index fe06631..9711d15 100644 --- a/Scripts/transcribe.py +++ b/Scripts/transcribe.py @@ -4,6 +4,7 @@ from datetime import datetime from emotes_v2 import EmotesState from faster_whisper import WhisperModel from functools import partial +from math import ceil from playsound import playsound from sentence_splitter import split_text_into_sentences @@ -20,12 +21,12 @@ import os import osc_ctrl import pyaudio import steamvr -import string_matcher import subprocess import sys import threading import time import transformers +import typing import wave class AudioState: @@ -48,8 +49,13 @@ class AudioState: # PyAudio stream object self.stream = None + self.committed_text = "" self.text = "" self.filtered_text = "" + # List of: + # List of tuples of: + # Segment start time, end time, and text + self.ranges_ls = [] self.frames = [] # Locks access to `text`. @@ -189,6 +195,8 @@ def resetAudioLocked(audio_state): audio_state.transcribe_sleep_duration_min_s audio_state.text = "" + audio_state.preview_text = "" + audio_state.filtered_text = "" def resetDisplayLocked(audio_state): osc_ctrl.clear(audio_state.osc_state) @@ -201,7 +209,10 @@ def resetAudio(audio_state): audio_state.transcribe_lock.release() # Transcribe the audio recorded in a file. -def transcribe(audio_state, model, frames, use_cpu: bool): +# Returns two strings: committed text, and preview text. +# Committed text is temporally stable. Preview text is *not* temporally stable, +# but is lower latency than committed text. +def transcribe(audio_state, model, frames, use_cpu: bool) -> typing.Tuple[str,str]: start_time = time.time() frames = audio_state.frames @@ -217,9 +228,41 @@ def transcribe(audio_state, model, frames, use_cpu: bool): beam_size = 5, language = audio_state.language, vad_filter = True, - without_timestamps = True) + condition_on_previous_text = True, + without_timestamps = False) + ranges = [] + for s in segments: + #print(f"Segment: {s}") + ranges.append((s.start, s.end, s.text)) + audio_state.ranges_ls.append(ranges) + + committed_text = "" + if True: + # Tuple of (start time, end time, transcript) + first_segments = [] + for ranges in audio_state.ranges_ls: + for segment in ranges: + first_segments.append(segment) + break + if len(first_segments) >= 3: + c0 = first_segments[-3] + c1 = first_segments[-2] + c2 = first_segments[-1] + #print(f"c0: {c0}, c1: {c1}, c2: {c2}") + if c0 == c1 and c1 == c2: + # For simplicity, completely reset saved audio ranges. + audio_state.ranges_ls = [] + committed_text = c2[2] + n_frames_to_drop = int(ceil(audio_state.RATE * c0[1])) + del audio_state.frames[0:n_frames_to_drop] + + preview_text = "" + for seg in ranges: + if seg[2] == committed_text: + continue + preview_text += seg[2] - return "".join(s.text for s in segments) + return (committed_text, preview_text) def transcribeAudio(audio_state, model, @@ -251,8 +294,8 @@ def transcribeAudio(audio_state, audio_state.transcribe_sleep_duration_max_s, longer_sleep_dur) - text = transcribe(audio_state, model, audio_state.frames, use_cpu) - if not text: + text, preview_text = transcribe(audio_state, model, audio_state.frames, use_cpu) + if len(text) == 0 and len(preview_text) == 0: print("no transcription, spin ({} seconds)".format(time.time() - last_transcribe_time)) last_transcribe_time = time.time() continue @@ -260,23 +303,24 @@ def transcribeAudio(audio_state, if audio_state.drop_transcription: audio_state.drop_transcription = False audio_state.text = "" + audio_state.preview_text = "" audio_state.filtered_text = "" print("drop transcription ({} seconds)".format(time.time() - last_transcribe_time)) last_transcribe_time = time.time() continue old_text = audio_state.text - audio_state.text = string_matcher.matchStrings(audio_state.text, - text, window_size = 25) + audio_state.text += text + audio_state.preview_text = audio_state.text + preview_text now = time.time() print("Transcription ({} seconds): {}".format( now - last_transcribe_time, - audio_state.text)) + audio_state.preview_text)) last_transcribe_time = now # Translate if requested. - translated = audio_state.text + translated = audio_state.preview_text if audio_state.language_target: whisper_lang = audio_state.whisper_language nllb_lang = lang_compat.whisper_to_nllb[whisper_lang] @@ -314,7 +358,7 @@ def transcribeAudio(audio_state, filtered_text = filtered_text.lower() audio_state.filtered_text = filtered_text - if old_text != audio_state.text: + if old_text != audio_state.preview_text: # We think the user said something, so reset the amount of # time we sleep between transcriptions to the minimum. audio_state.transcribe_no_change_count = 0 |
