Как объединить два экземпляра System.IO.Stream в один?
Предположим, я хочу передать пользователю три файла подряд, но вместо того, чтобы он передал мне объект Stream,
в который я буду записывать байты, я должен передать ему объект Stream,
из которого он сам будет читать байты. Я хотел бы взять три моих объекта FileStream
(или, что ещё удобнее, IEnumerable<Stream>
) и вернуть новый объект ConcatenatedStream,
который по требованию будет читать данные из исходных потоков.
- Должен ли я вызывать Close() или Dispose() для объектов потока?
- Закрытие потоков в Java
- Не удаётся прочитать InputStream из процесса Java (Runtime.getRuntime().exec() или ProcessBuilder)
- flush в java.io.FileWriter
- Буферизованные файлы (для более быстрого доступа к диску)
- Чтение двоичных данных из std::cin
- Как превратить String в InputStreamReader в Java?
- Вывод OutputStream от Runtime exec() в консоль
/// <summary>
/// Read-only, forward-only stream that yields the bytes of several source streams
/// one after another, pulling from them on demand.
/// </summary>
/// <remarks>
/// The instance takes ownership of the source streams: each one is disposed as soon as
/// it reports end of data, and any streams still queued are disposed together with this
/// instance.
/// </remarks>
class ConcatenatedStream : Stream
{
    // Streams that have not been exhausted yet; the head of the queue is the one being read.
    private readonly Queue<Stream> streams;

    /// <summary>Creates a stream that reads <paramref name="streams"/> back to back.</summary>
    /// <param name="streams">Source streams, in the order their content should appear.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streams"/> is null.</exception>
    public ConcatenatedStream(IEnumerable<Stream> streams)
    {
        if (streams == null)
        {
            throw new ArgumentNullException("streams");
        }

        this.streams = new Queue<Stream>(streams);
    }

    public override bool CanRead { get { return true; } }

    public override bool CanSeek { get { return false; } }

    public override bool CanWrite { get { return false; } }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes from the current source stream, moving on to
    /// the next source stream whenever the current one is exhausted.
    /// </summary>
    /// <returns>The number of bytes read, or 0 once every source stream has been consumed.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // A zero-byte request makes the underlying stream return 0 as well; that must not be
        // mistaken for end of data, otherwise every queued stream would be thrown away.
        if (count == 0)
        {
            return 0;
        }

        // Loop (rather than recurse) so that a long run of empty streams cannot exhaust the stack.
        while (streams.Count > 0)
        {
            int bytesRead = streams.Peek().Read(buffer, offset, count);
            if (bytesRead > 0)
            {
                return bytesRead;
            }

            // The head stream is exhausted: release it and try the next one.
            streams.Dequeue().Dispose();
        }

        return 0;
    }

    /// <summary>Does nothing: the stream is read-only, so there is nothing to flush.</summary>
    public override void Flush()
    {
    }

    public override long Length
    {
        get { throw new NotSupportedException(); }
    }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    /// <summary>Disposes the source streams that have not been consumed yet.</summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            while (streams.Count > 0)
            {
                streams.Dequeue().Dispose();
            }
        }

        base.Dispose(disposing);
    }
}
Не проверено, но что-то вроде этого:
/// <summary>
/// Read-only, forward-only stream that lazily walks a sequence of streams and exposes
/// their content as one continuous stream.
/// </summary>
/// <remarks>
/// Source streams are obtained from the enumerator only when they are needed. When
/// <c>closeStreams</c> is true each source stream is disposed once it has been read to
/// the end (or when this instance is disposed while it is the current one); streams that
/// were never pulled from the enumerator are not touched.
/// </remarks>
class StreamEnumerator : Stream
{
    // Total number of bytes handed out by Read so far.
    private long position;
    private readonly bool closeStreams;
    // Null once this instance has been disposed.
    private IEnumerator<Stream> iterator;
    // The source stream currently being read, or null if the next one has to be fetched.
    private Stream current;

