summaryrefslogtreecommitdiffstats
path: root/source/core/slang-stream.cpp
diff options
context:
space:
mode:
authorjsmall-nvidia <jsmall@nvidia.com>2021-11-10 17:33:22 -0500
committerGitHub <noreply@github.com>2021-11-10 17:33:22 -0500
commit8a9e518371df03b3f382e0fe869da83751fdda0b (patch)
tree749f9c1c79acd375ec3ee97e45a10007dd6632fa /source/core/slang-stream.cpp
parent95e82acc0b32c81a9c6ac39708d18a423d8c7b1e (diff)
Interprocess communication via pipes (#2009)
* #include an absolute path didn't work - because paths were taken to always be relative. * Use 'Process' to communicate with an command line tool. * Remove slang-win-stream * Tidy up windows ProcessUtil. * First version of BufferedReadStream. * Windows working IPC for steams. * Test proxy count option. * Split Process/ProcessUtil. Process is platform dependant. ProcessUtil are functions that are platform independent. * First implementation of Unix Process interface. * Unix process compiles on cygwin. * Fix typo in unix process. * Separate unix pipe stream error of invalid access, from pipe availability. * Fix in standard line extraction. * Make fd non blocking. * Fix issues with Windows Process streams. * Added UnixPipe. * Some fixes around UnixPipeStream. * Make a unix stream closed explicit. * Hack to debug linux process/stream. * Revert to old linux pipe handling. * Pass executable path for unit tests. Split out CommandLine into own source. * Small improvements in process/command line. * Check process behavior with crash. * Make stderr and stdout unbuffered for crash testing. * Only turn disable buffering in crash test. * Disable crash test on CI. * Fix crash on clang/linux. * Enable crash test. Remove _appendBuffer as can use StreamUtil functionality.
Diffstat (limited to 'source/core/slang-stream.cpp')
-rw-r--r--source/core/slang-stream.cpp276
1 files changed, 276 insertions, 0 deletions
diff --git a/source/core/slang-stream.cpp b/source/core/slang-stream.cpp
index f938eb886..042b1d898 100644
--- a/source/core/slang-stream.cpp
+++ b/source/core/slang-stream.cpp
@@ -250,6 +250,16 @@ SlangResult FileStream::write(const void* buffer, size_t length)
return (bytesWritten == length) ? SLANG_OK : SLANG_FAIL;
}
+SlangResult FileStream::flush()
+{
+ if (m_handle && canWrite())
+ {
+ fflush(m_handle);
+ return SLANG_OK;
+ }
+ return SLANG_E_NOT_AVAILABLE;
+}
+
bool FileStream::canRead()
{
return ((int)m_fileAccess & (int)FileAccess::Read) != 0;
@@ -362,4 +372,270 @@ SlangResult OwnedMemoryStream::write(const void * buffer, size_t length)
return SLANG_OK;
}
+// !!!!!!!!!!!!!!!!!!!!!!!!!!!!! BufferedReadStream !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+
+void BufferedReadStream::_advanceStartIndex(Index byteCount)
+{
+ SLANG_ASSERT(Index(getCount()) >= byteCount && byteCount >= 0);
+ m_startIndex += byteCount;
+ if (getCount() == 0)
+ {
+ m_startIndex = 0;
+ }
+}
+
+Int64 BufferedReadStream::getPosition()
+{
+ return m_stream ? (m_stream->getPosition() - getCount()) : 0;
+}
+
+SlangResult BufferedReadStream::seek(SeekOrigin origin, Int64 offset)
+{
+ if (!m_stream)
+ {
+ return SLANG_FAIL;
+ }
+ // As it currently stands the data behind m_startIndex is the previous data.
+ // So we could seek backwards up to -m_startIndex.
+ // We don't worry about this here, for simplicity sake.
+
+ if (origin == SeekOrigin::End || origin == SeekOrigin::Start || offset < 0 || offset >= Int64(getCount()))
+ {
+ // Empty the buffer
+ m_startIndex = 0;
+ m_buffer.setCount(0);
+ // Seek on underlying stream
+ return m_stream->seek(origin, offset);
+ }
+
+ // We can just seek on the buffered data
+ _advanceStartIndex(Index(offset));
+ return SLANG_OK;
+}
+
+SlangResult BufferedReadStream::read(void* inBuffer, size_t length, size_t& outReadBytes)
+{
+ // If the buffer has no data and the read size is larger than the default read size - may as well just read directly into the output buffer
+ if (getCount() == 0 && length > m_defaultReadSize)
+ {
+ return m_stream->read(inBuffer, length, outReadBytes);
+ }
+
+ Byte* buffer = (Byte*)inBuffer;
+
+ size_t totalReadBytes = 0;
+ outReadBytes = 0;
+
+ // Do a read to fill the buffer.
+ SLANG_RETURN_ON_FAIL(update());
+
+ while (length > 0)
+ {
+ const size_t bufferCount = size_t(getCount());
+
+ if (bufferCount)
+ {
+ const size_t readCount = (bufferCount < length) ? bufferCount : length;
+
+ ::memcpy(buffer, getBuffer(), readCount);
+
+ _advanceStartIndex(Index(readCount));
+ buffer += readCount;
+ length -= readCount;
+
+ totalReadBytes += readCount;
+ }
+ else
+ {
+ if (m_stream == nullptr)
+ {
+ break;
+ }
+
+ // Read from underlying buffer
+ size_t readBytes;
+ SlangResult res = m_stream->read(buffer, length, readBytes);
+
+ outReadBytes = totalReadBytes + readBytes;
+ return res;
+ }
+ }
+
+ outReadBytes = totalReadBytes;
+ return SLANG_OK;
+}
+
+SlangResult BufferedReadStream::write(const void* buffer, size_t length)
+{
+ SLANG_UNUSED(buffer);
+ SLANG_UNUSED(length);
+
+ return SLANG_E_NOT_AVAILABLE;
+}
+
+bool BufferedReadStream::canRead()
+{
+ return getCount() > 0 || (m_stream && m_stream->canRead());
+}
+
+bool BufferedReadStream::canWrite()
+{
+ return false;
+}
+
+void BufferedReadStream::close()
+{
+ if (m_stream)
+ {
+ m_stream->close();
+ m_stream.setNull();
+ }
+}
+
+bool BufferedReadStream::isEnd()
+{
+ return getCount() == 0 && (m_stream == nullptr || m_stream->isEnd());
+}
+
+SlangResult BufferedReadStream::flush()
+{
+ return SLANG_E_NOT_AVAILABLE;
+}
+
+SlangResult BufferedReadStream::update()
+{
+ if (m_stream == nullptr)
+ {
+ // Should this return an error?
+ return SLANG_OK;
+ }
+
+ {
+ // How much buffer space do we have. We need at least m_defaultReadSize
+ const size_t remainingCount = size_t(m_buffer.getCapacity() - m_buffer.getCount());
+
+ // Repeat until we have enough space
+ while (remainingCount < m_defaultReadSize)
+ {
+ // If there is anything in the buffer shift it all down
+ if (m_startIndex > 0)
+ {
+ Byte* buffer = m_buffer.getBuffer();
+ const Index count = getCount();
+ if (count > 0)
+ {
+ ::memmove(buffer, buffer + m_startIndex, count);
+ }
+
+ m_buffer.setCount(count);
+ m_startIndex = 0;
+ }
+ else
+ {
+ // Make sure we have the space
+ const Index prevCount = m_buffer.getCount();
+ m_buffer.setCount(prevCount + m_defaultReadSize);
+ m_buffer.setCount(prevCount);
+ }
+ }
+ SLANG_ASSERT(size_t(m_buffer.getCapacity() - m_buffer.getCount()) >= m_defaultReadSize);
+ }
+
+ {
+ const Index prevCount = m_buffer.getCount();
+ m_buffer.setCount(prevCount + m_defaultReadSize);
+
+ size_t readBytes = 0;
+
+ const SlangResult res = m_stream->read(m_buffer.getBuffer() + prevCount, m_defaultReadSize, readBytes);
+
+ m_buffer.setCount(prevCount + Index(readBytes));
+
+ return res;
+ }
+}
+
+// !!!!!!!!!!!!!!!!!!!!!!!!!!!!! StreamUtil !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+
+/* static */SlangResult StreamUtil::readAll(Stream* stream, size_t readSize, List<Byte>& ioBytes)
+{
+ while (!stream->isEnd())
+ {
+ SLANG_RETURN_ON_FAIL(read(stream, readSize, ioBytes));
+ }
+
+ return SLANG_OK;
+}
+
+/* static */SlangResult StreamUtil::read(Stream* stream, size_t readSize, List<Byte>& ioBytes)
+{
+ readSize = (readSize <= 0) ? 1024 : readSize;
+
+ while (true)
+ {
+ const Index prevCount = ioBytes.getCount();
+ ioBytes.setCount(prevCount + readSize);
+
+ size_t readBytesCount;
+ SLANG_RETURN_ON_FAIL(stream->read(ioBytes.getBuffer() + prevCount, readSize, readBytesCount));
+ ioBytes.setCount(prevCount + Index(readBytesCount));
+
+ if (readBytesCount == 0)
+ {
+ return SLANG_OK;
+ }
+ }
+}
+
+/* static */SlangResult StreamUtil::discard(Stream* stream)
+{
+ Byte buf[1024];
+ const Index bufSize = SLANG_COUNT_OF(buf);
+
+ while (true)
+ {
+ size_t readBytesCount;
+ SLANG_RETURN_ON_FAIL(stream->read(buf, bufSize, readBytesCount));
+
+ if (readBytesCount == 0)
+ {
+ return SLANG_OK;
+ }
+ }
+}
+
+/* static */SlangResult StreamUtil::discardAll(Stream* stream)
+{
+ while (!stream->isEnd())
+ {
+ SLANG_RETURN_ON_FAIL(discard(stream));
+ }
+ return SLANG_OK;
+}
+
+
+/* static */SlangResult StreamUtil::readOrDiscard(Stream* stream, size_t readSize, List<Byte>* ioBytes)
+{
+ if (ioBytes)
+ {
+ return read(stream, readSize, *ioBytes);
+ }
+ else
+ {
+ return discard(stream);
+ }
+}
+
+/* static */SlangResult StreamUtil::readOrDiscardAll(Stream* stream, size_t readSize, List<Byte>* ioBytes)
+{
+ if (ioBytes)
+ {
+ return readAll(stream, readSize, *ioBytes);
+ }
+ else
+ {
+ return discardAll(stream);
+ }
+}
+
} // namespace Slang