summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- Scripts/string_matcher.py 165
-rw-r--r-- Scripts/transcribe.py 66
2 files changed, 55 insertions, 176 deletions
diff --git a/Scripts/string_matcher.py b/Scripts/string_matcher.py
deleted file mode 100644
index a56308a..0000000
--- a/Scripts/string_matcher.py
+++ /dev/null
@@ -1,165 +0,0 @@
-#!/usr/bin/env python3
-
-# python3 -m pip install editdistance
-# License: MIT.
-import editdistance
-
-import typing
-
-DEBUG = False
-
-# Find the window where the distance between these two transcriptions is
-# minimized and use it to stitch them together.
-def matchStringList(old_words: typing.List[str],
- new_words: typing.List[str], window_size = 6) -> str:
- if old_words == new_words:
- return " ".join(old_words)
- elif len(old_words) >= window_size and len(new_words) >= window_size:
- # Find the window where the cumulative string distance
- # between the words in that window in the old/new transcription
- # is minimized.
- old_slice = old_words[len(old_words) - window_size:]
-
- best_match_i = None
- best_match_d = window_size * 1000
-
- for i in range(0, 1 + len(new_words) - window_size):
- new_slice = new_words[i:i + window_size]
- cur_d = 0
- for j in range(0, window_size):
- cur_d += editdistance.eval(old_slice[j], new_slice[j])
- if cur_d < best_match_d:
- best_match_i = i
- best_match_d = cur_d
-
- old_prefix = old_words[0:len(old_words) - window_size]
- overlap = new_words[best_match_i:best_match_i + window_size]
- new_suffix = new_words[best_match_i + window_size:]
-
- #print("Best match i: {}".format(best_match_i))
- #print("Window size: {}".format(window_size))
- #print("Old prefix: {}".format(old_prefix))
- #print("Overlap: {}".format(overlap))
- #print("New suffix: {}".format(new_suffix))
- return " ".join(old_prefix + new_words[best_match_i:])
- else:
- return " ".join(new_words)
-
-def matchSpaceDelimitedStrings(old_text: str, new_text: str, window_size = 4) -> str:
- old_words = old_text.split()
- new_words = new_text.split()
- return matchStringList(old_words, new_words, window_size)
-
-def matchStrings(old_text: str, new_text: str, window_size = 3) -> str:
- if old_text == new_text:
- if DEBUG:
- print("STRING MATCH exception path 1")
- return old_text
- elif len(new_text) == 0:
- return old_text
- elif len(old_text) == 0:
- return new_text
- elif len(old_text) >= window_size and len(new_text) >= window_size:
- # Find the window where the cumulative string distance
- # between the text in that window in the old/new transcription
- # is minimized.
-
- best_match_i = None
- best_match_j = None
- best_match_d = window_size * 1000
-
- # The number of old slices to look at. Since the old text can grow
- # unboundedly, it's crucial that we don't compare to every possible
- # slice in the old and new transcriptions (O(N^2) time complexity).
- # This is still wildly inefficient, but good enough for continuous
- # transcription in a game bound by a single CPU core, like VRChat.
- max_old_slices = 150
- old_n_slices = min(max_old_slices, len(old_text))
- last_old_window = len(old_text) - window_size
- first_old_window = max(last_old_window - old_n_slices, 0)
-
- for i in range(first_old_window, last_old_window + 1):
- old_slice = old_text[i:i + window_size]
-
- for j in range(0, 1 + len(new_text) - window_size):
- new_slice = new_text[j:j + window_size]
- cur_d = editdistance.eval(old_slice, new_slice)
- if cur_d < best_match_d:
- best_match_i = i
- best_match_j = j
- best_match_d = cur_d
-
- if DEBUG:
- print("optimum at old '{}' i={} new '{}' j={} d={}".format(
- old_slice, i, new_slice, j, cur_d))
-
- old_prefix = old_text[0:best_match_i]
- overlap = new_text[best_match_j:best_match_j + window_size]
- new_suffix = new_text[best_match_j + window_size:]
-
- if DEBUG:
- print("Best match i: {}".format(best_match_i))
- print("Best match j: {}".format(best_match_j))
- print("Window size: {}".format(window_size))
- print("Old prefix: {}".format(old_prefix))
- print("Overlap: {}".format(overlap))
- print("New suffix: {}".format(new_suffix))
- print("Input 1: {}".format(old_text))
- print("Input 2: {}".format(new_text))
- print("Output: {}".format(old_prefix +
- new_text[best_match_j:]))
- return old_prefix + new_text[best_match_j:]
- else:
- if DEBUG:
- print("STRING MATCH exception path 2")
- print(" OLD: {}".format(old_text))
- print(" NEW: {}".format(new_text))
- return new_text
-
-if __name__ == "__main__":
- # Identical transcriptions should not be changed.
- assert(matchSpaceDelimitedStrings("This is a test case.", "This is a test case.", window_size = 3) == "This is a test case.")
- # A suffix should be detected and ignored.
- assert(matchSpaceDelimitedStrings("This is a test case.", "is a test case.", window_size = 3) == "This is a test case.")
- # A lengthening suffix should be correctly appended.
- assert(matchSpaceDelimitedStrings("This is a test", "is a test case.", window_size = 3) == "This is a test case.")
- # A strictly longer transcription should override the old prefix.
- assert(matchSpaceDelimitedStrings("This is a test", "This is a test case.", window_size = 3) == "This is a test case.")
- # Paranoia: repetitive text broke the older implementation, so I included
- # some test cases without fully understanding what the old problem was.
- assert(matchSpaceDelimitedStrings("test test test", "test test test test test test", window_size
- = 3) == "test test test test test test")
- assert(matchSpaceDelimitedStrings("test test test test test test", "test test test", window_size
- = 3) == "test test test test test test")
-
- print(matchStrings("foo bar", "bar baz"))
- print(matchStrings("alpha beta", "beta gamma"))
-
- in1 = "Okay, what about now? Looks like it sort of works. Key word being sort of."
- in2 = "okay what about now looks like it sort of works key word being sort of looks"
- bad_out = "Okay, what about now? Looks like it sort of works. Key word being sort of works key word being sort of looks"
- good_out = "Okay what about now looks like it sort of works key word being sort of looks"
- good_out = "Okay, what about now? Looks like it sort of works. Key word being sort of looks"
- print(matchStrings(in1, in2))
- assert(matchStrings(in1, in2) == good_out)
-
- in1 = "This repository can take"
- in2 = "This repository contains the code for"
- bad_out = "This repository can tode for"
- good_out = "This repository contains the code for"
- assert(matchStrings(in1, in2) == good_out)
-
- in1 = "See something."
- in2 = "See something. Say something."
- bad_out = in1
- good_out = in2
- print(matchStrings(in1, in2))
- assert(matchStrings(in1, in2) == bad_out)
-
- in1 = "a" * 1000
- in2 = "b" * 10 * 1000
- # This should be fast (< 1 second)
- #matchStrings(in1, in2)
-
- print("Tests passed.")
-
diff --git a/Scripts/transcribe.py b/Scripts/transcribe.py
index fe06631..9711d15 100644
--- a/Scripts/transcribe.py
+++ b/Scripts/transcribe.py
@@ -4,6 +4,7 @@ from datetime import datetime
from emotes_v2 import EmotesState
from faster_whisper import WhisperModel
from functools import partial
+from math import ceil
from playsound import playsound
from sentence_splitter import split_text_into_sentences
@@ -20,12 +21,12 @@ import os
import osc_ctrl
import pyaudio
import steamvr
-import string_matcher
import subprocess
import sys
import threading
import time
import transformers
+import typing
import wave
class AudioState:
@@ -48,8 +49,13 @@ class AudioState:
# PyAudio stream object
self.stream = None
+ self.committed_text = ""
self.text = ""
self.filtered_text = ""
+ # List of:
+ # List of tuples of:
+ # Segment start time, end time, and text
+ self.ranges_ls = []
self.frames = []
# Locks access to `text`.
@@ -189,6 +195,8 @@ def resetAudioLocked(audio_state):
audio_state.transcribe_sleep_duration_min_s
audio_state.text = ""
+ audio_state.preview_text = ""
+ audio_state.filtered_text = ""
def resetDisplayLocked(audio_state):
osc_ctrl.clear(audio_state.osc_state)
@@ -201,7 +209,10 @@ def resetAudio(audio_state):
audio_state.transcribe_lock.release()
# Transcribe the audio recorded in a file.
-def transcribe(audio_state, model, frames, use_cpu: bool):
+# Returns two strings: committed text, and preview text.
+# Committed text is temporally stable. Preview text is *not* temporally stable,
+# but is lower latency than committed text.
+def transcribe(audio_state, model, frames, use_cpu: bool) -> typing.Tuple[str,str]:
start_time = time.time()
frames = audio_state.frames
@@ -217,9 +228,41 @@ def transcribe(audio_state, model, frames, use_cpu: bool):
beam_size = 5,
language = audio_state.language,
vad_filter = True,
- without_timestamps = True)
+ condition_on_previous_text = True,
+ without_timestamps = False)
+ ranges = []
+ for s in segments:
+ #print(f"Segment: {s}")
+ ranges.append((s.start, s.end, s.text))
+ audio_state.ranges_ls.append(ranges)
+
+ committed_text = ""
+ if True:
+ # Tuple of (start time, end time, transcript)
+ first_segments = []
+ for ranges in audio_state.ranges_ls:
+ for segment in ranges:
+ first_segments.append(segment)
+ break
+ if len(first_segments) >= 3:
+ c0 = first_segments[-3]
+ c1 = first_segments[-2]
+ c2 = first_segments[-1]
+ #print(f"c0: {c0}, c1: {c1}, c2: {c2}")
+ if c0 == c1 and c1 == c2:
+ # For simplicity, completely reset saved audio ranges.
+ audio_state.ranges_ls = []
+ committed_text = c2[2]
+ n_frames_to_drop = int(ceil(audio_state.RATE * c0[1]))
+ del audio_state.frames[0:n_frames_to_drop]
+
+ preview_text = ""
+ for seg in ranges:
+ if seg[2] == committed_text:
+ continue
+ preview_text += seg[2]
- return "".join(s.text for s in segments)
+ return (committed_text, preview_text)
def transcribeAudio(audio_state,
model,
@@ -251,8 +294,8 @@ def transcribeAudio(audio_state,
audio_state.transcribe_sleep_duration_max_s,
longer_sleep_dur)
- text = transcribe(audio_state, model, audio_state.frames, use_cpu)
- if not text:
+ text, preview_text = transcribe(audio_state, model, audio_state.frames, use_cpu)
+ if len(text) == 0 and len(preview_text) == 0:
print("no transcription, spin ({} seconds)".format(time.time() - last_transcribe_time))
last_transcribe_time = time.time()
continue
@@ -260,23 +303,24 @@ def transcribeAudio(audio_state,
if audio_state.drop_transcription:
audio_state.drop_transcription = False
audio_state.text = ""
+ audio_state.preview_text = ""
audio_state.filtered_text = ""
print("drop transcription ({} seconds)".format(time.time() - last_transcribe_time))
last_transcribe_time = time.time()
continue
old_text = audio_state.text
- audio_state.text = string_matcher.matchStrings(audio_state.text,
- text, window_size = 25)
+ audio_state.text += text
+ audio_state.preview_text = audio_state.text + preview_text
now = time.time()
print("Transcription ({} seconds): {}".format(
now - last_transcribe_time,
- audio_state.text))
+ audio_state.preview_text))
last_transcribe_time = now
# Translate if requested.
- translated = audio_state.text
+ translated = audio_state.preview_text
if audio_state.language_target:
whisper_lang = audio_state.whisper_language
nllb_lang = lang_compat.whisper_to_nllb[whisper_lang]
@@ -314,7 +358,7 @@ def transcribeAudio(audio_state,
filtered_text = filtered_text.lower()
audio_state.filtered_text = filtered_text
- if old_text != audio_state.text:
+ if old_text != audio_state.preview_text:
# We think the user said something, so reset the amount of
# time we sleep between transcriptions to the minimum.
audio_state.transcribe_no_change_count = 0