    /// <summary>Creates a stream over the streams produced by <paramref name="source"/>.</summary>
    /// <param name="source">Sequence of source streams, in reading order.</param>
    /// <param name="closeStreams">Whether to dispose each source stream when it is finished.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public StreamEnumerator(IEnumerable<Stream> source, bool closeStreams)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        iterator = source.GetEnumerator();
        this.closeStreams = closeStreams;
    }

    // Releases the current source stream (if this instance owns it) and forgets it.
    private void EndOfStream()
    {
        if (closeStreams && current != null)
        {
            // Dispose() already performs Close(); calling both was redundant.
            current.Dispose();
        }

        current = null;
    }

    // Returns the stream to read from, fetching the next one from the enumerator when
    // necessary; null means the sequence is exhausted.
    private Stream Current
    {
        get
        {
            if (current != null)
            {
                return current;
            }

            if (iterator == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            if (iterator.MoveNext())
            {
                current = iterator.Current;
            }

            return current;
        }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            EndOfStream();

            // Guard against repeated Dispose calls: the enumerator is already gone then.
            if (iterator != null)
            {
                iterator.Dispose();
                iterator = null;
            }
        }

        base.Dispose(disposing);
    }

    public override bool CanRead { get { return true; } }

    public override bool CanWrite { get { return false; } }

    public override bool CanSeek { get { return false; } }

    public override bool CanTimeout { get { return false; } }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override void WriteByte(byte value)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void Flush()
    {
        /* nothing to do */
    }

    public override long Length
    {
        get { throw new NotSupportedException(); }
    }

    /// <summary>
    /// Gets the number of bytes read so far. Setting it is only accepted when the value
    /// equals the current position; anything else throws <see cref="NotSupportedException"/>.
    /// </summary>
    public override long Position
    {
        get { return position; }
        set
        {
            if (value != this.position)
            {
                throw new NotSupportedException();
            }
        }
    }

    /// <summary>
    /// Fills the buffer from the source streams, moving from one to the next, until
    /// <paramref name="count"/> bytes were read or every source stream is exhausted.
    /// </summary>
    /// <returns>The number of bytes read; 0 when no data is left.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int result = 0;
        while (count > 0)
        {
            Stream stream = Current;
            if (stream == null)
            {
                break;
            }

            int thisCount = stream.Read(buffer, offset, count);
            result += thisCount;
            count -= thisCount;
            offset += thisCount;

            // A zero-byte read marks the end of the current source stream.
            if (thisCount == 0)
            {
                EndOfStream();
            }
        }

        position += result;
        return result;
    }
}
Если вам нужно только чтение, вот моя реализация такого потока:
ВНИМАНИЕ! Свойство Position и метод Seek работают некорректно, это необходимо исправить.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

namespace LVK.IO
{
    /// <summary>
    /// This class is a <see cref="Stream"/> descendant that manages multiple underlying
    /// streams which are considered to be chained together to one large stream. Only reading
    /// and seeking is allowed, writing will throw exceptions.
    /// </summary>
    /// <remarks>
    /// The lengths of the underlying streams are sampled once, in the constructor. The
    /// underlying streams are not disposed by this class.
    /// </remarks>
    public class CombinedStream : Stream
    {
        private readonly Stream[] _UnderlyingStreams;
        // _UnderlyingStartingPositions[i] is the combined-stream offset of the first byte of stream i.
        private readonly Int64[] _UnderlyingStartingPositions;
        private Int64 _Position;
        private readonly Int64 _TotalLength;
        // Index of the underlying stream that contains _Position.
        private int _Index;

        /// <summary>
        /// Constructs a new <see cref="CombinedStream"/> on top of the specified array
        /// of streams.
        /// </summary>
        /// <param name="underlyingStreams">
        /// An array of <see cref="Stream"/> objects that will be chained together and
        /// considered to be one big stream. May be empty, which yields an empty stream.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="underlyingStreams"/> or one of its elements is null.
        /// </exception>
        /// <exception cref="InvalidOperationException">
        /// One of the streams does not support reading or seeking.
        /// </exception>
        public CombinedStream(params Stream[] underlyingStreams)
        {
            if (underlyingStreams == null)
            {
                throw new ArgumentNullException("underlyingStreams");
            }

            foreach (Stream stream in underlyingStreams)
            {
                if (stream == null)
                {
                    throw new ArgumentNullException("underlyingStreams[]");
                }

                if (!stream.CanRead)
                {
                    throw new InvalidOperationException("CanRead not true for all streams");
                }

                if (!stream.CanSeek)
                {
                    throw new InvalidOperationException("CanSeek not true for all streams");
                }
            }

            // Copy the array so later changes by the caller cannot affect this instance.
            _UnderlyingStreams = (Stream[])underlyingStreams.Clone();
            _UnderlyingStartingPositions = new Int64[_UnderlyingStreams.Length];
            _Position = 0;
            _Index = 0;

            // A running total works for any number of streams, including none.
            Int64 runningTotal = 0;
            for (int index = 0; index < _UnderlyingStreams.Length; index++)
            {
                _UnderlyingStartingPositions[index] = runningTotal;
                runningTotal += _UnderlyingStreams[index].Length;
            }

            _TotalLength = runningTotal;
        }

