From 3e1f0797783b8cdc16b51a05ff601bff6729b5a4 Mon Sep 17 00:00:00 2001 From: yum Date: Thu, 23 Feb 2023 13:28:13 -0800 Subject: Implement streaming output for synchronous multiprocessing layer Synchronous multiprocessing layer now accepts a callback, which the caller can use to stream output to the UI. --- GUI/GUI/GUI/PythonWrapper.cpp | 144 ++++++++++++++++++++++++++++-------------- GUI/GUI/GUI/PythonWrapper.h | 10 +++ GUI/GUI/GUI/WhisperCPP.cpp | 26 +++++--- 3 files changed, 122 insertions(+), 58 deletions(-) (limited to 'GUI') diff --git a/GUI/GUI/GUI/PythonWrapper.cpp b/GUI/GUI/GUI/PythonWrapper.cpp index 0f835e9..52f283a 100644 --- a/GUI/GUI/GUI/PythonWrapper.cpp +++ b/GUI/GUI/GUI/PythonWrapper.cpp @@ -67,11 +67,34 @@ std::string GetWin32ErrMsg() { return std::to_string(error) + ": " + err_msg; } -bool PythonWrapper::InvokeCommandWithArgs( - const std::string& cmd, - std::vector&& args, - std::string* py_stdout, std::string* py_stderr) { +std::string DrainWin32Pipe(const HANDLE pipe) { + DWORD bytes_avail; + std::ostringstream oss; + if (PeekNamedPipe( + pipe, + nullptr, // buffer to read into + 0, // buffer size + nullptr, // bytes read + &bytes_avail, + nullptr // bytes left in this message + )) { + DWORD cur_bytes_read = 0; + DWORD sum_bytes_read = 0; + std::vector buf(4096, 0); + while (sum_bytes_read < bytes_avail && + ReadFile(pipe, buf.data(), buf.size() - 1, &cur_bytes_read, NULL) && + cur_bytes_read > 0) { + oss << std::string(buf.data(), cur_bytes_read); + sum_bytes_read += cur_bytes_read; + } + } + return oss.str(); +} + +bool PythonWrapper::InvokeCommandWithArgs(const std::string& cmd, + std::vector&& args, + const std::function&& out_cb) { std::ostringstream cmd_oss; cmd_oss << cmd; for (const auto& arg : args) { @@ -84,12 +107,10 @@ bool PythonWrapper::InvokeCommandWithArgs( stdout_sec_attr.nLength = sizeof(stdout_sec_attr); stdout_sec_attr.bInheritHandle = TRUE; if (!CreatePipe(&stdout_read, &stdout_write, &stdout_sec_attr, 0)) { - if (py_stderr) { - std::ostringstream err_oss; - err_oss << "Error while executing python command \"" << cmd_oss.str() - << "\": Failed to create stdout pipe: " << GetWin32ErrMsg() << std::endl; - *py_stderr = err_oss.str(); - } + std::ostringstream err_oss; + err_oss << "Error while executing python command \"" << cmd_oss.str() + << "\": Failed to create stdout pipe: " << GetWin32ErrMsg() << std::endl; + out_cb("", err_oss.str()); return false; } ScopeGuard stdout_cleanup([&]() { @@ -109,12 +130,10 @@ bool PythonWrapper::InvokeCommandWithArgs( stderr_sec_attr.bInheritHandle = TRUE; if (!CreatePipe(&stderr_read, &stderr_write, &stderr_sec_attr, 0)) { - if (py_stderr) { - std::ostringstream err_oss; - err_oss << "Error while executing python command \"" << cmd_oss.str() - << "\": Failed to create stderr pipe: " << GetWin32ErrMsg() << std::endl; - *py_stderr = err_oss.str(); - } + std::ostringstream err_oss; + err_oss << "Error while executing python command \"" << cmd_oss.str() + << "\": Failed to create stderr pipe: " << GetWin32ErrMsg() << std::endl; + out_cb("", err_oss.str()); return false; } ScopeGuard stderr_cleanup([&]() { @@ -147,7 +166,7 @@ bool PythonWrapper::InvokeCommandWithArgs( std::ostringstream err_oss; err_oss << "Error while executing python command \"" << cmd_oss.str() << "\": Failed to get PATH env variable: " << GetWin32ErrMsg() << std::endl; - *py_stderr = err_oss.str(); + out_cb("", err_oss.str()); return false; } // TODO(yum) add git to PATH @@ -165,13 +184,10 @@ bool PythonWrapper::InvokeCommandWithArgs( std::filesystem::current_path().string().c_str(), // current directory &si, &pi)) { - if (py_stderr) { - std::ostringstream err_oss; - - err_oss << "Error while executing python command \"" << cmd_oss.str() - << "\": Failed to launch process: " << GetWin32ErrMsg(); - *py_stderr = err_oss.str(); - } + std::ostringstream err_oss; + err_oss << "Error while executing python command \"" << cmd_oss.str() + << "\": Failed to launch process: " << GetWin32ErrMsg(); + out_cb("", err_oss.str()); return false; } ScopeGuard pi_cleanup([&] { @@ -179,47 +195,59 @@ bool PythonWrapper::InvokeCommandWithArgs( CloseHandle(pi.hThread); }); - WaitForSingleObject(pi.hProcess, INFINITE); + // While the process is running, drain output every 10 ms. + DWORD timeout_ms = 10; + DWORD ret = WAIT_TIMEOUT; + while (ret == WAIT_TIMEOUT) { + DWORD ret = WaitForSingleObject(pi.hProcess, timeout_ms); + if (ret != WAIT_TIMEOUT) { + break; + } + std::ostringstream stdout_oss, stderr_oss; + stdout_oss << DrainWin32Pipe(stdout_read); + stderr_oss << DrainWin32Pipe(stderr_read); + out_cb(stdout_oss.str(), stderr_oss.str()); + } std::ostringstream stdout_oss, stderr_oss; DWORD exit_code; if (!GetExitCodeProcess(pi.hProcess, &exit_code)) { - stderr_oss << "Failed to get exit code!\n"; + stderr_oss << "Failed to get exit code: " << GetWin32ErrMsg(); } if (exit_code != 0) { stderr_oss << "Command exited with code " << exit_code << ": " - << GetWin32ErrMsg() << std::endl; + << GetWin32ErrMsg(); } - WaitForSingleObject(pi.hProcess, INFINITE); - // Close write ends of pipes. If we don't do this, the last read will block forever. CloseHandle(stdout_write); stdout_write = 0; CloseHandle(stderr_write); stderr_write = 0; - std::vector buf(4096, 0); - DWORD bytes_read = 0; - while (ReadFile(stdout_read, buf.data(), buf.size(), &bytes_read, NULL) && bytes_read > 0) { - stdout_oss << std::string(buf.data(), bytes_read); - } - if (bytes_read < 0) { - stdout_oss << "Reading stdout failed: " << GetWin32ErrMsg(); - } - bytes_read = 0; - while (ReadFile(stderr_read, buf.data(), buf.size(), &bytes_read, NULL) && bytes_read > 0) { - stderr_oss << std::string(buf.data(), bytes_read); - } - if (bytes_read < 0) { - stderr_oss << "Reading stderr failed: " << GetWin32ErrMsg(); - } + stdout_oss << DrainWin32Pipe(stdout_read); + stderr_oss << DrainWin32Pipe(stderr_read); + out_cb(stdout_oss.str(), stderr_oss.str()); + + return exit_code == 0; +} + +bool PythonWrapper::InvokeCommandWithArgs( + const std::string& cmd, + std::vector&& args, + std::string* py_stdout, std::string* py_stderr) { - *py_stdout = stdout_oss.str(); + std::ostringstream out_oss, err_oss; + auto out_cb = [&](const std::string& out, const std::string& err) { + out_oss << out; + err_oss << err; + }; + bool ret = InvokeCommandWithArgs(cmd, std::move(args), std::move(out_cb)); if (py_stderr) { - *py_stderr = stderr_oss.str(); + *py_stderr = err_oss.str(); } - return exit_code == 0; + *py_stdout = out_oss.str(); + return ret; } bool PythonWrapper::InvokeWithArgs(std::vector&& args, @@ -251,6 +279,12 @@ bool PythonWrapper::InvokeWithArgs(std::vector&& args, } } +bool PythonWrapper::InvokeWithArgs(std::vector&& args, + const std::function&& out_cb) { + return InvokeCommandWithArgs("Resources/Python/python.exe", + std::move(args), std::move(out_cb)); +} + std::string PythonWrapper::GetVersion() { std::string py_stdout, py_stderr; @@ -272,15 +306,27 @@ std::string PythonWrapper::DumpMics() { } bool PythonWrapper::InstallPip(std::string* out, std::string* err) { - std::string result; + std::ostringstream out_oss, err_oss; + auto out_cb = [&](const std::string& out, const std::string& err) { + out_oss << out; + err_oss << err; + }; + bool ret = InstallPip(std::move(out_cb)); + *out = out_oss.str(); + if (err) { + *err = err_oss.str(); + } + return ret; +} +bool PythonWrapper::InstallPip(const std::function&& out_cb) { std::filesystem::path pip_flag = "Resources/Python/.pip_installed"; if (std::filesystem::exists(pip_flag)) { return true; } std::string pip_path = "Resources/Python/get-pip.py"; - if (!InvokeWithArgs({ pip_path }, out, err)) { + if (!InvokeWithArgs({ pip_path }, std::move(out_cb))) { return false; } diff --git a/GUI/GUI/GUI/PythonWrapper.h b/GUI/GUI/GUI/PythonWrapper.h index 3433b0f..12f56cc 100644 --- a/GUI/GUI/GUI/PythonWrapper.h +++ b/GUI/GUI/GUI/PythonWrapper.h @@ -32,6 +32,12 @@ namespace PythonWrapper std::string* py_stdout, std::string* py_stderr = NULL); + // Invoke a command on the shell with arguments. + // On error, sets `out` to an error message and returns false. + bool InvokeCommandWithArgs(const std::string& cmd, + std::vector&& args, + const std::function&& out_cb); + // Invoke the interpreter with arguments. // On error, sets `out` to an error message and returns false. bool InvokeWithArgs(std::vector&& args, std::string* py_stdout, @@ -40,6 +46,9 @@ namespace PythonWrapper bool InvokeWithArgs(std::vector&& args, const std::string&& err_msg, wxTextCtrl* out); + bool InvokeWithArgs(std::vector&& args, + const std::function&& out_cb); + // Execute python --version. std::string GetVersion(); @@ -47,6 +56,7 @@ namespace PythonWrapper std::string DumpMics(); // Execute get-pip.py. + bool InstallPip(const std::function&& out_cb); bool InstallPip(std::string* out, std::string* err = nullptr); // TODO(yum) both StartApp and GenerateAnimator should be diff --git a/GUI/GUI/GUI/WhisperCPP.cpp b/GUI/GUI/GUI/WhisperCPP.cpp index b730236..3007834 100644 --- a/GUI/GUI/GUI/WhisperCPP.cpp +++ b/GUI/GUI/GUI/WhisperCPP.cpp @@ -156,16 +156,18 @@ bool WhisperCPP::InstallDependencies() { return true; } - std::string py_stdout, py_stderr; + std::function out_cb = + [&](const std::string& out, const std::string& err) -> void { + Log(out_, out); + Log(out_, err); + }; bool ret = PythonWrapper::InvokeWithArgs({ "-u", // Unbuffered output "-m pip", "install", "-r Resources/Scripts/whisper_requirements.txt", - }, &py_stdout, &py_stderr); + }, std::move(out_cb)); - Log(out_, py_stdout); - Log(out_, py_stderr); if (!ret) { Log(out_, "Failed to install dependencies!\n"); return false; @@ -247,11 +249,17 @@ void WhisperCPP::Start(const AppConfig& c) { } ScopeGuard mic_stream_cleanup([mic_stream]() { mic_stream->Release(); }); - std::string pip_out, pip_err; - Log(out_, "Installing pip\n"); - if (!PythonWrapper::InstallPip(&pip_out, &pip_err)) { - Log(out_, "Failed to install pip: {}\n", pip_err); - return; + { + std::function out_cb = + [&](const std::string& out, const std::string& err) -> void { + Log(out_, out); + Log(out_, err); + }; + Log(out_, "Installing pip\n"); + if (!PythonWrapper::InstallPip(std::move(out_cb))) { + Log(out_, "Failed to install pip!\n"); + return; + } } Log(out_, "Installing Python dependencies\n"); if (!InstallDependencies()) { -- cgit v1.2.3