summaryrefslogtreecommitdiffstats
path: root/source/core/unix
diff options
context:
space:
mode:
authorjsmall-nvidia <jsmall@nvidia.com>2021-11-10 17:33:22 -0500
committerGitHub <noreply@github.com>2021-11-10 17:33:22 -0500
commit8a9e518371df03b3f382e0fe869da83751fdda0b (patch)
tree749f9c1c79acd375ec3ee97e45a10007dd6632fa /source/core/unix
parent95e82acc0b32c81a9c6ac39708d18a423d8c7b1e (diff)
Interprocess communication via pipes (#2009)
* #include an absolute path didn't work - because paths were taken to always be relative. * Use 'Process' to communicate with an command line tool. * Remove slang-win-stream * Tidy up windows ProcessUtil. * First version of BufferedReadStream. * Windows working IPC for steams. * Test proxy count option. * Split Process/ProcessUtil. Process is platform dependant. ProcessUtil are functions that are platform independent. * First implementation of Unix Process interface. * Unix process compiles on cygwin. * Fix typo in unix process. * Separate unix pipe stream error of invalid access, from pipe availability. * Fix in standard line extraction. * Make fd non blocking. * Fix issues with Windows Process streams. * Added UnixPipe. * Some fixes around UnixPipeStream. * Make a unix stream closed explicit. * Hack to debug linux process/stream. * Revert to old linux pipe handling. * Pass executable path for unit tests. Split out CommandLine into own source. * Small improvements in process/command line. * Check process behavior with crash. * Make stderr and stdout unbuffered for crash testing. * Only turn disable buffering in crash test. * Disable crash test on CI. * Fix crash on clang/linux. * Enable crash test. Remove _appendBuffer as can use StreamUtil functionality.
Diffstat (limited to 'source/core/unix')
-rw-r--r--source/core/unix/slang-unix-process-util.cpp246
-rw-r--r--source/core/unix/slang-unix-process.cpp430
2 files changed, 430 insertions, 246 deletions
diff --git a/source/core/unix/slang-unix-process-util.cpp b/source/core/unix/slang-unix-process-util.cpp
deleted file mode 100644
index af49eec37..000000000
--- a/source/core/unix/slang-unix-process-util.cpp
+++ /dev/null
@@ -1,246 +0,0 @@
-// slang-unix-process-util.cpp
-#include "../slang-process-util.h"
-
-#include "../slang-common.h"
-#include "../slang-string-util.h"
-#include "../slang-string-escape-util.h"
-#include "../slang-memory-arena.h"
-
-#include <stdio.h>
-#include <stdlib.h>
-
-//#include <dirent.h>
-#include <errno.h>
-#include <poll.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <unistd.h>
-
-#include <time.h>
-
-namespace Slang {
-
-
-/* static */UnownedStringSlice ProcessUtil::getExecutableSuffix()
-{
-#if __CYGWIN__
- return UnownedStringSlice::fromLiteral(".exe");
-#else
- return UnownedStringSlice::fromLiteral("");
-#endif
-}
-
-/* static */StringEscapeHandler* ProcessUtil::getEscapeHandler()
-{
- return StringEscapeUtil::getHandler(StringEscapeUtil::Style::Space);
-}
-
-/* static */String ProcessUtil::getCommandLineString(const CommandLine& commandLine)
-{
- auto escapeHandler = getEscapeHandler();
-
- // When outputting the command line we potentially need to escape the path to the
- // command and args - that aren't already explicitly marked as escaped.
-
- StringBuilder cmd;
- StringEscapeUtil::appendMaybeQuoted(escapeHandler, commandLine.m_executable.getUnownedSlice(), cmd);
- for (const auto& arg : commandLine.m_args)
- {
- cmd << " ";
-
- StringEscapeUtil::appendMaybeQuoted(escapeHandler, arg.getUnownedSlice(), cmd);
- }
- return cmd.ToString();
-}
-
-/* static */SlangResult ProcessUtil::execute(const CommandLine& commandLine, ExecuteResult& outExecuteResult)
-{
- outExecuteResult.init();
-
- List<char const*> argPtrs;
-
- // Add the command
- argPtrs.add(commandLine.m_executable.getBuffer());
-
- // Add all the args - they don't need any explicit escaping
- for (auto arg : commandLine.m_args)
- {
- // All args for this target must be unescaped (as they are in CommandLine)
- argPtrs.add(arg.getBuffer());
- }
-
- // Terminate with a null
- argPtrs.add(nullptr);
-
- int stdoutPipe[2];
- int stderrPipe[2];
-
- if (pipe(stdoutPipe) == -1)
- {
- fprintf(stderr, "error: `pipe` failed\n");
- return SLANG_FAIL;
- }
-
- if (pipe(stderrPipe) == -1)
- {
- fprintf(stderr, "error: `pipe` failed\n");
- return SLANG_FAIL;
- }
-
- pid_t childProcessID = fork();
- if (childProcessID == -1)
- {
- fprintf(stderr, "error: `fork` failed\n");
- return SLANG_FAIL;
- }
-
- if (childProcessID == 0)
- {
- // We are the child process.
-
- dup2(stdoutPipe[1], STDOUT_FILENO);
- dup2(stderrPipe[1], STDERR_FILENO);
-
- close(stdoutPipe[0]);
- close(stdoutPipe[1]);
-
- close(stderrPipe[0]);
- close(stderrPipe[1]);
-
- execvp(argPtrs[0], (char* const*)&argPtrs[0]);
-
- // If we get here, then `exec` failed
- fprintf(stderr, "error: `exec` failed\n");
- return SLANG_FAIL;
- }
- else
- {
- // We are the parent process
-
- close(stdoutPipe[1]);
- close(stderrPipe[1]);
-
- int stdoutFD = stdoutPipe[0];
- int stderrFD = stderrPipe[0];
-
- pollfd pollInfos[2];
- nfds_t pollInfoCount = 2;
-
- pollInfos[0].fd = stdoutFD;
- pollInfos[0].events = POLLIN;
- pollInfos[0].revents = 0;
- pollInfos[1].fd = stderrFD;
- pollInfos[1].events = POLLIN;
- pollInfos[1].revents = 0;
-
- int remainingCount = 2;
- int iterations = 0;
- while (remainingCount)
- {
- // Safeguard against infinite loop:
- iterations++;
- if (iterations > 10000)
- {
- fprintf(stderr, "poll(): %d iterations\n", iterations);
- return SLANG_FAIL;
- }
-
- // Set a timeout of 100 seconds;
- // we really shouldn't wait too long...
- int pollTimeout = 100000;
- int pollResult = poll(pollInfos, pollInfoCount, pollTimeout);
- if (pollResult <= 0)
- {
- // If there was a signal that got in
- // the way, then retry...
- if (pollResult == -1 && errno == EINTR)
- continue;
-
- // timeout or error...
- fprintf(stderr, "error: `poll` failed or timed out\n");
- return SLANG_FAIL;
- }
-
- enum { kBufferSize = 1024 };
- char buffer[kBufferSize];
-
- if (pollInfos[0].revents)
- {
- auto count = read(stdoutFD, buffer, kBufferSize);
- if (count <= 0)
- {
- // end-of-file
- close(stdoutFD);
- pollInfos[0].fd = -1;
- remainingCount--;
- }
-
- outExecuteResult.standardOutput.append(buffer, buffer + count);
- }
-
- if (pollInfos[1].revents)
- {
- auto count = read(stderrFD, buffer, kBufferSize);
- if (count <= 0)
- {
- // end-of-file
- close(stderrFD);
- pollInfos[1].fd = -1;
- remainingCount--;
- }
-
- outExecuteResult.standardError.append(buffer, buffer + count);
- }
- }
-
- int childStatus = 0;
- iterations = 0;
- for (;;)
- {
- // Safeguard against infinite loop:
- iterations++;
- if (iterations > 10000)
- {
- fprintf(stderr, "waitpid(): %d iterations\n", iterations);
- return SLANG_FAIL;
- }
-
- pid_t terminatedProcessID = waitpid(childProcessID, &childStatus, 0);
- if (terminatedProcessID == -1)
- {
- fprintf(stderr, "error: `waitpid` failed\n");
- return SLANG_FAIL;
- }
-
- if (terminatedProcessID == childProcessID)
- {
- if (WIFEXITED(childStatus))
- {
- outExecuteResult.resultCode = (int)(int8_t)WEXITSTATUS(childStatus);
- }
- else
- {
- outExecuteResult.resultCode = 1;
- }
- return SLANG_OK;
- }
- }
- }
-
- return SLANG_FAIL;
-}
-
-/* static */uint64_t ProcessUtil::getClockFrequency()
-{
- return 1000000000;
-}
-
-/* static */uint64_t ProcessUtil::getClockTick()
-{
- struct timespec now;
- clock_gettime(CLOCK_MONOTONIC, &now);
- return uint64_t(now.tv_sec) * 1000000000 + now.tv_nsec;
-}
-
-} // namespace Slang
diff --git a/source/core/unix/slang-unix-process.cpp b/source/core/unix/slang-unix-process.cpp
new file mode 100644
index 000000000..083160f02
--- /dev/null
+++ b/source/core/unix/slang-unix-process.cpp
@@ -0,0 +1,430 @@
+// slang-unix-process.cpp
+#include "../slang-process.h"
+
+#include "../slang-common.h"
+#include "../slang-string-util.h"
+#include "../slang-string-escape-util.h"
+#include "../slang-memory-arena.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+//#include <dirent.h>
+#include <errno.h>
+#include <poll.h>
+#include <fcntl.h>
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <time.h>
+
+namespace Slang {
+
+class UnixProcess : public Process
+{
+public:
+ // Process
+ virtual bool isTerminated() SLANG_OVERRIDE;
+ virtual void waitForTermination() SLANG_OVERRIDE;
+
+ UnixProcess(pid_t pid, Stream*const* streams);
+
+protected:
+ /// Returns true if terminated
+ bool _updateTerminationState(int options);
+
+ bool m_isTerminated = false; ///< True if ths process is terminated
+ pid_t m_pid; ///< The process id
+};
+
+class UnixPipeStream : public Stream
+{
+public:
+ typedef UnixPipeStream ThisType;
+
+ // Stream
+ virtual Int64 getPosition() SLANG_OVERRIDE { return 0; }
+ virtual SlangResult seek(SeekOrigin origin, Int64 offset) SLANG_OVERRIDE { SLANG_UNUSED(origin); SLANG_UNUSED(offset); return SLANG_E_NOT_AVAILABLE; }
+ virtual SlangResult read(void* buffer, size_t length, size_t& outReadBytes) SLANG_OVERRIDE;
+ virtual SlangResult write(const void* buffer, size_t length) SLANG_OVERRIDE;
+ virtual bool isEnd() SLANG_OVERRIDE { return m_isClosed; }
+ virtual bool canRead() SLANG_OVERRIDE { return _has(FileAccess::Read) && !m_isClosed; }
+ virtual bool canWrite() SLANG_OVERRIDE { return _has(FileAccess::Write) && !m_isClosed; }
+ virtual void close() SLANG_OVERRIDE;
+ virtual SlangResult flush() SLANG_OVERRIDE;
+
+ UnixPipeStream(int fd, FileAccess access, bool isOwned) :
+ m_fd(fd),
+ m_access(access),
+ m_isOwned(isOwned),
+ m_isClosed(false)
+ {
+ }
+
+protected:
+ /// This read file descriptor non blocking. Doing so will change the behavior of
+ /// read - it can fail and return an error indicating there is no data, instead of blocking.
+ /// Currently this mechanism isn't used, as checking via poll seemed to work.
+ void _setReadNonBlocking()
+ {
+ // Makes non blocking
+ if (_has(FileAccess::Read))
+ {
+ // Make non blocking, for read
+ fcntl(m_fd, F_SETFL, fcntl(m_fd, F_GETFL) | O_NONBLOCK);
+ }
+ }
+ bool _has(FileAccess access) const { return (Index(access) & Index(m_access)) != 0; }
+
+ bool m_isClosed; ///< If true this stream has been closed (ie cannot read/write to anymore)
+ bool m_isOwned; ///< True if m_fd is owned by this object.
+ FileAccess m_access; ///< Access allowed to this stream - either Read or Write
+ int m_fd; /// The 'file descriptor' for the pipe
+};
+
+/* !!!!!!!!!!!!!!!!!!!!!! UnixProcess !!!!!!!!!!!!!!!!!!!!!!!!!!!! */
+
+UnixProcess::UnixProcess(pid_t pid, Stream* const* streams):
+ m_pid(pid)
+{
+ // Set to an 'odd value'
+ m_returnValue = -1;
+
+ for (Index i = 0; i < SLANG_COUNT_OF(m_streams); ++i)
+ {
+ m_streams[i] = streams[i];
+ }
+}
+
+bool UnixProcess::_updateTerminationState(int options)
+{
+ if (!m_isTerminated)
+ {
+ int childStatus;
+ const pid_t terminatedPid = waitpid(m_pid, &childStatus, options);
+ if (terminatedPid == -1)
+ {
+ // Guess we should just mark as terminated
+ m_isTerminated = true;
+
+ fprintf(stderr, "error: `waitpid` failed\n");
+ }
+ else if (terminatedPid == m_pid)
+ {
+ if (WIFEXITED(childStatus))
+ {
+ m_returnValue = (int)(int8_t)WEXITSTATUS(childStatus);
+ }
+ m_isTerminated = true;
+ }
+ }
+ return m_isTerminated;
+}
+
+bool UnixProcess::isTerminated()
+{
+ if (m_isTerminated)
+ {
+ return true;
+ }
+ return _updateTerminationState(WNOHANG);
+}
+
+void UnixProcess::waitForTermination()
+{
+ while (!_updateTerminationState(0));
+}
+
+/* !!!!!!!!!!!!!!!!!!!!!! UnixPipeStream !!!!!!!!!!!!!!!!!!!!!!!!!!!! */
+
+void UnixPipeStream::close()
+{
+ if (!m_isClosed)
+ {
+ if (m_isOwned)
+ {
+ ::close(m_fd);
+ }
+
+ m_isClosed = true;
+ // Make something hopefully invalid
+ m_fd = -1;
+ }
+}
+
+SlangResult UnixPipeStream::flush()
+{
+#if 0
+ // https://stackoverflow.com/questions/43184035/flushing-pipe-without-closing-in-c
+ // Makes the case that flushing is not applicable with pipes.
+ if (canWrite())
+ {
+ // We might want to use
+ ::fsync(m_fd);
+ }
+#endif
+ return SLANG_OK;
+}
+
+SlangResult UnixPipeStream::read(void* buffer, size_t length, size_t& outReadBytes)
+{
+ outReadBytes = 0;
+
+ if (!_has(FileAccess::Read))
+ {
+ return SLANG_E_NOT_AVAILABLE;
+ }
+ if (m_isClosed)
+ {
+ return SLANG_OK;
+ }
+
+ // Check if it's hung up.
+ pollfd pollInfo;
+
+ pollInfo.fd = m_fd;
+ pollInfo.events = POLLIN | POLLHUP;
+ pollInfo.revents = 0;
+
+ // https://linux.die.net/man/2/poll
+
+ // Return immediately
+ const int pollTimeout = 0;
+
+ const int pollResult = ::poll(&pollInfo, 1, pollTimeout);
+ if (pollResult < 0)
+ {
+ return SLANG_FAIL;
+ }
+
+ // If there is data read that first
+ if (pollInfo.revents & POLLIN)
+ {
+ auto count = ::read(m_fd, buffer, length);
+
+ // If it's -1 it seems like an error
+ if (count == -1)
+ {
+ const int err = errno;
+
+ // On non blocking pipe these indicate there could be more to come
+ if (err == EAGAIN || err == EWOULDBLOCK)
+ {
+ return SLANG_OK;
+ }
+ // Okay - guess we have an error then
+ return SLANG_FAIL;
+ }
+
+ outReadBytes = size_t(count);
+ return SLANG_OK;
+ }
+
+ if (pollInfo.revents & POLLHUP)
+ {
+ close();
+ }
+
+ return SLANG_OK;
+}
+
+SlangResult UnixPipeStream::write(const void* buffer, size_t length)
+{
+ if (!_has(FileAccess::Write))
+ {
+ return SLANG_E_NOT_AVAILABLE;
+ }
+ if (m_isClosed)
+ {
+ // The pipe is closed
+ return SLANG_FAIL;
+ }
+
+ pollfd pollInfo;
+
+ pollInfo.fd = m_fd;
+ pollInfo.events = POLLHUP;
+ pollInfo.revents = 0;
+
+ // https://linux.die.net/man/2/poll
+
+ // Return immediately
+ const int pollTimeout = 0;
+
+ int pollResult = ::poll(&pollInfo, 1, pollTimeout);
+ if (pollResult < 0)
+ {
+ return SLANG_FAIL;
+ }
+
+ if (pollInfo.revents & POLLHUP)
+ {
+ close();
+ return SLANG_FAIL;
+ }
+
+ const ssize_t writeResult = ::write(m_fd, buffer, length);
+
+ if (writeResult < 0 || writeResult != length)
+ {
+ return SLANG_FAIL;
+ }
+
+ return SLANG_OK;
+}
+
+/* !!!!!!!!!!!!!!!!!!!!!! Process !!!!!!!!!!!!!!!!!!!!!!!!!!!! */
+
+/* static */UnownedStringSlice Process::getExecutableSuffix()
+{
+#if __CYGWIN__
+ return UnownedStringSlice::fromLiteral(".exe");
+#else
+ return UnownedStringSlice::fromLiteral("");
+#endif
+}
+
+/* static */StringEscapeHandler* Process::getEscapeHandler()
+{
+ return StringEscapeUtil::getHandler(StringEscapeUtil::Style::Space);
+}
+
+/* static */SlangResult Process::create(const CommandLine& commandLine, Process::Flags flags, RefPtr<Process>& outProcess)
+{
+ List<char const*> argPtrs;
+
+ // Add the command
+ argPtrs.add(commandLine.m_executable.getBuffer());
+
+ // Add all the args - they don't need any explicit escaping
+ for (auto arg : commandLine.m_args)
+ {
+ // All args for this target must be unescaped (as they are in CommandLine)
+ argPtrs.add(arg.getBuffer());
+ }
+
+ // Terminate with a null
+ argPtrs.add(nullptr);
+
+ int stdoutPipe[2];
+ int stderrPipe[2];
+ int stdinPipe[2];
+
+ if (pipe(stdoutPipe) == -1 || pipe(stderrPipe) == -1 || pipe(stdinPipe) == -1)
+ {
+ fprintf(stderr, "error: `pipe` failed\n");
+ return SLANG_FAIL;
+ }
+
+ pid_t childPid = fork();
+ if (childPid == -1)
+ {
+ fprintf(stderr, "error: `fork` failed\n");
+ return SLANG_FAIL;
+ }
+
+ if (childPid == 0)
+ {
+ // We are the child process.
+
+ // Duplicate into standard handles
+ dup2(stdoutPipe[1], STDOUT_FILENO);
+ dup2(stderrPipe[1], STDERR_FILENO);
+ dup2(stdinPipe[0], STDIN_FILENO);
+
+ // Close all of the handles
+ ::close(stdoutPipe[0]);
+ ::close(stdoutPipe[1]);
+
+ ::close(stderrPipe[0]);
+ ::close(stderrPipe[1]);
+
+ ::close(stdinPipe[0]);
+ ::close(stdinPipe[1]);
+
+ ::execvp(argPtrs[0], (char* const*)&argPtrs[0]);
+
+ // If we get here, then `exec` failed
+
+ // NOTE! Because we have dup2 into STDERR_FILENO, this error will *not* generally appear on
+ // the terminal but in the stderrPipe.
+ fprintf(stderr, "error: `exec` failed\n");
+
+ return SLANG_FAIL;
+ }
+ else
+ {
+ // We are the parent process
+ ::close(stdoutPipe[1]);
+ ::close(stderrPipe[1]);
+ ::close(stdinPipe[0]);
+
+ RefPtr<Stream> streams[Index(Process::StreamType::CountOf)];
+
+ // Previously code didn't need to close, so we'll make stream not own the handles
+ streams[Index(Process::StreamType::StdOut)] = new UnixPipeStream(stdoutPipe[0], FileAccess::Read, true);
+ streams[Index(Process::StreamType::ErrorOut)] = new UnixPipeStream(stderrPipe[0], FileAccess::Read, true);
+ streams[Index(Process::StreamType::StdIn)] = new UnixPipeStream(stdinPipe[1], FileAccess::Write, true);
+
+ outProcess = new UnixProcess(childPid, streams[0].readRef());
+ return SLANG_OK;
+ }
+}
+
+/* static */uint64_t Process::getClockFrequency()
+{
+ return 1000000000;
+}
+
+/* static */uint64_t Process::getClockTick()
+{
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ return uint64_t(now.tv_sec) * 1000000000 + now.tv_nsec;
+}
+
+/* static */void Process::sleepCurrentThread(Index timeInMs)
+{
+ struct timespec timeSpec;
+
+ if (timeInMs > 0)
+ {
+ timeSpec.tv_sec = timeInMs / 1000;
+ timeSpec.tv_nsec = (timeInMs % 1000) * 1000 * 1000;
+ }
+ else
+ {
+ timeSpec.tv_sec = 0;
+ timeSpec.tv_nsec = 0;
+ }
+ nanosleep(&timeSpec, nullptr);
+}
+
+/* static */SlangResult Process::getStdStream(StreamType type, RefPtr<Stream>& out)
+{
+ switch (type)
+ {
+ case StreamType::StdIn:
+ {
+ out = new UnixPipeStream(STDIN_FILENO, FileAccess::Read, false);
+ break;
+ }
+ case StreamType::StdOut:
+ {
+ out = new UnixPipeStream(STDOUT_FILENO, FileAccess::Write, false);
+ break;
+ }
+ case StreamType::ErrorOut:
+ {
+ out = new UnixPipeStream(STDERR_FILENO, FileAccess::Write, false);
+ break;
+ }
+ default: return SLANG_FAIL;
+ }
+ return SLANG_OK;
+}
+
+} // namespace Slang