        // Combined-stream offset one past the last byte of the given underlying stream.
        private Int64 EndOf(int index)
        {
            return index < _UnderlyingStreams.Length - 1
                ? _UnderlyingStartingPositions[index + 1]
                : _TotalLength;
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports reading.
        /// </summary>
        /// <value>Always <c>true</c> for <see cref="CombinedStream"/>.</value>
        public override Boolean CanRead
        {
            get { return true; }
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports seeking.
        /// </summary>
        /// <value>Always <c>true</c> for <see cref="CombinedStream"/>.</value>
        public override Boolean CanSeek
        {
            get { return true; }
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports writing.
        /// </summary>
        /// <value>Always <c>false</c> for <see cref="CombinedStream"/>.</value>
        public override Boolean CanWrite
        {
            get { return false; }
        }

        /// <summary>
        /// Calls <see cref="Stream.Flush"/> on every underlying stream.
        /// </summary>
        public override void Flush()
        {
            foreach (Stream stream in _UnderlyingStreams)
            {
                stream.Flush();
            }
        }

        /// <summary>
        /// Gets the total length in bytes of the underlying streams, as measured when this
        /// instance was constructed.
        /// </summary>
        public override Int64 Length
        {
            get { return _TotalLength; }
        }

        /// <summary>
        /// Gets or sets the position within the combined stream.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The value is negative or greater than <see cref="Length"/>.
        /// </exception>
        public override Int64 Position
        {
            get { return _Position; }

            set
            {
                if (value < 0 || value > _TotalLength)
                {
                    throw new ArgumentOutOfRangeException("Position");
                }

                _Position = value;

                // Move the stream index so that it points at the stream containing the new
                // position. At value == Length the second loop stops on the last stream, which
                // is where a subsequent Read should report end of data.
                while (_Index > 0 && _Position < _UnderlyingStartingPositions[_Index])
                {
                    _Index--;
                }

                while (_Index < _UnderlyingStreams.Length - 1 && _Position >= EndOf(_Index))
                {
                    _Index++;
                }
            }
        }

        /// <summary>
        /// Reads a sequence of bytes from the combined stream and advances the position by the
        /// number of bytes read, continuing into the following underlying streams as needed.
        /// </summary>
        /// <param name="buffer">The array that receives the bytes.</param>
        /// <param name="offset">The zero-based index in <paramref name="buffer"/> at which to begin storing data.</param>
        /// <param name="count">The maximum number of bytes to read.</param>
        /// <returns>
        /// The total number of bytes read into the buffer; less than <paramref name="count"/>
        /// when the end of the combined stream is reached, and zero at the end.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The sum of offset and count is larger than the buffer length.</exception>
        public override int Read(Byte[] buffer, int offset, int count)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException("offset");
            }

            if (count < 0)
            {
                throw new ArgumentOutOfRangeException("count");
            }

            if (buffer.Length - offset < count)
            {
                throw new ArgumentException("The sum of offset and count is larger than the buffer length.");
            }

            int result = 0;
            while (count > 0 && _Index < _UnderlyingStreams.Length)
            {
                Stream current = _UnderlyingStreams[_Index];
                current.Position = _Position - _UnderlyingStartingPositions[_Index];

                int bytesRead = current.Read(buffer, offset, count);
                result += bytesRead;
                offset += bytesRead;
                count -= bytesRead;
                _Position += bytesRead;

                if (_Position >= EndOf(_Index))
                {
                    // The current stream is fully consumed: continue with the next one, if any.
                    if (_Index == _UnderlyingStreams.Length - 1)
                    {
                        break;
                    }

                    _Index++;
                }
                else if (bytesRead == 0)
                {
                    // The stream ended before its recorded length; no further progress is possible.
                    break;
                }

                // Otherwise it was a short read: stay on the same stream and try again.
            }

            return result;
        }

        /// <summary>
        /// Sets the position within the combined stream.
        /// </summary>
        /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
        /// <param name="origin">The reference point used to obtain the new position.</param>
        /// <returns>The new position within the combined stream.</returns>
        /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <see cref="SeekOrigin"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The resulting position is outside the stream.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            switch (origin)
            {
                case SeekOrigin.Begin:
                    Position = offset;
                    break;

                case SeekOrigin.Current:
                    Position += offset;
                    break;

                case SeekOrigin.End:
                    Position = Length + offset;
                    break;

                default:
                    throw new ArgumentException("Invalid seek origin.", "origin");
            }

            return Position;
        }

        /// <summary>
        /// Throws <see cref="NotSupportedException"/> since the <see cref="CombinedStream"/>
        /// class does not support changing the length.
        /// </summary>
        /// <param name="value">The desired length of the current stream in bytes.</param>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException("The method or operation is not supported by CombinedStream.");
        }

