diff options
| author | jsmall-nvidia <jsmall@nvidia.com> | 2021-11-10 17:33:22 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-11-10 17:33:22 -0500 |
| commit | 8a9e518371df03b3f382e0fe869da83751fdda0b (patch) | |
| tree | 749f9c1c79acd375ec3ee97e45a10007dd6632fa /source/core/slang-stream.cpp | |
| parent | 95e82acc0b32c81a9c6ac39708d18a423d8c7b1e (diff) | |
Interprocess communication via pipes (#2009)
* #include an absolute path didn't work - because paths were taken to always be relative.
* Use 'Process' to communicate with an command line tool.
* Remove slang-win-stream
* Tidy up windows ProcessUtil.
* First version of BufferedReadStream.
* Windows working IPC for steams.
* Test proxy count option.
* Split Process/ProcessUtil. Process is platform dependant. ProcessUtil are functions that are platform independent.
* First implementation of Unix Process interface.
* Unix process compiles on cygwin.
* Fix typo in unix process.
* Separate unix pipe stream error of invalid access, from pipe availability.
* Fix in standard line extraction.
* Make fd non blocking.
* Fix issues with Windows Process streams.
* Added UnixPipe.
* Some fixes around UnixPipeStream.
* Make a unix stream closed explicit.
* Hack to debug linux process/stream.
* Revert to old linux pipe handling.
* Pass executable path for unit tests.
Split out CommandLine into own source.
* Small improvements in process/command line.
* Check process behavior with crash.
* Make stderr and stdout unbuffered for crash testing.
* Only turn disable buffering in crash test.
* Disable crash test on CI.
* Fix crash on clang/linux.
* Enable crash test.
Remove _appendBuffer as can use StreamUtil functionality.
Diffstat (limited to 'source/core/slang-stream.cpp')
| -rw-r--r-- | source/core/slang-stream.cpp | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/source/core/slang-stream.cpp b/source/core/slang-stream.cpp index f938eb886..042b1d898 100644 --- a/source/core/slang-stream.cpp +++ b/source/core/slang-stream.cpp @@ -250,6 +250,16 @@ SlangResult FileStream::write(const void* buffer, size_t length) return (bytesWritten == length) ? SLANG_OK : SLANG_FAIL; } +SlangResult FileStream::flush() +{ + if (m_handle && canWrite()) + { + fflush(m_handle); + return SLANG_OK; + } + return SLANG_E_NOT_AVAILABLE; +} + bool FileStream::canRead() { return ((int)m_fileAccess & (int)FileAccess::Read) != 0; @@ -362,4 +372,270 @@ SlangResult OwnedMemoryStream::write(const void * buffer, size_t length) return SLANG_OK; } +// !!!!!!!!!!!!!!!!!!!!!!!!!!!!! BufferedReadStream !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +void BufferedReadStream::_advanceStartIndex(Index byteCount) +{ + SLANG_ASSERT(Index(getCount()) >= byteCount && byteCount >= 0); + m_startIndex += byteCount; + if (getCount() == 0) + { + m_startIndex = 0; + } +} + +Int64 BufferedReadStream::getPosition() +{ + return m_stream ? (m_stream->getPosition() - getCount()) : 0; +} + +SlangResult BufferedReadStream::seek(SeekOrigin origin, Int64 offset) +{ + if (!m_stream) + { + return SLANG_FAIL; + } + // As it currently stands the data behind m_startIndex is the previous data. + // So we could seek backwards up to -m_startIndex. + // We don't worry about this here, for simplicity sake. + + if (origin == SeekOrigin::End || origin == SeekOrigin::Start || offset < 0 || offset >= Int64(getCount())) + { + // Empty the buffer + m_startIndex = 0; + m_buffer.setCount(0); + // Seek on underlying stream + return m_stream->seek(origin, offset); + } + + // We can just seek on the buffered data + _advanceStartIndex(Index(offset)); + return SLANG_OK; +} + +SlangResult BufferedReadStream::read(void* inBuffer, size_t length, size_t& outReadBytes) +{ + // If the buffer has no data and the read size is larger than the default read size - may as well just read directly into the output buffer + if (getCount() == 0 && length > m_defaultReadSize) + { + return m_stream->read(inBuffer, length, outReadBytes); + } + + Byte* buffer = (Byte*)inBuffer; + + size_t totalReadBytes = 0; + outReadBytes = 0; + + // Do a read to fill the buffer. + SLANG_RETURN_ON_FAIL(update()); + + while (length > 0) + { + const size_t bufferCount = size_t(getCount()); + + if (bufferCount) + { + const size_t readCount = (bufferCount < length) ? bufferCount : length; + + ::memcpy(buffer, getBuffer(), readCount); + + _advanceStartIndex(Index(readCount)); + buffer += readCount; + length -= readCount; + + totalReadBytes += readCount; + } + else + { + if (m_stream == nullptr) + { + break; + } + + // Read from underlying buffer + size_t readBytes; + SlangResult res = m_stream->read(buffer, length, readBytes); + + outReadBytes = totalReadBytes + readBytes; + return res; + } + } + + outReadBytes = totalReadBytes; + return SLANG_OK; +} + +SlangResult BufferedReadStream::write(const void* buffer, size_t length) +{ + SLANG_UNUSED(buffer); + SLANG_UNUSED(length); + + return SLANG_E_NOT_AVAILABLE; +} + +bool BufferedReadStream::canRead() +{ + return getCount() > 0 || (m_stream && m_stream->canRead()); +} + +bool BufferedReadStream::canWrite() +{ + return false; +} + +void BufferedReadStream::close() +{ + if (m_stream) + { + m_stream->close(); + m_stream.setNull(); + } +} + +bool BufferedReadStream::isEnd() +{ + return getCount() == 0 && (m_stream == nullptr || m_stream->isEnd()); +} + +SlangResult BufferedReadStream::flush() +{ + return SLANG_E_NOT_AVAILABLE; +} + +SlangResult BufferedReadStream::update() +{ + if (m_stream == nullptr) + { + // Should this return an error? + return SLANG_OK; + } + + { + // How much buffer space do we have. We need at least m_defaultReadSize + const size_t remainingCount = size_t(m_buffer.getCapacity() - m_buffer.getCount()); + + // Repeat until we have enough space + while (remainingCount < m_defaultReadSize) + { + // If there is anything in the buffer shift it all down + if (m_startIndex > 0) + { + Byte* buffer = m_buffer.getBuffer(); + const Index count = getCount(); + if (count > 0) + { + ::memmove(buffer, buffer + m_startIndex, count); + } + + m_buffer.setCount(count); + m_startIndex = 0; + } + else + { + // Make sure we have the space + const Index prevCount = m_buffer.getCount(); + m_buffer.setCount(prevCount + m_defaultReadSize); + m_buffer.setCount(prevCount); + } + } + SLANG_ASSERT(size_t(m_buffer.getCapacity() - m_buffer.getCount()) >= m_defaultReadSize); + } + + { + const Index prevCount = m_buffer.getCount(); + m_buffer.setCount(prevCount + m_defaultReadSize); + + size_t readBytes = 0; + + const SlangResult res = m_stream->read(m_buffer.getBuffer() + prevCount, m_defaultReadSize, readBytes); + + m_buffer.setCount(prevCount + Index(readBytes)); + + return res; + } +} + +// !!!!!!!!!!!!!!!!!!!!!!!!!!!!! StreamUtil !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +/* static */SlangResult StreamUtil::readAll(Stream* stream, size_t readSize, List<Byte>& ioBytes) +{ + while (!stream->isEnd()) + { + SLANG_RETURN_ON_FAIL(read(stream, readSize, ioBytes)); + } + + return SLANG_OK; +} + +/* static */SlangResult StreamUtil::read(Stream* stream, size_t readSize, List<Byte>& ioBytes) +{ + readSize = (readSize <= 0) ? 1024 : readSize; + + while (true) + { + const Index prevCount = ioBytes.getCount(); + ioBytes.setCount(prevCount + readSize); + + size_t readBytesCount; + SLANG_RETURN_ON_FAIL(stream->read(ioBytes.getBuffer() + prevCount, readSize, readBytesCount)); + ioBytes.setCount(prevCount + Index(readBytesCount)); + + if (readBytesCount == 0) + { + return SLANG_OK; + } + } +} + +/* static */SlangResult StreamUtil::discard(Stream* stream) +{ + Byte buf[1024]; + const Index bufSize = SLANG_COUNT_OF(buf); + + while (true) + { + size_t readBytesCount; + SLANG_RETURN_ON_FAIL(stream->read(buf, bufSize, readBytesCount)); + + if (readBytesCount == 0) + { + return SLANG_OK; + } + } +} + +/* static */SlangResult StreamUtil::discardAll(Stream* stream) +{ + while (!stream->isEnd()) + { + SLANG_RETURN_ON_FAIL(discard(stream)); + } + return SLANG_OK; +} + + +/* static */SlangResult StreamUtil::readOrDiscard(Stream* stream, size_t readSize, List<Byte>* ioBytes) +{ + if (ioBytes) + { + return read(stream, readSize, *ioBytes); + } + else + { + return discard(stream); + } +} + +/* static */SlangResult StreamUtil::readOrDiscardAll(Stream* stream, size_t readSize, List<Byte>* ioBytes) +{ + if (ioBytes) + { + return readAll(stream, readSize, *ioBytes); + } + else + { + return discardAll(stream); + } +} + } // namespace Slang |