        /// <summary>
        /// Throws <see cref="NotSupportedException"/> since the <see cref="CombinedStream"/>
        /// class does not support writing to the underlying streams.
        /// </summary>
        /// <param name="buffer">Ignored.</param>
        /// <param name="offset">Ignored.</param>
        /// <param name="count">Ignored.</param>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException("The method or operation is not supported by CombinedStream.");
        }
    }
}
Почему бы не использовать контейнер, который уже инкапсулирует идею нескольких файлов, например ZipOutputStream из SharpZipLib?
Вот вариант с поддержкой перемещения по потоку (Seek), которая необходима во многих ситуациях. В нём не реализовано освобождение (Dispose) базовых потоков, и его можно сделать немного эффективнее, если предположить, что базовые потоки не меняют длину: тогда общую длину и смещения потоков можно вычислить один раз.
/// <summary>
/// Read-only, seekable stream that presents a list of seekable streams as one
/// continuous stream.
/// </summary>
/// <remarks>
/// The lengths of the underlying streams are queried on every operation, so they are
/// allowed to change between calls. The underlying streams are not disposed by this class.
/// </remarks>
public sealed class ConcatenatedStream : Stream
{
    private readonly List<Stream> streams;
    // Absolute position within the concatenated content.
    private long _Position;

    /// <summary>Creates a stream over <paramref name="streams"/>, positioned at the start.</summary>
    /// <param name="streams">Seekable source streams, in the order their content should appear.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streams"/> is null.</exception>
    public ConcatenatedStream(List<Stream> streams)
    {
        if (streams == null)
        {
            throw new ArgumentNullException("streams");
        }

        this.streams = streams;
        _Position = 0;
    }

    public override bool CanRead { get { return true; } }

    public override bool CanSeek { get { return true; } }

    public override bool CanWrite { get { return false; } }

    /// <summary>Gets the sum of the current lengths of all underlying streams.</summary>
    public override long Length
    {
        get
        {
            long length = 0;
            foreach (Stream stream in streams)
            {
                length += stream.Length;
            }

            return length;
        }
    }

    /// <summary>Gets or sets the absolute position within the concatenated content.</summary>
    public override long Position
    {
        get { return _Position; }
        set { Seek(value, SeekOrigin.Begin); }
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes starting at the current position,
    /// continuing across underlying stream boundaries as needed.
    /// </summary>
    /// <returns>The number of bytes read; 0 when the position is at or past the end.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = 0;
        while (bytesRead < count)
        {
            // Find the underlying stream that contains the current position.
            Stream source = null;
            long sourceStart = 0;
            foreach (Stream candidate in streams)
            {
                long candidateLength = candidate.Length;
                if (_Position < sourceStart + candidateLength)
                {
                    source = candidate;
                    break;
                }

                sourceStart += candidateLength;
            }

            if (source == null)
            {
                // Position is at or beyond the end of the last stream.
                break;
            }

            // Position the source explicitly so that a partial read on the previous iteration
            // (or external use of the source) cannot make us read from the wrong place.
            source.Position = _Position - sourceStart;

            int r = source.Read(buffer, offset + bytesRead, count - bytesRead);
            if (r == 0)
            {
                // The source reported end of data before its Length; avoid spinning forever.
                break;
            }

            bytesRead += r;
            _Position += r;
        }

        return bytesRead;
    }

    /// <summary>Does nothing: the stream is read-only, so there is nothing to flush.</summary>
    public override void Flush()
    {
    }

    /// <summary>Moves the position within the concatenated content.</summary>
    /// <param name="offset">Byte offset relative to <paramref name="origin"/>; typically negative or zero for <see cref="SeekOrigin.End"/>.</param>
    /// <param name="origin">Reference point for <paramref name="offset"/>.</param>
    /// <returns>The new absolute position.</returns>
    /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <see cref="SeekOrigin"/>.</exception>
    /// <exception cref="IOException">The resulting position would be before the beginning of the stream.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        long target;
        switch (origin)
        {
            case SeekOrigin.Begin:
                target = offset;
                break;

            case SeekOrigin.Current:
                target = _Position + offset;
                break;

            case SeekOrigin.End:
                // The offset is added to the length, as for every other Stream.
                target = Length + offset;
                break;

            default:
                throw new ArgumentException("Invalid seek origin.", "origin");
        }

        if (target < 0)
        {
            throw new IOException("An attempt was made to move the position before the beginning of the stream.");
        }

        // Positions at or past the end are legal; Read simply returns 0 there.
        _Position = target;
        return _Position;
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}