From d0a70a8ab5d9ec282d466f89745da46fb3bc9b40 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:28:26 +0100 Subject: [PATCH 01/10] introduce BufferedFileStreamStrategy --- .../System/IO/BufferedFileStreamStrategy.cs | 171 ++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 src/libraries/System.Private.CoreLib/src/System/IO/BufferedFileStreamStrategy.cs diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/BufferedFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/BufferedFileStreamStrategy.cs new file mode 100644 index 0000000000000..bb6855711ceca --- /dev/null +++ b/src/libraries/System.Private.CoreLib/src/System/IO/BufferedFileStreamStrategy.cs @@ -0,0 +1,171 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Win32.SafeHandles; + +namespace System.IO +{ + // this type exists so we can avoid duplicating the buffering logic in every FileStreamStrategy implementation + // for simple properties that would just call the wrapped stream properties, we call strategy directly + // for everything else, we are calling BufferedStream methods that take care of all the buffering work + internal sealed class BufferedFileStreamStrategy : FileStreamStrategy + { + private readonly FileStreamStrategy _strategy; + private readonly BufferedStream _bufferedStream; + private readonly FileStream _fileStream; + + internal BufferedFileStreamStrategy(FileStream fileStream, FileStreamStrategy strategy, int bufferSize) + { + _fileStream = fileStream; + _strategy = strategy; + _bufferedStream = new BufferedStream(strategy, bufferSize, allowForZeroByteReads: true); + } + + ~BufferedFileStreamStrategy() + { + // Preserved for compatibility since FileStream has defined a + // finalizer in past releases and derived classes may depend + // on Dispose(false) call. + _fileStream.DisposeInternal(false); + } + + public override bool CanRead => _strategy.CanRead; + + public override bool CanWrite => _strategy.CanWrite; + + public override bool CanSeek => _strategy.CanSeek; + + public override long Length => _bufferedStream.Length; + + public override long Position + { + get => _bufferedStream.Position; + set => _bufferedStream.Position = value; + } + + internal override bool IsAsync => _strategy.IsAsync; + + internal override string Name => _strategy.Name; + + internal override SafeFileHandle SafeFileHandle + { + get + { + _fileStream.Flush(); + return _strategy.SafeFileHandle; + } + } + + internal override bool IsClosed => _strategy.IsClosed; + + internal override void Lock(long position, long length) => _strategy.Lock(position, length); + + internal override void Unlock(long position, long length) => _strategy.Unlock(position, length); + + public override long Seek(long offset, SeekOrigin origin) => _bufferedStream.Seek(offset, origin); + + public override void SetLength(long value) => _bufferedStream.SetLength(value); + + public override int ReadByte() => _bufferedStream.ReadByte(); + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + => _bufferedStream.BeginRead(buffer, offset, count, callback, state); + + public override int EndRead(IAsyncResult asyncResult) + => _bufferedStream.EndRead(asyncResult); + + public override int Read(byte[] buffer, int offset, int count) => _bufferedStream.Read(buffer, offset, count); + + public override int Read(Span buffer) => _bufferedStream.Read(buffer); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _bufferedStream.ReadAsync(buffer, offset, count, cancellationToken); + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + => _bufferedStream.ReadAsync(buffer, cancellationToken); + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + => _bufferedStream.BeginWrite(buffer, offset, count, callback, state); + + public override void EndWrite(IAsyncResult asyncResult) + => _bufferedStream.EndWrite(asyncResult); + + public override void WriteByte(byte value) => _bufferedStream.WriteByte(value); + + public override void Write(byte[] buffer, int offset, int count) => _bufferedStream.Write(buffer, offset, count); + + public override void Write(ReadOnlySpan buffer) => _bufferedStream.Write(buffer); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _bufferedStream.WriteAsync(buffer, offset, count, cancellationToken); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + => _bufferedStream.WriteAsync(buffer, cancellationToken); + + public override void Flush() => _bufferedStream.Flush(); + + internal override void Flush(bool flushToDisk) => _bufferedStream.Flush(flushToDisk); + + public override Task FlushAsync(CancellationToken cancellationToken) + { + // don't try to Flush if the buffer was not used + // added to keep *NothingToFlush_CompletesSynchronously tests green + if (_bufferedStream.HasSomeDataInTheBuffer) + { + return _bufferedStream.FlushAsync(cancellationToken); + } + + return Task.CompletedTask; + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + => _bufferedStream.CopyToAsync(destination, bufferSize, cancellationToken); + + public override ValueTask DisposeAsync() + { + if (_bufferedStream.HasSomeDataInTheBuffer) + { + return _bufferedStream.DisposeAsync(); + } + return _strategy.DisposeAsync(); + } + + internal override void DisposeInternal(bool disposing) + { + try + { + // the finalizer must at least try to flush the write buffer + // so we enforce it by passing always true + _bufferedStream.DisposeInternal(true); + } + catch (Exception e) when (!disposing && IsIoRelatedException(e)) + { + // On finalization, ignore failures from trying to flush the write buffer, + // e.g. if this stream is wrapping a pipe and the pipe is now broken. + } + + if (disposing) + { + GC.SuppressFinalize(this); + } + } + + private static bool IsIoRelatedException(Exception e) => + // These all derive from IOException + // DirectoryNotFoundException + // DriveNotFoundException + // EndOfStreamException + // FileLoadException + // FileNotFoundException + // PathTooLongException + // PipeException + e is IOException || + // Note that SecurityException is only thrown on runtimes that support CAS + // e is SecurityException || + e is UnauthorizedAccessException || + e is NotSupportedException || + (e is ArgumentException && !(e is ArgumentNullException)); + } +} From 0b463473f842c6541fb1b40d7d582ea8c2d66516 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:29:02 +0100 Subject: [PATCH 02/10] remove finalizer from FileStream --- .../System/IO/DerivedFileStreamStrategy.cs | 34 +++++++++++++++-- .../src/System/IO/FileStream.cs | 37 +++++-------------- .../src/System/IO/FileStreamStrategy.cs | 4 -- .../src/System/IO/FileStreamStrategyBase.cs | 11 +++++- 4 files changed, 50 insertions(+), 36 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/DerivedFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/DerivedFileStreamStrategy.cs index 76d4441309f7b..e7dd824305f49 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/DerivedFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/DerivedFileStreamStrategy.cs @@ -15,8 +15,21 @@ namespace System.IO internal sealed class DerivedFileStreamStrategy : FileStreamStrategy { private readonly FileStreamStrategy _strategy; + private readonly FileStream _fileStream; - internal DerivedFileStreamStrategy(FileStream fileStream, FileStreamStrategy strategy) : base(fileStream) => _strategy = strategy; + internal DerivedFileStreamStrategy(FileStream fileStream, FileStreamStrategy strategy) + { + _fileStream = fileStream; + _strategy = strategy; + } + + ~DerivedFileStreamStrategy() + { + // Preserved for compatibility since FileStream has defined a + // finalizer in past releases and derived classes may depend + // on Dispose(false) call. + _fileStream.DisposeInternal(false); + } public override bool CanRead => _strategy.CanRead; @@ -36,7 +49,14 @@ public override long Position internal override string Name => _strategy.Name; - internal override SafeFileHandle SafeFileHandle => _strategy.SafeFileHandle; + internal override SafeFileHandle SafeFileHandle + { + get + { + _fileStream.Flush(false); + return _strategy.SafeFileHandle; + } + } internal override bool IsClosed => _strategy.IsClosed; @@ -136,6 +156,14 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio public override ValueTask DisposeAsync() => _fileStream.BaseDisposeAsync(); - internal override void DisposeInternal(bool disposing) => _strategy.DisposeInternal(disposing); + internal override void DisposeInternal(bool disposing) + { + _strategy.DisposeInternal(disposing); + + if (disposing) + { + GC.SuppressFinalize(this); + } + } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs index 91449fc854d9f..7593e7b96c778 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs @@ -218,7 +218,10 @@ public override Task FlushAsync(CancellationToken cancellationToken) public override int Read(byte[] buffer, int offset, int count) { - ValidateReadWriteArgs(buffer, offset, count); + // to avoid duplicating the call to ValidateBufferArguments with BufferedStream, + // each strategy is supposed to validate the input of this method on their own + if (_strategy.IsClosed) + throw Error.GetFileNotOpen(); return _strategy.Read(buffer, offset, count); } @@ -255,7 +258,10 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken public override void Write(byte[] buffer, int offset, int count) { - ValidateReadWriteArgs(buffer, offset, count); + // to avoid duplicating the call to ValidateBufferArguments with BufferedStream, + // each strategy is supposed to validate the input of this method on their own + if (_strategy.IsClosed) + throw Error.GetFileNotOpen(); _strategy.Write(buffer, offset, count); } @@ -316,17 +322,6 @@ public virtual void Flush(bool flushToDisk) /// Gets a value indicating whether the current stream supports writing. public override bool CanWrite => _strategy.CanWrite; - /// Validates arguments to Read and Write and throws resulting exceptions. - /// The buffer to read from or write to. - /// The zero-based offset into the buffer. - /// The maximum number of bytes to read or write. - private void ValidateReadWriteArgs(byte[] buffer, int offset, int count) - { - ValidateBufferArguments(buffer, offset, count); - if (_strategy.IsClosed) - throw Error.GetFileNotOpen(); - } - /// Sets the length of this stream to the given value. /// The new length of the stream. public override void SetLength(long value) @@ -397,21 +392,9 @@ public override long Position /// The byte to write to the stream. public override void WriteByte(byte value) => _strategy.WriteByte(value); - ~FileStream() - { - // Preserved for compatibility since FileStream has defined a - // finalizer in past releases and derived classes may depend - // on Dispose(false) call. - Dispose(false); - } + internal void DisposeInternal(bool disposing) => Dispose(disposing); - protected override void Dispose(bool disposing) - { - if (_strategy != null) // possible in finalizer - { - _strategy.DisposeInternal(disposing); - } - } + protected override void Dispose(bool disposing) => _strategy.DisposeInternal(disposing); public override ValueTask DisposeAsync() => _strategy.DisposeAsync(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategy.cs index f4657da1495b0..b7067e45bf9e5 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategy.cs @@ -7,10 +7,6 @@ namespace System.IO { internal abstract class FileStreamStrategy : Stream { - protected readonly FileStream _fileStream; - - protected FileStreamStrategy(FileStream fileStream) => _fileStream = fileStream; - internal abstract bool IsAsync { get; } internal abstract string Name { get; } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategyBase.cs b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategyBase.cs index 78858a231158e..11d7852a93911 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategyBase.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamStrategyBase.cs @@ -18,6 +18,7 @@ internal abstract class FileStreamStrategyBase : FileStreamStrategy protected byte[]? _buffer; protected readonly int _bufferLength; protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws + protected readonly FileStream _fileStream; /// Whether the file is opened for reading, writing, or both. protected readonly FileAccess _access; @@ -64,12 +65,13 @@ internal abstract class FileStreamStrategyBase : FileStreamStrategy /// Whether the file stream's handle has been exposed. protected bool _exposedHandle; - protected FileStreamStrategyBase(FileStream fileStream, SafeFileHandle handle, FileAccess access, int bufferSize, bool isAsync) : base(fileStream) + protected FileStreamStrategyBase(FileStream fileStream, SafeFileHandle handle, FileAccess access, int bufferSize, bool isAsync) { _exposedHandle = true; _bufferLength = bufferSize; InitFromHandle(handle, access, isAsync); + _fileStream = fileStream; // Note: Cleaner to set the following fields in ValidateAndInitFromHandle, // but we can't as they're readonly. @@ -81,7 +83,7 @@ protected FileStreamStrategyBase(FileStream fileStream, SafeFileHandle handle, F _fileHandle = handle; } - protected FileStreamStrategyBase(FileStream fileStream, string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options) : base(fileStream) + protected FileStreamStrategyBase(FileStream fileStream, string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options) { string fullPath = Path.GetFullPath(path); @@ -93,6 +95,7 @@ protected FileStreamStrategyBase(FileStream fileStream, string path, FileMode mo _useAsyncIO = true; _fileHandle = FileStreamHelpers.OpenHandle(fullPath, mode, access, share, options); + _fileStream = fileStream; try { @@ -167,6 +170,8 @@ public override Task FlushAsync(CancellationToken cancellationToken) public override int Read(byte[] buffer, int offset, int count) { + ValidateBufferArguments(buffer, offset, count); + return _useAsyncIO ? ReadAsyncTask(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult() : ReadSpan(new Span(buffer, offset, count)); @@ -242,6 +247,8 @@ private Task ReadAsyncTask(byte[] buffer, int offset, int count, Cancellati public override void Write(byte[] buffer, int offset, int count) { + ValidateBufferArguments(buffer, offset, count); + if (_useAsyncIO) { WriteAsyncInternal(new ReadOnlyMemory(buffer, offset, count), CancellationToken.None).AsTask().GetAwaiter().GetResult(); From d9628274c3926be535bd9d647466c5235c92c662 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:31:51 +0100 Subject: [PATCH 03/10] implement WindowsFileStreamStrategy that has all the logic that is common to Async and Sync Windows File Stream Strategies --- .../System/IO/WindowsFileStreamStrategy.cs | 1522 +++-------------- 1 file changed, 228 insertions(+), 1294 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/WindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/WindowsFileStreamStrategy.cs index 53b504433921e..9ccb7259fea24 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/WindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/WindowsFileStreamStrategy.cs @@ -39,513 +39,179 @@ namespace System.IO { - internal sealed partial class WindowsFileStreamStrategy : FileStreamStrategyBase + internal abstract class WindowsFileStreamStrategy : FileStreamStrategy { - private bool _canSeek; - private bool _isPipe; // Whether to disable async buffering code. - private long _appendStart; // When appending, prevent overwriting file. + // Error codes (not HRESULTS), from winerror.h + internal const int ERROR_BROKEN_PIPE = 109; + internal const int ERROR_NO_DATA = 232; + protected const int ERROR_HANDLE_EOF = 38; + protected const int ERROR_INVALID_PARAMETER = 87; + protected const int ERROR_IO_PENDING = 997; - private static readonly unsafe IOCompletionCallback s_ioCallback = FileStreamCompletionSource.IOCallback; + protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws - private Task _activeBufferOperation = Task.CompletedTask; // tracks in-progress async ops using the buffer - private PreAllocatedOverlapped? _preallocatedOverlapped; // optimization for async ops to avoid per-op allocations - private FileStreamCompletionSource? _currentOverlappedOwner; // async op currently using the preallocated overlapped + /// Whether the file is opened for reading, writing, or both. + private readonly FileAccess _access; - internal WindowsFileStreamStrategy(FileStream fileStream, SafeFileHandle handle, FileAccess access, int bufferSize, bool isAsync) - : base(fileStream, handle, access, bufferSize, isAsync) - { - } + /// The path to the opened file. + protected readonly string? _path; - internal WindowsFileStreamStrategy(FileStream fileStream, string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options) - : base(fileStream, path, mode, access, share, bufferSize, options) - { - } + protected long _filePosition; - protected override void Init(FileMode mode, FileShare share, string originalPath, FileOptions options) - { - if (!PathInternal.IsExtended(originalPath)) - { - // To help avoid stumbling into opening COM/LPT ports by accident, we will block on non file handles unless - // we were explicitly passed a path that has \\?\. GetFullPath() will turn paths like C:\foo\con.txt into - // \\.\CON, so we'll only allow the \\?\ syntax. + private readonly bool _canSeek; + private readonly bool _isPipe; // Whether to disable async buffering code. - int fileType = Interop.Kernel32.GetFileType(_fileHandle); - if (fileType != Interop.Kernel32.FileTypes.FILE_TYPE_DISK) - { - int errorCode = fileType == Interop.Kernel32.FileTypes.FILE_TYPE_UNKNOWN - ? Marshal.GetLastWin32Error() - : Interop.Errors.ERROR_SUCCESS; + /// Whether the file stream's handle has been exposed. + protected bool _exposedHandle; - _fileHandle.Dispose(); + private long _appendStart; // When appending, prevent overwriting file. - if (errorCode != Interop.Errors.ERROR_SUCCESS) - { - throw Win32Marshal.GetExceptionForWin32Error(errorCode); - } - throw new NotSupportedException(SR.NotSupported_FileStreamOnNonFiles); - } - } + internal WindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access) + { + _exposedHandle = true; - // This is necessary for async IO using IO Completion ports via our - // managed Threadpool API's. This (theoretically) calls the OS's - // BindIoCompletionCallback method, and passes in a stub for the - // LPOVERLAPPED_COMPLETION_ROUTINE. This stub looks at the Overlapped - // struct for this request and gets a delegate to a managed callback - // from there, which it then calls on a threadpool thread. (We allocate - // our native OVERLAPPED structs 2 pointers too large and store EE state - // & GC handles there, one to an IAsyncResult, the other to a delegate.) - if (_useAsyncIO) - { - try - { - _fileHandle.ThreadPoolBinding = ThreadPoolBoundHandle.BindHandle(_fileHandle); - } - catch (ArgumentException ex) - { - throw new IOException(SR.IO_BindHandleFailed, ex); - } - finally - { - if (_fileHandle.ThreadPoolBinding == null) - { - // We should close the handle so that the handle is not open until SafeFileHandle GC - Debug.Assert(!_exposedHandle, "Are we closing handle that we exposed/not own, how?"); - _fileHandle.Dispose(); - } - } - } + InitFromHandle(handle, access, out _canSeek, out _isPipe); - _canSeek = true; + // Note: Cleaner to set the following fields in ValidateAndInitFromHandle, + // but we can't as they're readonly. + _access = access; - // For Append mode... - if (mode == FileMode.Append) - { - _appendStart = SeekCore(_fileHandle, 0, SeekOrigin.End); - } - else - { - _appendStart = -1; - } + // As the handle was passed in, we must set the handle field at the very end to + // avoid the finalizer closing the handle when we throw errors. + _fileHandle = handle; } - protected override void InitFromHandle(SafeFileHandle handle, FileAccess access, bool useAsyncIO) + internal WindowsFileStreamStrategy(string path, FileMode mode, FileAccess access, FileShare share, FileOptions options) { -#if DEBUG - bool hadBinding = handle.ThreadPoolBinding != null; + string fullPath = Path.GetFullPath(path); + + _path = fullPath; + _access = access; + + _fileHandle = FileStreamHelpers.OpenHandle(fullPath, mode, access, share, options); try { -#endif - InitFromHandleImpl(handle, useAsyncIO); -#if DEBUG + _canSeek = true; + + Init(mode, path); } catch { - Debug.Assert(hadBinding || handle.ThreadPoolBinding == null, "We should never error out with a ThreadPoolBinding we've added"); + // If anything goes wrong while setting up the stream, make sure we deterministically dispose + // of the opened handle. + _fileHandle.Dispose(); + _fileHandle = null!; throw; } -#endif } - private void InitFromHandleImpl(SafeFileHandle handle, bool useAsyncIO) - { - int handleType = Interop.Kernel32.GetFileType(handle); - Debug.Assert(handleType == Interop.Kernel32.FileTypes.FILE_TYPE_DISK || handleType == Interop.Kernel32.FileTypes.FILE_TYPE_PIPE || handleType == Interop.Kernel32.FileTypes.FILE_TYPE_CHAR, "FileStream was passed an unknown file type!"); - - _canSeek = handleType == Interop.Kernel32.FileTypes.FILE_TYPE_DISK; - _isPipe = handleType == Interop.Kernel32.FileTypes.FILE_TYPE_PIPE; - - // This is necessary for async IO using IO Completion ports via our - // managed Threadpool API's. This calls the OS's - // BindIoCompletionCallback method, and passes in a stub for the - // LPOVERLAPPED_COMPLETION_ROUTINE. This stub looks at the Overlapped - // struct for this request and gets a delegate to a managed callback - // from there, which it then calls on a threadpool thread. (We allocate - // our native OVERLAPPED structs 2 pointers too large and store EE - // state & a handle to a delegate there.) - // - // If, however, we've already bound this file handle to our completion port, - // don't try to bind it again because it will fail. A handle can only be - // bound to a single completion port at a time. - if (useAsyncIO && !(handle.IsAsync ?? false)) - { - try - { - handle.ThreadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); - } - catch (Exception ex) - { - // If you passed in a synchronous handle and told us to use - // it asynchronously, throw here. - throw new ArgumentException(SR.Arg_HandleNotAsync, nameof(handle), ex); - } - } - else if (!useAsyncIO) - { - FileStreamHelpers.VerifyHandleIsSync(handle); - } - - if (_canSeek) - SeekCore(handle, 0, SeekOrigin.Current); - else - _filePosition = 0; - } + public sealed override bool CanSeek => _canSeek; - private bool HasActiveBufferOperation => !_activeBufferOperation.IsCompleted; + public sealed override bool CanRead => !_fileHandle.IsClosed && (_access & FileAccess.Read) != 0; - public override bool CanSeek => _canSeek; + public sealed override bool CanWrite => !_fileHandle.IsClosed && (_access & FileAccess.Write) != 0; - public unsafe override long Length + public unsafe sealed override long Length { get { Interop.Kernel32.FILE_STANDARD_INFO info; if (!Interop.Kernel32.GetFileInformationByHandleEx(_fileHandle, Interop.Kernel32.FileStandardInfo, &info, (uint)sizeof(Interop.Kernel32.FILE_STANDARD_INFO))) - throw Win32Marshal.GetExceptionForLastWin32Error(_path); - long len = info.EndOfFile; - - // If we're writing near the end of the file, we must include our - // internal buffer in our Length calculation. Don't flush because - // we use the length of the file in our async write method. - if (_writePos > 0 && _filePosition + _writePos > len) - len = _writePos + _filePosition; - - return len; - } - } - - protected override void Dispose(bool disposing) - { - // Nothing will be done differently based on whether we are - // disposing vs. finalizing. This is taking advantage of the - // weak ordering between normal finalizable objects & critical - // finalizable objects, which I included in the SafeHandle - // design for Win32FileStream, which would often "just work" when - // finalized. - try - { - if (_fileHandle != null && !_fileHandle.IsClosed && _writePos > 0) - { - // Flush data to disk iff we were writing. After - // thinking about this, we also don't need to flush - // our read position, regardless of whether the handle - // was exposed to the user. They probably would NOT - // want us to do this. - try - { - FlushWriteBuffer(!disposing); - } - catch (Exception e) when (IsIoRelatedException(e) && !disposing) - { - // On finalization, ignore failures from trying to flush the write buffer, - // e.g. if this stream is wrapping a pipe and the pipe is now broken. - } - } - } - finally - { - if (_fileHandle != null && !_fileHandle.IsClosed) { - _fileHandle.ThreadPoolBinding?.Dispose(); - _fileHandle.Dispose(); + throw Win32Marshal.GetExceptionForLastWin32Error(_path); } - _preallocatedOverlapped?.Dispose(); - _canSeek = false; - - // Don't set the buffer to null, to avoid a NullReferenceException - // when users have a race condition in their code (i.e. they call - // Close when calling another method on Stream like Read). + return info.EndOfFile; } } - public override async ValueTask DisposeAsync() + /// Gets or sets the position within the current stream + public override long Position { - // Same logic as in Dispose(), except with async counterparts. - // TODO: https://github.com/dotnet/runtime/issues/27643: FlushAsync does synchronous work. - try - { - if (_fileHandle != null && !_fileHandle.IsClosed && _writePos > 0) - { - await FlushAsync(default).ConfigureAwait(false); - } - } - finally + get { - if (_fileHandle != null && !_fileHandle.IsClosed) - { - _fileHandle.ThreadPoolBinding?.Dispose(); - _fileHandle.Dispose(); - } + VerifyOSHandlePosition(); - _preallocatedOverlapped?.Dispose(); - _canSeek = false; - GC.SuppressFinalize(this); // the handle is closed; nothing further for the finalizer to do + return _filePosition; } - } - - protected override void FlushOSBuffer() - { - if (!Interop.Kernel32.FlushFileBuffers(_fileHandle)) + set { - throw Win32Marshal.GetExceptionForLastWin32Error(_path); + Seek(value, SeekOrigin.Begin); } } - // Returns a task that flushes the internal write buffer - private Task FlushWriteAsync(CancellationToken cancellationToken) - { - Debug.Assert(_useAsyncIO); - Debug.Assert(_readPos == 0 && _readLength == 0, "FileStream: Read buffer must be empty in FlushWriteAsync!"); - - // If the buffer is already flushed, don't spin up the OS write - if (_writePos == 0) return Task.CompletedTask; - - Task flushTask = WriteAsyncInternalCore(new ReadOnlyMemory(GetBuffer(), 0, _writePos), cancellationToken); - _writePos = 0; + internal sealed override string Name => _path ?? SR.IO_UnknownFileName; - // Update the active buffer operation - _activeBufferOperation = HasActiveBufferOperation ? - Task.WhenAll(_activeBufferOperation, flushTask) : - flushTask; + internal sealed override bool IsClosed => _fileHandle.IsClosed; - return flushTask; - } - - protected override void FlushWriteBufferForWriteByte() => FlushWriteBuffer(); - - // Writes are buffered. Anytime the buffer fills up - // (_writePos + delta > _bufferSize) or the buffer switches to reading - // and there is left over data (_writePos > 0), this function must be called. - protected override void FlushWriteBuffer(bool calledFromFinalizer = false) + internal sealed override SafeFileHandle SafeFileHandle { - if (_writePos == 0) return; - Debug.Assert(_readPos == 0 && _readLength == 0, "FileStream: Read buffer must be empty in FlushWrite!"); - - if (_useAsyncIO) - { - Task writeTask = FlushWriteAsync(CancellationToken.None); - // With our Whidbey async IO & overlapped support for AD unloads, - // we don't strictly need to block here to release resources - // since that support takes care of the pinning & freeing the - // overlapped struct. We need to do this when called from - // Close so that the handle is closed when Close returns, but - // we don't need to call EndWrite from the finalizer. - // Additionally, if we do call EndWrite, we block forever - // because AD unloads prevent us from running the managed - // callback from the IO completion port. Blocking here when - // called from the finalizer during AD unload is clearly wrong, - // but we can't use any sort of test for whether the AD is - // unloading because if we weren't unloading, an AD unload - // could happen on a separate thread before we call EndWrite. - if (!calledFromFinalizer) - { - writeTask.GetAwaiter().GetResult(); - } - } - else + get { - WriteCore(new ReadOnlySpan(GetBuffer(), 0, _writePos)); + // Flushing is the responsibility of BufferedFileStreamStrategy + _exposedHandle = true; + return _fileHandle; } - - _writePos = 0; } - public override void SetLength(long value) + // this method just disposes everything as there is no buffer here + // and we don't really need to Flush anything here + public override ValueTask DisposeAsync() { - // Handle buffering updates. - if (_writePos > 0) - { - FlushWriteBuffer(); - } - else if (_readPos < _readLength) + if (_fileHandle != null && !_fileHandle.IsClosed) { - FlushReadBuffer(); + _fileHandle.ThreadPoolBinding?.Dispose(); + _fileHandle.Dispose(); } - _readPos = 0; - _readLength = 0; - - if (_appendStart != -1 && value < _appendStart) - throw new IOException(SR.IO_SetLengthAppendTruncate); - SetLengthCore(value); - } - - // We absolutely need this method broken out so that WriteInternalCoreAsync can call - // a method without having to go through buffering code that might call FlushWrite. - private unsafe void SetLengthCore(long value) - { - Debug.Assert(value >= 0, "value >= 0"); - VerifyOSHandlePosition(); - - var eofInfo = new Interop.Kernel32.FILE_END_OF_FILE_INFO - { - EndOfFile = value - }; - if (!Interop.Kernel32.SetFileInformationByHandle( - _fileHandle, - Interop.Kernel32.FileEndOfFileInfo, - &eofInfo, - (uint)sizeof(Interop.Kernel32.FILE_END_OF_FILE_INFO))) - { - int errorCode = Marshal.GetLastWin32Error(); - if (errorCode == Interop.Errors.ERROR_INVALID_PARAMETER) - throw new ArgumentOutOfRangeException(nameof(value), SR.ArgumentOutOfRange_FileLengthTooBig); - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); - } + GC.SuppressFinalize(this); // the handle is closed; nothing further for the finalizer to do - if (_filePosition > value) - { - SeekCore(_fileHandle, 0, SeekOrigin.End); - } + return ValueTask.CompletedTask; } - // Instance method to help code external to this MarshalByRefObject avoid - // accessing its fields by ref. This avoids a compiler warning. - private FileStreamCompletionSource? CompareExchangeCurrentOverlappedOwner(FileStreamCompletionSource? newSource, FileStreamCompletionSource? existingSource) => - Interlocked.CompareExchange(ref _currentOverlappedOwner, newSource, existingSource); + // this method in the future will be called in no-buffering scenarios + internal sealed override void DisposeInternal(bool disposing) => Dispose(disposing); - protected override int ReadSpan(Span destination) + // this method is called from BufferedStream.Dispose so the content is already flushed + protected override void Dispose(bool disposing) { - Debug.Assert(!_useAsyncIO, "Must only be used when in synchronous mode"); - Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), - "We're either reading or writing, but not both."); - - bool isBlocked = false; - int n = _readLength - _readPos; - // if the read buffer is empty, read into either user's array or our - // buffer, depending on number of bytes user asked for and buffer size. - if (n == 0) - { - if (!CanRead) throw Error.GetReadNotSupported(); - if (_writePos > 0) FlushWriteBuffer(); - if (!CanSeek || (destination.Length >= _bufferLength)) - { - n = ReadNative(destination); - // Throw away read buffer. - _readPos = 0; - _readLength = 0; - return n; - } - n = ReadNative(GetBuffer()); - if (n == 0) return 0; - isBlocked = n < _bufferLength; - _readPos = 0; - _readLength = n; - } - // Now copy min of count or numBytesAvailable (i.e. near EOF) to array. - if (n > destination.Length) n = destination.Length; - new ReadOnlySpan(GetBuffer(), _readPos, n).CopyTo(destination); - _readPos += n; - - // We may have read less than the number of bytes the user asked - // for, but that is part of the Stream contract. Reading again for - // more data may cause us to block if we're using a device with - // no clear end of file, such as a serial port or pipe. If we - // blocked here & this code was used with redirected pipes for a - // process's standard output, this can lead to deadlocks involving - // two processes. But leave this here for files to avoid what would - // probably be a breaking change. -- - - // If we are reading from a device with no clear EOF like a - // serial port or a pipe, this will cause us to block incorrectly. - if (!_isPipe) + if (_fileHandle != null && !_fileHandle.IsClosed) { - // If we hit the end of the buffer and didn't have enough bytes, we must - // read some more from the underlying stream. However, if we got - // fewer bytes from the underlying stream than we asked for (i.e. we're - // probably blocked), don't ask for more bytes. - if (n < destination.Length && !isBlocked) - { - Debug.Assert(_readPos == _readLength, "Read buffer should be empty!"); - int moreBytesRead = ReadNative(destination.Slice(n)); - n += moreBytesRead; - // We've just made our buffer inconsistent with our position - // pointer. We must throw away the read buffer. - _readPos = 0; - _readLength = 0; - } + _fileHandle.ThreadPoolBinding?.Dispose(); + _fileHandle.Dispose(); } - return n; - } - - [Conditional("DEBUG")] - private void AssertCanRead() - { - Debug.Assert(!_fileHandle.IsClosed, "!_fileHandle.IsClosed"); - Debug.Assert(CanRead, "CanRead"); + // Don't set the buffer to null, to avoid a NullReferenceException + // when users have a race condition in their code (i.e. they call + // Close when calling another method on Stream like Read). } - /// Reads from the file handle into the buffer, overwriting anything in it. - protected override int FillReadBufferForReadByte() => - _useAsyncIO ? - ReadNativeAsync(new Memory(_buffer), 0, CancellationToken.None).GetAwaiter().GetResult() : - ReadNative(_buffer); + public sealed override void Flush() => Flush(flushToDisk: false); // we have nothing to flush as there is no buffer here - private unsafe int ReadNative(Span buffer) + internal sealed override void Flush(bool flushToDisk) { - Debug.Assert(!_useAsyncIO, $"{nameof(ReadNative)} doesn't work on asynchronous file streams."); - AssertCanRead(); - - // Make sure we are reading from the right spot - VerifyOSHandlePosition(); - - int r = ReadFileNative(_fileHandle, buffer, null, out int errorCode); - - if (r == -1) + if (flushToDisk && CanWrite) { - // For pipes, ERROR_BROKEN_PIPE is the normal end of the pipe. - if (errorCode == ERROR_BROKEN_PIPE) + if (!Interop.Kernel32.FlushFileBuffers(_fileHandle)) { - r = 0; - } - else - { - if (errorCode == ERROR_INVALID_PARAMETER) - throw new ArgumentException(SR.Arg_HandleNotSync, "_fileHandle"); - - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + throw Win32Marshal.GetExceptionForLastWin32Error(_path); } } - Debug.Assert(r >= 0, "FileStream's ReadNative is likely broken."); - _filePosition += r; - - return r; } - public override long Seek(long offset, SeekOrigin origin) + public sealed override long Seek(long offset, SeekOrigin origin) { if (origin < SeekOrigin.Begin || origin > SeekOrigin.End) throw new ArgumentException(SR.Argument_InvalidSeekOrigin, nameof(origin)); if (_fileHandle.IsClosed) throw Error.GetFileNotOpen(); if (!CanSeek) throw Error.GetSeekNotSupported(); - Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both."); - - // If we've got bytes in our buffer to write, write them out. - // If we've read in and consumed some bytes, we'll have to adjust - // our seek positions ONLY IF we're seeking relative to the current - // position in the stream. This simulates doing a seek to the new - // position, then a read for the number of bytes we have in our buffer. - if (_writePos > 0) - { - FlushWriteBuffer(); - } - else if (origin == SeekOrigin.Current) - { - // Don't call FlushRead here, which would have caused an infinite - // loop. Simply adjust the seek origin. This isn't necessary - // if we're seeking relative to the beginning or end of the stream. - offset -= (_readLength - _readPos); - } - _readPos = _readLength = 0; - // Verify that internal position is in sync with the handle VerifyOSHandlePosition(); - long oldPos = _filePosition + (_readPos - _readLength); + long oldPos = _filePosition; long pos = SeekCore(_fileHandle, offset, origin); // Prevent users from overwriting data in a file that was opened in @@ -556,52 +222,13 @@ public override long Seek(long offset, SeekOrigin origin) throw new IOException(SR.IO_SeekAppendOverwrite); } - // We now must update the read buffer. We can in some cases simply - // update _readPos within the buffer, copy around the buffer so our - // Position property is still correct, and avoid having to do more - // reads from the disk. Otherwise, discard the buffer's contents. - if (_readLength > 0) - { - // We can optimize the following condition: - // oldPos - _readPos <= pos < oldPos + _readLen - _readPos - if (oldPos == pos) - { - if (_readPos > 0) - { - Buffer.BlockCopy(GetBuffer(), _readPos, GetBuffer(), 0, _readLength - _readPos); - _readLength -= _readPos; - _readPos = 0; - } - // If we still have buffered data, we must update the stream's - // position so our Position property is correct. - if (_readLength > 0) - SeekCore(_fileHandle, _readLength, SeekOrigin.Current); - } - else if (oldPos - _readPos < pos && pos < oldPos + _readLength - _readPos) - { - int diff = (int)(pos - oldPos); - Buffer.BlockCopy(GetBuffer(), _readPos + diff, GetBuffer(), 0, _readLength - (_readPos + diff)); - _readLength -= (_readPos + diff); - _readPos = 0; - if (_readLength > 0) - SeekCore(_fileHandle, _readLength, SeekOrigin.Current); - } - else - { - // Lose the read buffer. - _readPos = 0; - _readLength = 0; - } - Debug.Assert(_readLength >= 0 && _readPos <= _readLength, "_readLen should be nonnegative, and _readPos should be less than or equal _readLen"); - Debug.Assert(pos == Position, "Seek optimization: pos != Position! Buffer math was mangled."); - } return pos; } // This doesn't do argument checking. Necessary for SetLength, which must // set the file pointer beyond the end of the file. This will update the // internal position - protected override long SeekCore(SafeFileHandle fileHandle, long offset, SeekOrigin origin, bool closeInvalidHandle = false) + protected long SeekCore(SafeFileHandle fileHandle, long offset, SeekOrigin origin, bool closeInvalidHandle = false) { Debug.Assert(!fileHandle.IsClosed && _canSeek, "!fileHandle.IsClosed && _canSeek"); Debug.Assert(origin >= SeekOrigin.Begin && origin <= SeekOrigin.End, "origin >= SeekOrigin.Begin && origin <= SeekOrigin.End"); @@ -622,591 +249,175 @@ protected override long SeekCore(SafeFileHandle fileHandle, long offset, SeekOri return ret; } - protected override void OnBufferAllocated() + internal sealed override void Lock(long position, long length) { - Debug.Assert(_buffer != null); - Debug.Assert(_preallocatedOverlapped == null); + int positionLow = unchecked((int)(position)); + int positionHigh = unchecked((int)(position >> 32)); + int lengthLow = unchecked((int)(length)); + int lengthHigh = unchecked((int)(length >> 32)); - if (_useAsyncIO) - _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, _buffer); + if (!Interop.Kernel32.LockFile(_fileHandle, positionLow, positionHigh, lengthLow, lengthHigh)) + { + throw Win32Marshal.GetExceptionForLastWin32Error(_path); + } } - protected override void WriteSpan(ReadOnlySpan source) + internal sealed override void Unlock(long position, long length) { - Debug.Assert(!_useAsyncIO, "Must only be used when in synchronous mode"); + int positionLow = unchecked((int)(position)); + int positionHigh = unchecked((int)(position >> 32)); + int lengthLow = unchecked((int)(length)); + int lengthHigh = unchecked((int)(length >> 32)); - if (_writePos == 0) + if (!Interop.Kernel32.UnlockFile(_fileHandle, positionLow, positionHigh, lengthLow, lengthHigh)) { - // Ensure we can write to the stream, and ready buffer for writing. - if (!CanWrite) throw Error.GetWriteNotSupported(); - if (_readPos < _readLength) FlushReadBuffer(); - _readPos = 0; - _readLength = 0; + throw Win32Marshal.GetExceptionForLastWin32Error(_path); } + } + + protected abstract void OnInitFromHandle(SafeFileHandle handle); + + protected virtual void OnInit() { } - // If our buffer has data in it, copy data from the user's array into - // the buffer, and if we can fit it all there, return. Otherwise, write - // the buffer to disk and copy any remaining data into our buffer. - // The assumption here is memcpy is cheaper than disk (or net) IO. - // (10 milliseconds to disk vs. ~20-30 microseconds for a 4K memcpy) - // So the extra copying will reduce the total number of writes, in - // non-pathological cases (i.e. write 1 byte, then write for the buffer - // size repeatedly) - if (_writePos > 0) + private void Init(FileMode mode, string originalPath) + { + if (!PathInternal.IsExtended(originalPath)) { - int numBytes = _bufferLength - _writePos; // space left in buffer - if (numBytes > 0) + // To help avoid stumbling into opening COM/LPT ports by accident, we will block on non file handles unless + // we were explicitly passed a path that has \\?\. GetFullPath() will turn paths like C:\foo\con.txt into + // \\.\CON, so we'll only allow the \\?\ syntax. + + int fileType = Interop.Kernel32.GetFileType(_fileHandle); + if (fileType != Interop.Kernel32.FileTypes.FILE_TYPE_DISK) { - if (numBytes >= source.Length) - { - source.CopyTo(GetBuffer().AsSpan(_writePos)); - _writePos += source.Length; - return; - } - else + int errorCode = fileType == Interop.Kernel32.FileTypes.FILE_TYPE_UNKNOWN + ? Marshal.GetLastWin32Error() + : Interop.Errors.ERROR_SUCCESS; + + _fileHandle.Dispose(); + + if (errorCode != Interop.Errors.ERROR_SUCCESS) { - source.Slice(0, numBytes).CopyTo(GetBuffer().AsSpan(_writePos)); - _writePos += numBytes; - source = source.Slice(numBytes); + throw Win32Marshal.GetExceptionForWin32Error(errorCode); } + throw new NotSupportedException(SR.NotSupported_FileStreamOnNonFiles); } - // Reset our buffer. We essentially want to call FlushWrite - // without calling Flush on the underlying Stream. - - WriteCore(new ReadOnlySpan(GetBuffer(), 0, _writePos)); - _writePos = 0; } - // If the buffer would slow writes down, avoid buffer completely. - if (source.Length >= _bufferLength) + OnInit(); + + // For Append mode... + if (mode == FileMode.Append) { - Debug.Assert(_writePos == 0, "FileStream cannot have buffered data to write here! Your stream will be corrupted."); - WriteCore(source); - return; + _appendStart = SeekCore(_fileHandle, 0, SeekOrigin.End); } - else if (source.Length == 0) + else { - return; // Don't allocate a buffer then call memcpy for 0 bytes. + _appendStart = -1; } - - // Copy remaining bytes into buffer, to write at a later date. - source.CopyTo(GetBuffer().AsSpan(_writePos)); - _writePos = source.Length; - return; } - private unsafe void WriteCore(ReadOnlySpan source) + private void InitFromHandle(SafeFileHandle handle, FileAccess access, out bool canSeek, out bool isPipe) { - Debug.Assert(!_useAsyncIO); - Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); - Debug.Assert(CanWrite, "_parent.CanWrite"); - Debug.Assert(_readPos == _readLength, "_readPos == _readLen"); - - // Make sure we are writing to the position that we think we are - VerifyOSHandlePosition(); - - int r = WriteFileNative(_fileHandle, source, null, out int errorCode); +#if DEBUG + bool hadBinding = handle.ThreadPoolBinding != null; - if (r == -1) + try { - // For pipes, ERROR_NO_DATA is not an error, but the pipe is closing. - if (errorCode == ERROR_NO_DATA) - { - r = 0; - } - else - { - // ERROR_INVALID_PARAMETER may be returned for writes - // where the position is too large or for synchronous writes - // to a handle opened asynchronously. - if (errorCode == ERROR_INVALID_PARAMETER) - throw new IOException(SR.IO_FileTooLongOrHandleNotSync); - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); - } +#endif + InitFromHandleImpl(handle, out canSeek, out isPipe); +#if DEBUG } - Debug.Assert(r >= 0, "FileStream's WriteCore is likely broken."); - _filePosition += r; - return; - } - - protected override Task? ReadAsyncInternal(Memory destination, CancellationToken cancellationToken, out int synchronousResult) - { - Debug.Assert(_useAsyncIO); - if (!CanRead) throw Error.GetReadNotSupported(); - - Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both."); - - if (_isPipe) - { - // Pipes are tricky, at least when you have 2 different pipes - // that you want to use simultaneously. When redirecting stdout - // & stderr with the Process class, it's easy to deadlock your - // parent & child processes when doing writes 4K at a time. The - // OS appears to use a 4K buffer internally. If you write to a - // pipe that is full, you will block until someone read from - // that pipe. If you try reading from an empty pipe and - // Win32FileStream's ReadAsync blocks waiting for data to fill it's - // internal buffer, you will be blocked. In a case where a child - // process writes to stdout & stderr while a parent process tries - // reading from both, you can easily get into a deadlock here. - // To avoid this deadlock, don't buffer when doing async IO on - // pipes. But don't completely ignore buffered data either. - if (_readPos < _readLength) - { - int n = Math.Min(_readLength - _readPos, destination.Length); - new Span(GetBuffer(), _readPos, n).CopyTo(destination.Span); - _readPos += n; - synchronousResult = n; - return null; - } - else - { - Debug.Assert(_writePos == 0, "Win32FileStream must not have buffered write data here! Pipes should be unidirectional."); - synchronousResult = 0; - return ReadNativeAsync(destination, 0, cancellationToken); - } - } - - Debug.Assert(!_isPipe, "Should not be a pipe."); - - // Handle buffering. - if (_writePos > 0) FlushWriteBuffer(); - if (_readPos == _readLength) - { - // I can't see how to handle buffering of async requests when - // filling the buffer asynchronously, without a lot of complexity. - // The problems I see are issuing an async read, we do an async - // read to fill the buffer, then someone issues another read - // (either synchronously or asynchronously) before the first one - // returns. This would involve some sort of complex buffer locking - // that we probably don't want to get into, at least not in V1. - // If we did a sync read to fill the buffer, we could avoid the - // problem, and any async read less than 64K gets turned into a - // synchronous read by NT anyways... -- - - if (destination.Length < _bufferLength) - { - Task readTask = ReadNativeAsync(new Memory(GetBuffer()), 0, cancellationToken); - _readLength = readTask.GetAwaiter().GetResult(); - int n = Math.Min(_readLength, destination.Length); - new Span(GetBuffer(), 0, n).CopyTo(destination.Span); - _readPos = n; - - synchronousResult = n; - return null; - } - else - { - // Here we're making our position pointer inconsistent - // with our read buffer. Throw away the read buffer's contents. - _readPos = 0; - _readLength = 0; - synchronousResult = 0; - return ReadNativeAsync(destination, 0, cancellationToken); - } - } - else + catch { - int n = Math.Min(_readLength - _readPos, destination.Length); - new Span(GetBuffer(), _readPos, n).CopyTo(destination.Span); - _readPos += n; - - if (n == destination.Length) - { - // Return a completed task - synchronousResult = n; - return null; - } - else - { - // For streams with no clear EOF like serial ports or pipes - // we cannot read more data without causing an app to block - // incorrectly. Pipes don't go down this path - // though. This code needs to be fixed. - // Throw away read buffer. - _readPos = 0; - _readLength = 0; - synchronousResult = 0; - return ReadNativeAsync(destination.Slice(n), n, cancellationToken); - } + Debug.Assert(hadBinding || handle.ThreadPoolBinding == null, "We should never error out with a ThreadPoolBinding we've added"); + throw; } +#endif } - private unsafe Task ReadNativeAsync(Memory destination, int numBufferedBytesRead, CancellationToken cancellationToken) + private void InitFromHandleImpl(SafeFileHandle handle, out bool canSeek, out bool isPipe) { - AssertCanRead(); - Debug.Assert(_useAsyncIO, "ReadNativeAsync doesn't work on synchronous file streams!"); - - // Create and store async stream class library specific data in the async result - FileStreamCompletionSource completionSource = FileStreamCompletionSource.Create(this, numBufferedBytesRead, destination); - NativeOverlapped* intOverlapped = completionSource.Overlapped; - - // Calculate position in the file we should be at after the read is done - if (CanSeek) - { - long len = Length; - - // Make sure we are reading from the position that we think we are - VerifyOSHandlePosition(); - - if (_filePosition + destination.Length > len) - { - if (_filePosition <= len) - { - destination = destination.Slice(0, (int)(len - _filePosition)); - } - else - { - destination = default; - } - } - - // Now set the position to read from in the NativeOverlapped struct - // For pipes, we should leave the offset fields set to 0. - intOverlapped->OffsetLow = unchecked((int)_filePosition); - intOverlapped->OffsetHigh = (int)(_filePosition >> 32); - - // When using overlapped IO, the OS is not supposed to - // touch the file pointer location at all. We will adjust it - // ourselves. This isn't threadsafe. - - // WriteFile should not update the file pointer when writing - // in overlapped mode, according to MSDN. But it does update - // the file pointer when writing to a UNC path! - // So changed the code below to seek to an absolute - // location, not a relative one. ReadFile seems consistent though. - SeekCore(_fileHandle, destination.Length, SeekOrigin.Current); - } - - // queue an async ReadFile operation and pass in a packed overlapped - int r = ReadFileNative(_fileHandle, destination.Span, intOverlapped, out int errorCode); - - // ReadFile, the OS version, will return 0 on failure. But - // my ReadFileNative wrapper returns -1. My wrapper will return - // the following: - // On error, r==-1. - // On async requests that are still pending, r==-1 w/ errorCode==ERROR_IO_PENDING - // on async requests that completed sequentially, r==0 - // You will NEVER RELIABLY be able to get the number of bytes - // read back from this call when using overlapped structures! You must - // not pass in a non-null lpNumBytesRead to ReadFile when using - // overlapped structures! This is by design NT behavior. - if (r == -1) - { - // For pipes, when they hit EOF, they will come here. - if (errorCode == ERROR_BROKEN_PIPE) - { - // Not an error, but EOF. AsyncFSCallback will NOT be - // called. Call the user callback here. + int handleType = Interop.Kernel32.GetFileType(handle); + Debug.Assert(handleType == Interop.Kernel32.FileTypes.FILE_TYPE_DISK || handleType == Interop.Kernel32.FileTypes.FILE_TYPE_PIPE || handleType == Interop.Kernel32.FileTypes.FILE_TYPE_CHAR, "FileStream was passed an unknown file type!"); - // We clear the overlapped status bit for this special case. - // Failure to do so looks like we are freeing a pending overlapped later. - intOverlapped->InternalLow = IntPtr.Zero; - completionSource.SetCompletedSynchronously(0); - } - else if (errorCode != ERROR_IO_PENDING) - { - if (!_fileHandle.IsClosed && CanSeek) // Update Position - It could be anywhere. - { - SeekCore(_fileHandle, 0, SeekOrigin.Current); - } + canSeek = handleType == Interop.Kernel32.FileTypes.FILE_TYPE_DISK; + isPipe = handleType == Interop.Kernel32.FileTypes.FILE_TYPE_PIPE; - completionSource.ReleaseNativeResource(); + OnInitFromHandle(handle); - if (errorCode == ERROR_HANDLE_EOF) - { - throw Error.GetEndOfFile(); - } - else - { - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); - } - } - else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING - { - // Only once the IO is pending do we register for cancellation - completionSource.RegisterForCancellation(cancellationToken); - } - } + if (_canSeek) + SeekCore(handle, 0, SeekOrigin.Current); else - { - // Due to a workaround for a race condition in NT's ReadFile & - // WriteFile routines, we will always be returning 0 from ReadFileNative - // when we do async IO instead of the number of bytes read, - // irregardless of whether the operation completed - // synchronously or asynchronously. We absolutely must not - // set asyncResult._numBytes here, since will never have correct - // results. - } - - return completionSource.Task; + _filePosition = 0; } - protected override ValueTask WriteAsyncInternal(ReadOnlyMemory source, CancellationToken cancellationToken) + public sealed override void SetLength(long value) { - Debug.Assert(_useAsyncIO); - Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both."); - Debug.Assert(!_isPipe || (_readPos == 0 && _readLength == 0), "Win32FileStream must not have buffered data here! Pipes should be unidirectional."); - - if (!CanWrite) throw Error.GetWriteNotSupported(); - - bool writeDataStoredInBuffer = false; - if (!_isPipe) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore) - { - // Ensure the buffer is clear for writing - if (_writePos == 0) - { - if (_readPos < _readLength) - { - FlushReadBuffer(); - } - _readPos = 0; - _readLength = 0; - } - - // Determine how much space remains in the buffer - int remainingBuffer = _bufferLength - _writePos; - Debug.Assert(remainingBuffer >= 0); - - // Simple/common case: - // - The write is smaller than our buffer, such that it's worth considering buffering it. - // - There's no active flush operation, such that we don't have to worry about the existing buffer being in use. - // - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes. - // In that case, just store it in the buffer. - if (source.Length < _bufferLength && !HasActiveBufferOperation && source.Length <= remainingBuffer) - { - source.Span.CopyTo(new Span(GetBuffer(), _writePos, source.Length)); - _writePos += source.Length; - writeDataStoredInBuffer = true; - - // There is one special-but-common case, common because devs often use - // byte[] sizes that are powers of 2 and thus fit nicely into our buffer, which is - // also a power of 2. If after our write the buffer still has remaining space, - // then we're done and can return a completed task now. But if we filled the buffer - // completely, we want to do the asynchronous flush/write as part of this operation - // rather than waiting until the next write that fills the buffer. - if (source.Length != remainingBuffer) - return default; - - Debug.Assert(_writePos == _bufferLength); - } - } - - // At this point, at least one of the following is true: - // 1. There was an active flush operation (it could have completed by now, though). - // 2. The data doesn't fit in the remaining buffer (or it's a pipe and we chose not to try). - // 3. We wrote all of the data to the buffer, filling it. - // - // If there's an active operation, we can't touch the current buffer because it's in use. - // That gives us a choice: we can either allocate a new buffer, or we can skip the buffer - // entirely (even if the data would otherwise fit in it). For now, for simplicity, we do - // the latter; it could also have performance wins due to OS-level optimizations, and we could - // potentially add support for PreAllocatedOverlapped due to having a single buffer. (We can - // switch to allocating a new buffer, potentially experimenting with buffer pooling, should - // performance data suggest it's appropriate.) - // - // If the data doesn't fit in the remaining buffer, it could be because it's so large - // it's greater than the entire buffer size, in which case we'd always skip the buffer, - // or it could be because there's more data than just the space remaining. For the latter - // case, we need to issue an asynchronous write to flush that data, which then turns this into - // the first case above with an active operation. - // - // If we already stored the data, then we have nothing additional to write beyond what - // we need to flush. - // - // In any of these cases, we have the same outcome: - // - If there's data in the buffer, flush it by writing it out asynchronously. - // - Then, if there's any data to be written, issue a write for it concurrently. - // We return a Task that represents one or both. - - // Flush the buffer asynchronously if there's anything to flush - Task? flushTask = null; - if (_writePos > 0) - { - flushTask = FlushWriteAsync(cancellationToken); - - // If we already copied all of the data into the buffer, - // simply return the flush task here. Same goes for if the task has - // already completed and was unsuccessful. - if (writeDataStoredInBuffer || - flushTask.IsFaulted || - flushTask.IsCanceled) - { - return new ValueTask(flushTask); - } - } - - Debug.Assert(!writeDataStoredInBuffer); - Debug.Assert(_writePos == 0); + if (_appendStart != -1 && value < _appendStart) + throw new IOException(SR.IO_SetLengthAppendTruncate); - // Finally, issue the write asynchronously, and return a Task that logically - // represents the write operation, including any flushing done. - Task writeTask = WriteAsyncInternalCore(source, cancellationToken); - return new ValueTask( - (flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask : - (writeTask.Status == TaskStatus.RanToCompletion) ? flushTask : - Task.WhenAll(flushTask, writeTask)); + SetLengthCore(value); } - private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory source, CancellationToken cancellationToken) + // We absolutely need this method broken out so that WriteInternalCoreAsync can call + // a method without having to go through buffering code that might call FlushWrite. + protected unsafe void SetLengthCore(long value) { - Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); - Debug.Assert(CanWrite, "_parent.CanWrite"); - Debug.Assert(_readPos == _readLength, "_readPos == _readLen"); - Debug.Assert(_useAsyncIO, "WriteInternalCoreAsync doesn't work on synchronous file streams!"); - - // Create and store async stream class library specific data in the async result - FileStreamCompletionSource completionSource = FileStreamCompletionSource.Create(this, 0, source); - NativeOverlapped* intOverlapped = completionSource.Overlapped; - - if (CanSeek) - { - // Make sure we set the length of the file appropriately. - long len = Length; - - // Make sure we are writing to the position that we think we are - VerifyOSHandlePosition(); - - if (_filePosition + source.Length > len) - { - SetLengthCore(_filePosition + source.Length); - } - - // Now set the position to read from in the NativeOverlapped struct - // For pipes, we should leave the offset fields set to 0. - intOverlapped->OffsetLow = (int)_filePosition; - intOverlapped->OffsetHigh = (int)(_filePosition >> 32); - - // When using overlapped IO, the OS is not supposed to - // touch the file pointer location at all. We will adjust it - // ourselves. This isn't threadsafe. - SeekCore(_fileHandle, source.Length, SeekOrigin.Current); - } - - // queue an async WriteFile operation and pass in a packed overlapped - int r = WriteFileNative(_fileHandle, source.Span, intOverlapped, out int errorCode); - - // WriteFile, the OS version, will return 0 on failure. But - // my WriteFileNative wrapper returns -1. My wrapper will return - // the following: - // On error, r==-1. - // On async requests that are still pending, r==-1 w/ errorCode==ERROR_IO_PENDING - // On async requests that completed sequentially, r==0 - // You will NEVER RELIABLY be able to get the number of bytes - // written back from this call when using overlapped IO! You must - // not pass in a non-null lpNumBytesWritten to WriteFile when using - // overlapped structures! This is ByDesign NT behavior. - if (r == -1) - { - // For pipes, when they are closed on the other side, they will come here. - if (errorCode == ERROR_NO_DATA) - { - // Not an error, but EOF. AsyncFSCallback will NOT be called. - // Completing TCS and return cached task allowing the GC to collect TCS. - completionSource.SetCompletedSynchronously(0); - return Task.CompletedTask; - } - else if (errorCode != ERROR_IO_PENDING) - { - if (!_fileHandle.IsClosed && CanSeek) // Update Position - It could be anywhere. - { - SeekCore(_fileHandle, 0, SeekOrigin.Current); - } - - completionSource.ReleaseNativeResource(); + Debug.Assert(value >= 0, "value >= 0"); + VerifyOSHandlePosition(); - if (errorCode == ERROR_HANDLE_EOF) - { - throw Error.GetEndOfFile(); - } - else - { - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); - } - } - else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING - { - // Only once the IO is pending do we register for cancellation - completionSource.RegisterForCancellation(cancellationToken); - } - } - else + var eofInfo = new Interop.Kernel32.FILE_END_OF_FILE_INFO { - // Due to a workaround for a race condition in NT's ReadFile & - // WriteFile routines, we will always be returning 0 from WriteFileNative - // when we do async IO instead of the number of bytes written, - // irregardless of whether the operation completed - // synchronously or asynchronously. We absolutely must not - // set asyncResult._numBytes here, since will never have correct - // results. - } - - return completionSource.Task; - } - - // Error codes (not HRESULTS), from winerror.h - internal const int ERROR_BROKEN_PIPE = 109; - internal const int ERROR_NO_DATA = 232; - private const int ERROR_HANDLE_EOF = 38; - private const int ERROR_INVALID_PARAMETER = 87; - private const int ERROR_IO_PENDING = 997; - - // __ConsoleStream also uses this code. - private unsafe int ReadFileNative(SafeFileHandle handle, Span bytes, NativeOverlapped* overlapped, out int errorCode) - { - Debug.Assert(handle != null, "handle != null"); - Debug.Assert((_useAsyncIO && overlapped != null) || (!_useAsyncIO && overlapped == null), "Async IO and overlapped parameters inconsistent in call to ReadFileNative."); - - int r; - int numBytesRead = 0; + EndOfFile = value + }; - fixed (byte* p = &MemoryMarshal.GetReference(bytes)) + if (!Interop.Kernel32.SetFileInformationByHandle( + _fileHandle, + Interop.Kernel32.FileEndOfFileInfo, + &eofInfo, + (uint)sizeof(Interop.Kernel32.FILE_END_OF_FILE_INFO))) { - r = _useAsyncIO ? - Interop.Kernel32.ReadFile(handle, p, bytes.Length, IntPtr.Zero, overlapped) : - Interop.Kernel32.ReadFile(handle, p, bytes.Length, out numBytesRead, IntPtr.Zero); + int errorCode = Marshal.GetLastWin32Error(); + if (errorCode == Interop.Errors.ERROR_INVALID_PARAMETER) + throw new ArgumentOutOfRangeException(nameof(value), SR.ArgumentOutOfRange_FileLengthTooBig); + throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); } - if (r == 0) - { - errorCode = GetLastWin32ErrorAndDisposeHandleIfInvalid(); - return -1; - } - else + if (_filePosition > value) { - errorCode = 0; - return numBytesRead; + SeekCore(_fileHandle, 0, SeekOrigin.End); } } - private unsafe int WriteFileNative(SafeFileHandle handle, ReadOnlySpan buffer, NativeOverlapped* overlapped, out int errorCode) + /// + /// Verify that the actual position of the OS's handle equals what we expect it to. + /// This will fail if someone else moved the UnixFileStream's handle or if + /// our position updating code is incorrect. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + protected void VerifyOSHandlePosition() { - Debug.Assert(handle != null, "handle != null"); - Debug.Assert((_useAsyncIO && overlapped != null) || (!_useAsyncIO && overlapped == null), "Async IO and overlapped parameters inconsistent in call to WriteFileNative."); - - int numBytesWritten = 0; - int r; - - fixed (byte* p = &MemoryMarshal.GetReference(buffer)) + bool verifyPosition = _exposedHandle; // in release, only verify if we've given out the handle such that someone else could be manipulating it +#if DEBUG + verifyPosition = true; // in debug, always make sure our position matches what the OS says it should be +#endif + if (verifyPosition && CanSeek) { - r = _useAsyncIO ? - Interop.Kernel32.WriteFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) : - Interop.Kernel32.WriteFile(handle, p, buffer.Length, out numBytesWritten, IntPtr.Zero); - } + long oldPos = _filePosition; // SeekCore will override the current _position, so save it now + long curPos = SeekCore(_fileHandle, 0, SeekOrigin.Current); + if (oldPos != curPos) + { + // For reads, this is non-fatal but we still could have returned corrupted + // data in some cases, so discard the internal buffer. For writes, + // this is a problem; discard the buffer and error out. - if (r == 0) - { - errorCode = GetLastWin32ErrorAndDisposeHandleIfInvalid(); - return -1; - } - else - { - errorCode = 0; - return numBytesWritten; + throw new IOException(SR.IO_FileStreamHandlePosition); + } } } - private int GetLastWin32ErrorAndDisposeHandleIfInvalid() + protected int GetLastWin32ErrorAndDisposeHandleIfInvalid() { int errorCode = Marshal.GetLastWin32Error(); @@ -1235,333 +446,56 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid() return errorCode; } - public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) - { - // If we're in sync mode, just use the shared CopyToAsync implementation that does - // typical read/write looping. - if (!_useAsyncIO) - { - return base.CopyToAsync(destination, bufferSize, cancellationToken); - } - - ValidateCopyToArguments(destination, bufferSize); - - // Fail if the file was closed - if (_fileHandle.IsClosed) - { - throw Error.GetFileNotOpen(); - } - if (!CanRead) - { - throw Error.GetReadNotSupported(); - } - - // Bail early for cancellation if cancellation has been requested - if (cancellationToken.IsCancellationRequested) - { - return Task.FromCanceled(cancellationToken); - } - - // Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync - Debug.Assert((_readPos == 0 && _readLength == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLength), "We're either reading or writing, but not both."); - return AsyncModeCopyToAsync(destination, bufferSize, cancellationToken); - } - - private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + // __ConsoleStream also uses this code. + protected unsafe int ReadFileNative(SafeFileHandle handle, Span bytes, NativeOverlapped* overlapped, out int errorCode) { - Debug.Assert(_useAsyncIO, "This implementation is for async mode only"); - Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); - Debug.Assert(CanRead, "_parent.CanRead"); - - // Make sure any pending writes have been flushed before we do a read. - if (_writePos > 0) - { - await FlushWriteAsync(cancellationToken).ConfigureAwait(false); - } - - // Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is - // done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer. - if (GetBuffer() != null) - { - int bufferedBytes = _readLength - _readPos; - if (bufferedBytes > 0) - { - await destination.WriteAsync(new ReadOnlyMemory(GetBuffer(), _readPos, bufferedBytes), cancellationToken).ConfigureAwait(false); - _readPos = _readLength = 0; - } - } + Debug.Assert(handle != null, "handle != null"); - // For efficiency, we avoid creating a new task and associated state for each asynchronous read. - // Instead, we create a single reusable awaitable object that will be triggered when an await completes - // and reset before going again. - var readAwaitable = new AsyncCopyToAwaitable(this); + int r; + int numBytesRead = 0; - // Make sure we are reading from the position that we think we are. - // Only set the position in the awaitable if we can seek (e.g. not for pipes). - bool canSeek = CanSeek; - if (canSeek) + fixed (byte* p = &MemoryMarshal.GetReference(bytes)) { - VerifyOSHandlePosition(); - readAwaitable._position = _filePosition; + r = overlapped != null ? + Interop.Kernel32.ReadFile(handle, p, bytes.Length, IntPtr.Zero, overlapped) : + Interop.Kernel32.ReadFile(handle, p, bytes.Length, out numBytesRead, IntPtr.Zero); } - // Get the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use - // _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may - // actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically - // CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized. - // Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that - // we'd likely be unable to use it anyway. Instead, we rent the buffer from a pool. - byte[] copyBuffer = ArrayPool.Shared.Rent(bufferSize); - - // Allocate an Overlapped we can use repeatedly for all operations - var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer); - var cancellationReg = default(CancellationTokenRegistration); - try + if (r == 0) { - // Register for cancellation. We do this once for the whole copy operation, and just try to cancel - // whatever read operation may currently be in progress, if there is one. It's possible the cancellation - // request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested - // in the read/write copy loop. - if (cancellationToken.CanBeCanceled) - { - cancellationReg = cancellationToken.UnsafeRegister(static s => - { - Debug.Assert(s is AsyncCopyToAwaitable); - var innerAwaitable = (AsyncCopyToAwaitable)s; - unsafe - { - lock (innerAwaitable.CancellationLock) // synchronize with cleanup of the overlapped - { - if (innerAwaitable._nativeOverlapped != null) - { - // Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we - // don't want to fail the operation because we couldn't cancel it. - Interop.Kernel32.CancelIoEx(innerAwaitable._fileStream._fileHandle, innerAwaitable._nativeOverlapped); - } - } - } - }, readAwaitable); - } - - // Repeatedly read from this FileStream and write the results to the destination stream. - while (true) - { - cancellationToken.ThrowIfCancellationRequested(); - readAwaitable.ResetForNextOperation(); - - try - { - bool synchronousSuccess; - int errorCode; - unsafe - { - // Allocate a native overlapped for our reusable overlapped, and set position to read based on the next - // desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or - // if the stream isn't seekable.) - readAwaitable._nativeOverlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(awaitableOverlapped); - if (canSeek) - { - readAwaitable._nativeOverlapped->OffsetLow = unchecked((int)readAwaitable._position); - readAwaitable._nativeOverlapped->OffsetHigh = (int)(readAwaitable._position >> 32); - } - - // Kick off the read. - synchronousSuccess = ReadFileNative(_fileHandle, copyBuffer, readAwaitable._nativeOverlapped, out errorCode) >= 0; - } - - // If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation. - if (!synchronousSuccess) - { - switch (errorCode) - { - case ERROR_IO_PENDING: - // Async operation in progress. - break; - case ERROR_BROKEN_PIPE: - case ERROR_HANDLE_EOF: - // We're at or past the end of the file, and the overlapped callback - // won't be raised in these cases. Mark it as completed so that the await - // below will see it as such. - readAwaitable.MarkCompleted(); - break; - default: - // Everything else is an error (and there won't be a callback). - throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); - } - } - - // Wait for the async operation (which may or may not have already completed), then throw if it failed. - await readAwaitable; - switch (readAwaitable._errorCode) - { - case 0: // success - break; - case ERROR_BROKEN_PIPE: // logically success with 0 bytes read (write end of pipe closed) - case ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file) - Debug.Assert(readAwaitable._numBytes == 0, $"Expected 0 bytes read, got {readAwaitable._numBytes}"); - break; - case Interop.Errors.ERROR_OPERATION_ABORTED: // canceled - throw new OperationCanceledException(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true)); - default: // error - throw Win32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode, _path); - } - - // Successful operation. If we got zero bytes, we're done: exit the read/write loop. - int numBytesRead = (int)readAwaitable._numBytes; - if (numBytesRead == 0) - { - break; - } - - // Otherwise, update the read position for next time accordingly. - if (canSeek) - { - readAwaitable._position += numBytesRead; - } - } - finally - { - // Free the resources for this read operation - unsafe - { - NativeOverlapped* overlapped; - lock (readAwaitable.CancellationLock) // just an Exchange, but we need this to be synchronized with cancellation, so using the same lock - { - overlapped = readAwaitable._nativeOverlapped; - readAwaitable._nativeOverlapped = null; - } - if (overlapped != null) - { - _fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(overlapped); - } - } - } - - // Write out the read data. - await destination.WriteAsync(new ReadOnlyMemory(copyBuffer, 0, (int)readAwaitable._numBytes), cancellationToken).ConfigureAwait(false); - } + errorCode = GetLastWin32ErrorAndDisposeHandleIfInvalid(); + return -1; } - finally + else { - // Cleanup from the whole copy operation - cancellationReg.Dispose(); - awaitableOverlapped.Dispose(); - - ArrayPool.Shared.Return(copyBuffer); - - // Make sure the stream's current position reflects where we ended up - if (!_fileHandle.IsClosed && CanSeek) - { - SeekCore(_fileHandle, 0, SeekOrigin.End); - } + errorCode = 0; + return numBytesRead; } } - /// Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead. - private sealed unsafe class AsyncCopyToAwaitable : ICriticalNotifyCompletion + protected unsafe int WriteFileNative(SafeFileHandle handle, ReadOnlySpan buffer, NativeOverlapped* overlapped, out int errorCode) { - /// Sentinel object used to indicate that the I/O operation has completed before being awaited. - private static readonly Action s_sentinel = () => { }; - /// Cached delegate to IOCallback. - internal static readonly IOCompletionCallback s_callback = IOCallback; - - /// The FileStream that owns this instance. - internal readonly WindowsFileStreamStrategy _fileStream; - - /// Tracked position representing the next location from which to read. - internal long _position; - /// The current native overlapped pointer. This changes for each operation. - internal NativeOverlapped* _nativeOverlapped; - /// - /// null if the operation is still in progress, - /// s_sentinel if the I/O operation completed before the await, - /// s_callback if it completed after the await yielded. - /// - internal Action? _continuation; - /// Last error code from completed operation. - internal uint _errorCode; - /// Last number of read bytes from completed operation. - internal uint _numBytes; - - /// Lock object used to protect cancellation-related access to _nativeOverlapped. - internal object CancellationLock => this; - - /// Initialize the awaitable. - internal AsyncCopyToAwaitable(WindowsFileStreamStrategy fileStream) - { - _fileStream = fileStream; - } - - /// Reset state to prepare for the next read operation. - internal void ResetForNextOperation() - { - Debug.Assert(_position >= 0, $"Expected non-negative position, got {_position}"); - _continuation = null; - _errorCode = 0; - _numBytes = 0; - } - - /// Overlapped callback: store the results, then invoke the continuation delegate. - internal static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) - { - var awaitable = (AsyncCopyToAwaitable?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP); - Debug.Assert(awaitable != null); - - Debug.Assert(!ReferenceEquals(awaitable._continuation, s_sentinel), "Sentinel must not have already been set as the continuation"); - awaitable._errorCode = errorCode; - awaitable._numBytes = numBytes; - - (awaitable._continuation ?? Interlocked.CompareExchange(ref awaitable._continuation, s_sentinel, null))?.Invoke(); - } + Debug.Assert(handle != null, "handle != null"); - /// - /// Called when it's known that the I/O callback for an operation will not be invoked but we'll - /// still be awaiting the awaitable. - /// - internal void MarkCompleted() - { - Debug.Assert(_continuation == null, "Expected null continuation"); - _continuation = s_sentinel; - } + int numBytesWritten = 0; + int r; - public AsyncCopyToAwaitable GetAwaiter() => this; - public bool IsCompleted => ReferenceEquals(_continuation, s_sentinel); - public void GetResult() { } - public void OnCompleted(Action continuation) => UnsafeOnCompleted(continuation); - public void UnsafeOnCompleted(Action continuation) + fixed (byte* p = &MemoryMarshal.GetReference(buffer)) { - if (ReferenceEquals(_continuation, s_sentinel) || - Interlocked.CompareExchange(ref _continuation, continuation, null) != null) - { - Debug.Assert(ReferenceEquals(_continuation, s_sentinel), $"Expected continuation set to s_sentinel, got ${_continuation}"); - Task.Run(continuation); - } + r = overlapped != null ? + Interop.Kernel32.WriteFile(handle, p, buffer.Length, IntPtr.Zero, overlapped) : + Interop.Kernel32.WriteFile(handle, p, buffer.Length, out numBytesWritten, IntPtr.Zero); } - } - internal override void Lock(long position, long length) - { - int positionLow = unchecked((int)(position)); - int positionHigh = unchecked((int)(position >> 32)); - int lengthLow = unchecked((int)(length)); - int lengthHigh = unchecked((int)(length >> 32)); - - if (!Interop.Kernel32.LockFile(_fileHandle, positionLow, positionHigh, lengthLow, lengthHigh)) + if (r == 0) { - throw Win32Marshal.GetExceptionForLastWin32Error(_path); + errorCode = GetLastWin32ErrorAndDisposeHandleIfInvalid(); + return -1; } - } - - internal override void Unlock(long position, long length) - { - int positionLow = unchecked((int)(position)); - int positionHigh = unchecked((int)(position >> 32)); - int lengthLow = unchecked((int)(length)); - int lengthHigh = unchecked((int)(length >> 32)); - - if (!Interop.Kernel32.UnlockFile(_fileHandle, positionLow, positionHigh, lengthLow, lengthHigh)) + else { - throw Win32Marshal.GetExceptionForLastWin32Error(_path); + errorCode = 0; + return numBytesWritten; } } } From 41239e003c2a699a6ab25a978806c13542ae39dc Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:32:13 +0100 Subject: [PATCH 04/10] implement synchronous WindowsFileStreamStrategy --- .../IO/SyncWindowsFileStreamStrategy.cs | 189 ++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs new file mode 100644 index 0000000000000..456ac18ed4c1f --- /dev/null +++ b/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs @@ -0,0 +1,189 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Win32.SafeHandles; +using System.Runtime.CompilerServices; + +/* + * Win32FileStream supports different modes of accessing the disk - async mode + * and sync mode. They are two completely different codepaths in the + * sync & async methods (i.e. Read/Write vs. ReadAsync/WriteAsync). File + * handles in NT can be opened in only sync or overlapped (async) mode, + * and we have to deal with this pain. Stream has implementations of + * the sync methods in terms of the async ones, so we'll + * call through to our base class to get those methods when necessary. + * + * Also buffering is added into Win32FileStream as well. Folded in the + * code from BufferedStream, so all the comments about it being mostly + * aggressive (and the possible perf improvement) apply to Win32FileStream as + * well. Also added some buffering to the async code paths. + * + * Class Invariants: + * The class has one buffer, shared for reading & writing. It can only be + * used for one or the other at any point in time - not both. The following + * should be true: + * 0 <= _readPos <= _readLen < _bufferSize + * 0 <= _writePos < _bufferSize + * _readPos == _readLen && _readPos > 0 implies the read buffer is valid, + * but we're at the end of the buffer. + * _readPos == _readLen == 0 means the read buffer contains garbage. + * Either _writePos can be greater than 0, or _readLen & _readPos can be + * greater than zero, but neither can be greater than zero at the same time. + * + */ + +namespace System.IO +{ + internal sealed class SyncWindowsFileStreamStrategy : WindowsFileStreamStrategy + { + internal SyncWindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access) : base(handle, access) + { + } + + internal SyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess access, FileShare share, FileOptions options) + : base(path, mode, access, share, options) + { + } + + internal override bool IsAsync => false; + + protected override void OnInitFromHandle(SafeFileHandle handle) + { + // As we can accurately check the handle type when we have access to NtQueryInformationFile we don't need to skip for + // any particular file handle type. + + // If the handle was passed in without an explicit async setting, we already looked it up in GetDefaultIsAsync + if (!handle.IsAsync.HasValue) + return; + + // If we can't check the handle, just assume it is ok. + if (!(FileStreamHelpers.IsHandleSynchronous(handle, ignoreInvalid: false) ?? true)) + throw new ArgumentException(SR.Arg_HandleNotSync, nameof(handle)); + } + + public override int Read(byte[] buffer, int offset, int count) => ReadSpan(new Span(buffer, offset, count)); + + public override int Read(Span buffer) => ReadSpan(buffer); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + // If we weren't opened for asynchronous I/O, we still call to the base implementation so that + // Read is invoked asynchronously. But we can do so using the base Stream's internal helper + // that bypasses delegating to BeginRead, since we already know this is FileStream rather + // than something derived from it and what our BeginRead implementation is going to do. + return (Task)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + // If we weren't opened for asynchronous I/O, we still call to the base implementation so that + // Read is invoked asynchronously. But if we have a byte[], we can do so using the base Stream's + // internal helper that bypasses delegating to BeginRead, since we already know this is FileStream + // rather than something derived from it and what our BeginRead implementation is going to do. + return MemoryMarshal.TryGetArray(buffer, out ArraySegment segment) ? + new ValueTask((Task)BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) : + base.ReadAsync(buffer, cancellationToken); + } + + public override void Write(byte[] buffer, int offset, int count) + => WriteSpan(new ReadOnlySpan(buffer, offset, count)); + + public override void Write(ReadOnlySpan buffer) + { + if (_fileHandle.IsClosed) + { + throw Error.GetFileNotOpen(); + } + + WriteSpan(buffer); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + // If we weren't opened for asynchronous I/O, we still call to the base implementation so that + // Write is invoked asynchronously. But we can do so using the base Stream's internal helper + // that bypasses delegating to BeginWrite, since we already know this is FileStream rather + // than something derived from it and what our BeginWrite implementation is going to do. + return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + // If we weren't opened for asynchronous I/O, we still call to the base implementation so that + // Write is invoked asynchronously. But if we have a byte[], we can do so using the base Stream's + // internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream + // rather than something derived from it and what our BeginWrite implementation is going to do. + return MemoryMarshal.TryGetArray(buffer, out ArraySegment segment) ? + new ValueTask((Task)BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) : + base.WriteAsync(buffer, cancellationToken); + } + + private unsafe int ReadSpan(Span destination) + { + Debug.Assert(CanRead, "BufferedStream has already verified that"); + Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); + + // Make sure we are reading from the right spot + VerifyOSHandlePosition(); + + int r = ReadFileNative(_fileHandle, destination, null, out int errorCode); + + if (r == -1) + { + // For pipes, ERROR_BROKEN_PIPE is the normal end of the pipe. + if (errorCode == ERROR_BROKEN_PIPE) + { + r = 0; + } + else + { + if (errorCode == ERROR_INVALID_PARAMETER) + throw new ArgumentException(SR.Arg_HandleNotSync, "_fileHandle"); + + throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + } + } + Debug.Assert(r >= 0, "FileStream's ReadNative is likely broken."); + _filePosition += r; + + return r; + } + + private unsafe void WriteSpan(ReadOnlySpan source) + { + Debug.Assert(CanWrite, "BufferedStream has already verified that"); + Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); + + // Make sure we are writing to the position that we think we are + VerifyOSHandlePosition(); + + int r = WriteFileNative(_fileHandle, source, null, out int errorCode); + + if (r == -1) + { + // For pipes, ERROR_NO_DATA is not an error, but the pipe is closing. + if (errorCode == ERROR_NO_DATA) + { + r = 0; + } + else + { + // ERROR_INVALID_PARAMETER may be returned for writes + // where the position is too large or for synchronous writes + // to a handle opened asynchronously. + if (errorCode == ERROR_INVALID_PARAMETER) + throw new IOException(SR.IO_FileTooLongOrHandleNotSync); + throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + } + } + Debug.Assert(r >= 0, "FileStream's WriteCore is likely broken."); + _filePosition += r; + return; + } + } +} From 9bb14fb7ddcf27fe969f5ec3e8ed58e766391126 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:32:40 +0100 Subject: [PATCH 05/10] implement asynchronous WindowsFileStreamStrategy --- .../System.Private.CoreLib.Shared.projitems | 3 + .../IO/AsyncWindowsFileStreamStrategy.cs | 654 ++++++++++++++++++ .../IO/FileStreamCompletionSource.Win32.cs | 19 +- .../System/IO/FileStreamHelpers.Windows.cs | 38 +- 4 files changed, 682 insertions(+), 32 deletions(-) create mode 100644 src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs diff --git a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems index 9271b335aa5ae..036326725ee34 100644 --- a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems +++ b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems @@ -388,6 +388,7 @@ + @@ -1623,6 +1624,7 @@ + @@ -1630,6 +1632,7 @@ + diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs new file mode 100644 index 0000000000000..e1ede8b489aed --- /dev/null +++ b/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs @@ -0,0 +1,654 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Win32.SafeHandles; +using System.Runtime.CompilerServices; + +/* + * Win32FileStream supports different modes of accessing the disk - async mode + * and sync mode. They are two completely different codepaths in the + * sync & async methods (i.e. Read/Write vs. ReadAsync/WriteAsync). File + * handles in NT can be opened in only sync or overlapped (async) mode, + * and we have to deal with this pain. Stream has implementations of + * the sync methods in terms of the async ones, so we'll + * call through to our base class to get those methods when necessary. + * + * Also buffering is added into Win32FileStream as well. Folded in the + * code from BufferedStream, so all the comments about it being mostly + * aggressive (and the possible perf improvement) apply to Win32FileStream as + * well. Also added some buffering to the async code paths. + * + * Class Invariants: + * The class has one buffer, shared for reading & writing. It can only be + * used for one or the other at any point in time - not both. The following + * should be true: + * 0 <= _readPos <= _readLen < _bufferSize + * 0 <= _writePos < _bufferSize + * _readPos == _readLen && _readPos > 0 implies the read buffer is valid, + * but we're at the end of the buffer. + * _readPos == _readLen == 0 means the read buffer contains garbage. + * Either _writePos can be greater than 0, or _readLen & _readPos can be + * greater than zero, but neither can be greater than zero at the same time. + * + */ + +namespace System.IO +{ + internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy + { + private static readonly unsafe IOCompletionCallback s_ioCallback = FileStreamCompletionSource.IOCallback; + + private PreAllocatedOverlapped? _preallocatedOverlapped; // optimization for async ops to avoid per-op allocations + private FileStreamCompletionSource? _currentOverlappedOwner; // async op currently using the preallocated overlapped + + internal AsyncWindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access) + : base(handle, access) + { + } + + internal AsyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess access, FileShare share, FileOptions options) + : base(path, mode, access, share, options) + { + } + + internal override bool IsAsync => true; + + public override ValueTask DisposeAsync() + { + // the order matters, let the base class Dispose handle first + ValueTask result = base.DisposeAsync(); + Debug.Assert(result.IsCompleted, "the method must be sync, as it performs no flushing"); + + _preallocatedOverlapped?.Dispose(); + + return result; + } + + protected override void Dispose(bool disposing) + { + // the order matters, let the base class Dispose handle first + base.Dispose(disposing); + + _preallocatedOverlapped?.Dispose(); + } + + protected override void OnInitFromHandle(SafeFileHandle handle) + { + // This is necessary for async IO using IO Completion ports via our + // managed Threadpool API's. This calls the OS's + // BindIoCompletionCallback method, and passes in a stub for the + // LPOVERLAPPED_COMPLETION_ROUTINE. This stub looks at the Overlapped + // struct for this request and gets a delegate to a managed callback + // from there, which it then calls on a threadpool thread. (We allocate + // our native OVERLAPPED structs 2 pointers too large and store EE + // state & a handle to a delegate there.) + // + // If, however, we've already bound this file handle to our completion port, + // don't try to bind it again because it will fail. A handle can only be + // bound to a single completion port at a time. + if (!(handle.IsAsync ?? false)) + { + try + { + handle.ThreadPoolBinding = ThreadPoolBoundHandle.BindHandle(handle); + } + catch (Exception ex) + { + // If you passed in a synchronous handle and told us to use + // it asynchronously, throw here. + throw new ArgumentException(SR.Arg_HandleNotAsync, nameof(handle), ex); + } + } + } + + protected override void OnInit() + { + // This is necessary for async IO using IO Completion ports via our + // managed Threadpool API's. This (theoretically) calls the OS's + // BindIoCompletionCallback method, and passes in a stub for the + // LPOVERLAPPED_COMPLETION_ROUTINE. This stub looks at the Overlapped + // struct for this request and gets a delegate to a managed callback + // from there, which it then calls on a threadpool thread. (We allocate + // our native OVERLAPPED structs 2 pointers too large and store EE state + // & GC handles there, one to an IAsyncResult, the other to a delegate.) + try + { + _fileHandle.ThreadPoolBinding = ThreadPoolBoundHandle.BindHandle(_fileHandle); + } + catch (ArgumentException ex) + { + throw new IOException(SR.IO_BindHandleFailed, ex); + } + finally + { + if (_fileHandle.ThreadPoolBinding == null) + { + // We should close the handle so that the handle is not open until SafeFileHandle GC + Debug.Assert(!_exposedHandle, "Are we closing handle that we exposed/not own, how?"); + _fileHandle.Dispose(); + } + } + } + + // called by BufferedStream. TODO: find a cleaner solution + internal void OnBufferAllocated(byte[] buffer) + { + Debug.Assert(buffer != null); + Debug.Assert(_preallocatedOverlapped == null); + + _preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, buffer); + _buffer = buffer; + } + + public override int Read(byte[] buffer, int offset, int count) + => ReadAsyncInternal(new Memory(buffer, offset, count)).GetAwaiter().GetResult(); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken); + + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + => new ValueTask(ReadAsyncInternal(destination, cancellationToken)); + + private unsafe Task ReadAsyncInternal(Memory destination, CancellationToken cancellationToken = default) + { + Debug.Assert(CanRead, "BufferedStream has already verified that"); + Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); + + // Create and store async stream class library specific data in the async result + FileStreamCompletionSource completionSource = FileStreamCompletionSource.Create(this, 0, destination); + NativeOverlapped* intOverlapped = completionSource.Overlapped; + + // Calculate position in the file we should be at after the read is done + if (CanSeek) + { + long len = Length; + + // Make sure we are reading from the position that we think we are + VerifyOSHandlePosition(); + + if (_filePosition + destination.Length > len) + { + if (_filePosition <= len) + { + destination = destination.Slice(0, (int)(len - _filePosition)); + } + else + { + destination = default; + } + } + + // Now set the position to read from in the NativeOverlapped struct + // For pipes, we should leave the offset fields set to 0. + intOverlapped->OffsetLow = unchecked((int)_filePosition); + intOverlapped->OffsetHigh = (int)(_filePosition >> 32); + + // When using overlapped IO, the OS is not supposed to + // touch the file pointer location at all. We will adjust it + // ourselves. This isn't threadsafe. + + // WriteFile should not update the file pointer when writing + // in overlapped mode, according to MSDN. But it does update + // the file pointer when writing to a UNC path! + // So changed the code below to seek to an absolute + // location, not a relative one. ReadFile seems consistent though. + SeekCore(_fileHandle, destination.Length, SeekOrigin.Current); + } + + // queue an async ReadFile operation and pass in a packed overlapped + int r = ReadFileNative(_fileHandle, destination.Span, intOverlapped, out int errorCode); + + // ReadFile, the OS version, will return 0 on failure. But + // my ReadFileNative wrapper returns -1. My wrapper will return + // the following: + // On error, r==-1. + // On async requests that are still pending, r==-1 w/ errorCode==ERROR_IO_PENDING + // on async requests that completed sequentially, r==0 + // You will NEVER RELIABLY be able to get the number of bytes + // read back from this call when using overlapped structures! You must + // not pass in a non-null lpNumBytesRead to ReadFile when using + // overlapped structures! This is by design NT behavior. + if (r == -1) + { + // For pipes, when they hit EOF, they will come here. + if (errorCode == ERROR_BROKEN_PIPE) + { + // Not an error, but EOF. AsyncFSCallback will NOT be + // called. Call the user callback here. + + // We clear the overlapped status bit for this special case. + // Failure to do so looks like we are freeing a pending overlapped later. + intOverlapped->InternalLow = IntPtr.Zero; + completionSource.SetCompletedSynchronously(0); + } + else if (errorCode != ERROR_IO_PENDING) + { + if (!_fileHandle.IsClosed && CanSeek) // Update Position - It could be anywhere. + { + SeekCore(_fileHandle, 0, SeekOrigin.Current); + } + + completionSource.ReleaseNativeResource(); + + if (errorCode == ERROR_HANDLE_EOF) + { + throw Error.GetEndOfFile(); + } + else + { + throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + } + } + else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING + { + // Only once the IO is pending do we register for cancellation + completionSource.RegisterForCancellation(cancellationToken); + } + } + else + { + // Due to a workaround for a race condition in NT's ReadFile & + // WriteFile routines, we will always be returning 0 from ReadFileNative + // when we do async IO instead of the number of bytes read, + // irregardless of whether the operation completed + // synchronously or asynchronously. We absolutely must not + // set asyncResult._numBytes here, since will never have correct + // results. + } + + return completionSource.Task; + } + + public override void Write(byte[] buffer, int offset, int count) + => WriteAsyncInternal(new ReadOnlyMemory(buffer, offset, count), CancellationToken.None).AsTask().GetAwaiter().GetResult(); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => WriteAsyncInternal(new ReadOnlyMemory(buffer, offset, count), cancellationToken).AsTask(); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + => WriteAsyncInternal(buffer, cancellationToken); + + // Instance method to help code external to this MarshalByRefObject avoid + // accessing its fields by ref. This avoids a compiler warning. + private FileStreamCompletionSource? CompareExchangeCurrentOverlappedOwner(FileStreamCompletionSource? newSource, FileStreamCompletionSource? existingSource) => + Interlocked.CompareExchange(ref _currentOverlappedOwner, newSource, existingSource); + + private ValueTask WriteAsyncInternal(ReadOnlyMemory source, CancellationToken cancellationToken) + => new ValueTask(WriteAsyncInternalCore(source, cancellationToken)); + + private unsafe Task WriteAsyncInternalCore(ReadOnlyMemory source, CancellationToken cancellationToken) + { + Debug.Assert(CanWrite, "BufferedStream has already verified that"); + Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); + + // Create and store async stream class library specific data in the async result + FileStreamCompletionSource completionSource = FileStreamCompletionSource.Create(this, 0, source); + NativeOverlapped* intOverlapped = completionSource.Overlapped; + + if (CanSeek) + { + // Make sure we set the length of the file appropriately. + long len = Length; + + // Make sure we are writing to the position that we think we are + VerifyOSHandlePosition(); + + if (_filePosition + source.Length > len) + { + SetLengthCore(_filePosition + source.Length); + } + + // Now set the position to read from in the NativeOverlapped struct + // For pipes, we should leave the offset fields set to 0. + intOverlapped->OffsetLow = (int)_filePosition; + intOverlapped->OffsetHigh = (int)(_filePosition >> 32); + + // When using overlapped IO, the OS is not supposed to + // touch the file pointer location at all. We will adjust it + // ourselves. This isn't threadsafe. + SeekCore(_fileHandle, source.Length, SeekOrigin.Current); + } + + // queue an async WriteFile operation and pass in a packed overlapped + int r = WriteFileNative(_fileHandle, source.Span, intOverlapped, out int errorCode); + + // WriteFile, the OS version, will return 0 on failure. But + // my WriteFileNative wrapper returns -1. My wrapper will return + // the following: + // On error, r==-1. + // On async requests that are still pending, r==-1 w/ errorCode==ERROR_IO_PENDING + // On async requests that completed sequentially, r==0 + // You will NEVER RELIABLY be able to get the number of bytes + // written back from this call when using overlapped IO! You must + // not pass in a non-null lpNumBytesWritten to WriteFile when using + // overlapped structures! This is ByDesign NT behavior. + if (r == -1) + { + // For pipes, when they are closed on the other side, they will come here. + if (errorCode == ERROR_NO_DATA) + { + // Not an error, but EOF. AsyncFSCallback will NOT be called. + // Completing TCS and return cached task allowing the GC to collect TCS. + completionSource.SetCompletedSynchronously(0); + return Task.CompletedTask; + } + else if (errorCode != ERROR_IO_PENDING) + { + if (!_fileHandle.IsClosed && CanSeek) // Update Position - It could be anywhere. + { + SeekCore(_fileHandle, 0, SeekOrigin.Current); + } + + completionSource.ReleaseNativeResource(); + + if (errorCode == ERROR_HANDLE_EOF) + { + throw Error.GetEndOfFile(); + } + else + { + throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + } + } + else if (cancellationToken.CanBeCanceled) // ERROR_IO_PENDING + { + // Only once the IO is pending do we register for cancellation + completionSource.RegisterForCancellation(cancellationToken); + } + } + else + { + // Due to a workaround for a race condition in NT's ReadFile & + // WriteFile routines, we will always be returning 0 from WriteFileNative + // when we do async IO instead of the number of bytes written, + // irregardless of whether the operation completed + // synchronously or asynchronously. We absolutely must not + // set asyncResult._numBytes here, since will never have correct + // results. + } + + return completionSource.Task; + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + ValidateCopyToArguments(destination, bufferSize); + + // Fail if the file was closed + if (_fileHandle.IsClosed) + { + throw Error.GetFileNotOpen(); + } + if (!CanRead) + { + throw Error.GetReadNotSupported(); + } + + // Bail early for cancellation if cancellation has been requested + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + return AsyncModeCopyToAsync(destination, bufferSize, cancellationToken); + } + + private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed"); + Debug.Assert(CanRead, "_parent.CanRead"); + + // For efficiency, we avoid creating a new task and associated state for each asynchronous read. + // Instead, we create a single reusable awaitable object that will be triggered when an await completes + // and reset before going again. + var readAwaitable = new AsyncCopyToAwaitable(this); + + // Make sure we are reading from the position that we think we are. + // Only set the position in the awaitable if we can seek (e.g. not for pipes). + bool canSeek = CanSeek; + if (canSeek) + { + VerifyOSHandlePosition(); + readAwaitable._position = _filePosition; + } + + // Get the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use + // _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may + // actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically + // CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized. + // Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that + // we'd likely be unable to use it anyway. Instead, we rent the buffer from a pool. + byte[] copyBuffer = ArrayPool.Shared.Rent(bufferSize); + + // Allocate an Overlapped we can use repeatedly for all operations + var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer); + var cancellationReg = default(CancellationTokenRegistration); + try + { + // Register for cancellation. We do this once for the whole copy operation, and just try to cancel + // whatever read operation may currently be in progress, if there is one. It's possible the cancellation + // request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested + // in the read/write copy loop. + if (cancellationToken.CanBeCanceled) + { + cancellationReg = cancellationToken.UnsafeRegister(static s => + { + Debug.Assert(s is AsyncCopyToAwaitable); + var innerAwaitable = (AsyncCopyToAwaitable)s; + unsafe + { + lock (innerAwaitable.CancellationLock) // synchronize with cleanup of the overlapped + { + if (innerAwaitable._nativeOverlapped != null) + { + // Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we + // don't want to fail the operation because we couldn't cancel it. + Interop.Kernel32.CancelIoEx(innerAwaitable._fileStream._fileHandle, innerAwaitable._nativeOverlapped); + } + } + } + }, readAwaitable); + } + + // Repeatedly read from this FileStream and write the results to the destination stream. + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + readAwaitable.ResetForNextOperation(); + + try + { + bool synchronousSuccess; + int errorCode; + unsafe + { + // Allocate a native overlapped for our reusable overlapped, and set position to read based on the next + // desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or + // if the stream isn't seekable.) + readAwaitable._nativeOverlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(awaitableOverlapped); + if (canSeek) + { + readAwaitable._nativeOverlapped->OffsetLow = unchecked((int)readAwaitable._position); + readAwaitable._nativeOverlapped->OffsetHigh = (int)(readAwaitable._position >> 32); + } + + // Kick off the read. + synchronousSuccess = ReadFileNative(_fileHandle, copyBuffer, readAwaitable._nativeOverlapped, out errorCode) >= 0; + } + + // If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation. + if (!synchronousSuccess) + { + switch (errorCode) + { + case ERROR_IO_PENDING: + // Async operation in progress. + break; + case ERROR_BROKEN_PIPE: + case ERROR_HANDLE_EOF: + // We're at or past the end of the file, and the overlapped callback + // won't be raised in these cases. Mark it as completed so that the await + // below will see it as such. + readAwaitable.MarkCompleted(); + break; + default: + // Everything else is an error (and there won't be a callback). + throw Win32Marshal.GetExceptionForWin32Error(errorCode, _path); + } + } + + // Wait for the async operation (which may or may not have already completed), then throw if it failed. + await readAwaitable; + switch (readAwaitable._errorCode) + { + case 0: // success + break; + case ERROR_BROKEN_PIPE: // logically success with 0 bytes read (write end of pipe closed) + case ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file) + Debug.Assert(readAwaitable._numBytes == 0, $"Expected 0 bytes read, got {readAwaitable._numBytes}"); + break; + case Interop.Errors.ERROR_OPERATION_ABORTED: // canceled + throw new OperationCanceledException(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true)); + default: // error + throw Win32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode, _path); + } + + // Successful operation. If we got zero bytes, we're done: exit the read/write loop. + int numBytesRead = (int)readAwaitable._numBytes; + if (numBytesRead == 0) + { + break; + } + + // Otherwise, update the read position for next time accordingly. + if (canSeek) + { + readAwaitable._position += numBytesRead; + } + } + finally + { + // Free the resources for this read operation + unsafe + { + NativeOverlapped* overlapped; + lock (readAwaitable.CancellationLock) // just an Exchange, but we need this to be synchronized with cancellation, so using the same lock + { + overlapped = readAwaitable._nativeOverlapped; + readAwaitable._nativeOverlapped = null; + } + if (overlapped != null) + { + _fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(overlapped); + } + } + } + + // Write out the read data. + await destination.WriteAsync(new ReadOnlyMemory(copyBuffer, 0, (int)readAwaitable._numBytes), cancellationToken).ConfigureAwait(false); + } + } + finally + { + // Cleanup from the whole copy operation + cancellationReg.Dispose(); + awaitableOverlapped.Dispose(); + + ArrayPool.Shared.Return(copyBuffer); + + // Make sure the stream's current position reflects where we ended up + if (!_fileHandle.IsClosed && CanSeek) + { + SeekCore(_fileHandle, 0, SeekOrigin.End); + } + } + } + + /// Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead. + private sealed unsafe class AsyncCopyToAwaitable : ICriticalNotifyCompletion + { + /// Sentinel object used to indicate that the I/O operation has completed before being awaited. + private static readonly Action s_sentinel = () => { }; + /// Cached delegate to IOCallback. + internal static readonly IOCompletionCallback s_callback = IOCallback; + + /// The FileStream that owns this instance. + internal readonly AsyncWindowsFileStreamStrategy _fileStream; + + /// Tracked position representing the next location from which to read. + internal long _position; + /// The current native overlapped pointer. This changes for each operation. + internal NativeOverlapped* _nativeOverlapped; + /// + /// null if the operation is still in progress, + /// s_sentinel if the I/O operation completed before the await, + /// s_callback if it completed after the await yielded. + /// + internal Action? _continuation; + /// Last error code from completed operation. + internal uint _errorCode; + /// Last number of read bytes from completed operation. + internal uint _numBytes; + + /// Lock object used to protect cancellation-related access to _nativeOverlapped. + internal object CancellationLock => this; + + /// Initialize the awaitable. + internal AsyncCopyToAwaitable(AsyncWindowsFileStreamStrategy fileStream) + { + _fileStream = fileStream; + } + + /// Reset state to prepare for the next read operation. + internal void ResetForNextOperation() + { + Debug.Assert(_position >= 0, $"Expected non-negative position, got {_position}"); + _continuation = null; + _errorCode = 0; + _numBytes = 0; + } + + /// Overlapped callback: store the results, then invoke the continuation delegate. + internal static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP) + { + var awaitable = (AsyncCopyToAwaitable?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP); + Debug.Assert(awaitable != null); + + Debug.Assert(!ReferenceEquals(awaitable._continuation, s_sentinel), "Sentinel must not have already been set as the continuation"); + awaitable._errorCode = errorCode; + awaitable._numBytes = numBytes; + + (awaitable._continuation ?? Interlocked.CompareExchange(ref awaitable._continuation, s_sentinel, null))?.Invoke(); + } + + /// + /// Called when it's known that the I/O callback for an operation will not be invoked but we'll + /// still be awaiting the awaitable. + /// + internal void MarkCompleted() + { + Debug.Assert(_continuation == null, "Expected null continuation"); + _continuation = s_sentinel; + } + + public AsyncCopyToAwaitable GetAwaiter() => this; + public bool IsCompleted => ReferenceEquals(_continuation, s_sentinel); + public void GetResult() { } + public void OnCompleted(Action continuation) => UnsafeOnCompleted(continuation); + public void UnsafeOnCompleted(Action continuation) + { + if (ReferenceEquals(_continuation, s_sentinel) || + Interlocked.CompareExchange(ref _continuation, continuation, null) != null) + { + Debug.Assert(ReferenceEquals(_continuation, s_sentinel), $"Expected continuation set to s_sentinel, got ${_continuation}"); + Task.Run(continuation); + } + } + } + } +} diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamCompletionSource.Win32.cs b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamCompletionSource.Win32.cs index fb30f40a30f1b..f2f0114884c5a 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamCompletionSource.Win32.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamCompletionSource.Win32.cs @@ -9,8 +9,13 @@ namespace System.IO { - internal sealed partial class WindowsFileStreamStrategy : FileStreamStrategyBase + internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy { + /// + /// owned by BufferedStream, do NOT use outside of FileStreamCompletionSource + /// + private object? _buffer; + // This is an internal object extending TaskCompletionSource with fields // for all of the relevant data necessary to complete the IO operation. // This is used by IOCallback and all of the async methods. @@ -25,7 +30,7 @@ private unsafe class FileStreamCompletionSource : TaskCompletionSource private static Action? s_cancelCallback; - private readonly WindowsFileStreamStrategy _stream; + private readonly AsyncWindowsFileStreamStrategy _stream; private readonly int _numBufferedBytes; private CancellationTokenRegistration _cancellationRegistration; #if DEBUG @@ -35,7 +40,7 @@ private unsafe class FileStreamCompletionSource : TaskCompletionSource private long _result; // Using long since this needs to be used in Interlocked APIs // Using RunContinuationsAsynchronously for compat reasons (old API used Task.Factory.StartNew for continuations) - protected FileStreamCompletionSource(WindowsFileStreamStrategy stream, int numBufferedBytes, byte[]? bytes) + protected FileStreamCompletionSource(AsyncWindowsFileStreamStrategy stream, int numBufferedBytes, byte[]? bytes) : base(TaskCreationOptions.RunContinuationsAsynchronously) { _numBufferedBytes = numBufferedBytes; @@ -132,8 +137,8 @@ internal static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* // be directly the FileStreamCompletionSource that's completing (in the case where the preallocated // overlapped was already in use by another operation). object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped); - Debug.Assert(state is WindowsFileStreamStrategy || state is FileStreamCompletionSource); - FileStreamCompletionSource completionSource = state is WindowsFileStreamStrategy fs ? + Debug.Assert(state is AsyncWindowsFileStreamStrategy || state is FileStreamCompletionSource); + FileStreamCompletionSource completionSource = state is AsyncWindowsFileStreamStrategy fs ? fs._currentOverlappedOwner! : // must be owned (FileStreamCompletionSource)state!; Debug.Assert(completionSource != null); @@ -220,7 +225,7 @@ private static void Cancel(object? state) } } - public static FileStreamCompletionSource Create(WindowsFileStreamStrategy stream, int numBufferedBytesRead, ReadOnlyMemory memory) + public static FileStreamCompletionSource Create(AsyncWindowsFileStreamStrategy stream, int numBufferedBytesRead, ReadOnlyMemory memory) { // If the memory passed in is the stream's internal buffer, we can use the base FileStreamCompletionSource, // which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived @@ -241,7 +246,7 @@ private sealed class MemoryFileStreamCompletionSource : FileStreamCompletionSour { private MemoryHandle _handle; // mutable struct; do not make this readonly - internal MemoryFileStreamCompletionSource(WindowsFileStreamStrategy stream, int numBufferedBytes, ReadOnlyMemory memory) : + internal MemoryFileStreamCompletionSource(AsyncWindowsFileStreamStrategy stream, int numBufferedBytes, ReadOnlyMemory memory) : base(stream, numBufferedBytes, bytes: null) // this type handles the pinning, so null is passed for bytes { _handle = memory.Pin(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamHelpers.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamHelpers.Windows.cs index 061b85dde852b..4898b792cf982 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamHelpers.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/FileStreamHelpers.Windows.cs @@ -12,19 +12,21 @@ internal static class FileStreamHelpers { internal static FileStreamStrategy ChooseStrategy(FileStream fileStream, SafeFileHandle handle, FileAccess access, int bufferSize, bool isAsync) { - // the switch exitst to measure the overhead of introducing a factory method to the FileStream ctor - // we are going to have more implementations soon and then it's going to make more sense - switch (isAsync) - { - case true: - return new WindowsFileStreamStrategy(fileStream, handle, access, bufferSize, true); - case false: - return new WindowsFileStreamStrategy(fileStream, handle, access, bufferSize, false); - } + FileStreamStrategy actualStrategy = isAsync + ? new AsyncWindowsFileStreamStrategy(handle, access) + : new SyncWindowsFileStreamStrategy(handle, access); + + return new BufferedFileStreamStrategy(fileStream, actualStrategy, bufferSize); } internal static FileStreamStrategy ChooseStrategy(FileStream fileStream, string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options) - => new WindowsFileStreamStrategy(fileStream, path, mode, access, share, bufferSize, options); + { + FileStreamStrategy actualStrategy = ((options & FileOptions.Asynchronous) != 0) + ? new AsyncWindowsFileStreamStrategy(path, mode, access, share, options) + : new SyncWindowsFileStreamStrategy(path, mode, access, share, options); + + return new BufferedFileStreamStrategy(fileStream, actualStrategy, bufferSize); + } internal static SafeFileHandle OpenHandle(string path, FileMode mode, FileAccess access, FileShare share, FileOptions options) => CreateFileOpenHandle(path, mode, access, share, options); @@ -68,7 +70,7 @@ internal static bool GetDefaultIsAsync(SafeFileHandle handle, bool defaultIsAsyn return handle.IsAsync ?? !IsHandleSynchronous(handle, ignoreInvalid: true) ?? defaultIsAsync; } - private static unsafe bool? IsHandleSynchronous(SafeFileHandle fileHandle, bool ignoreInvalid) + internal static unsafe bool? IsHandleSynchronous(SafeFileHandle fileHandle, bool ignoreInvalid) { if (fileHandle.IsInvalid) return null; @@ -106,20 +108,6 @@ internal static bool GetDefaultIsAsync(SafeFileHandle handle, bool defaultIsAsyn return (fileMode & (Interop.NtDll.FILE_SYNCHRONOUS_IO_ALERT | Interop.NtDll.FILE_SYNCHRONOUS_IO_NONALERT)) > 0; } - internal static void VerifyHandleIsSync(SafeFileHandle handle) - { - // As we can accurately check the handle type when we have access to NtQueryInformationFile we don't need to skip for - // any particular file handle type. - - // If the handle was passed in without an explicit async setting, we already looked it up in GetDefaultIsAsync - if (!handle.IsAsync.HasValue) - return; - - // If we can't check the handle, just assume it is ok. - if (!(IsHandleSynchronous(handle, ignoreInvalid: false) ?? true)) - throw new ArgumentException(SR.Arg_HandleNotSync, nameof(handle)); - } - private static unsafe Interop.Kernel32.SECURITY_ATTRIBUTES GetSecAttrs(FileShare share) { Interop.Kernel32.SECURITY_ATTRIBUTES secAttrs = default; From b8eb7f2213d68ba90f493b8a13cc3d5f155547a2 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:34:43 +0100 Subject: [PATCH 06/10] adopt BufferedStream to FileStream requirements: - blocking 0 byte reads - passing _buffer to async strategy - perf --- .../tests/FileStream/SafeFileHandle.cs | 13 + .../src/System/IO/BufferedStream.cs | 263 +++++++++++++----- .../src/System/Threading/SemaphoreSlim.cs | 4 +- 3 files changed, 203 insertions(+), 77 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs index 04773b5a8df72..2c5b428e644ba 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs @@ -37,6 +37,19 @@ public void DisposeClosesHandle() } } + [Fact] + public void DisposingBufferedStreamThatWrapsAFileStreamWhichHasBennClosedViaSafeFileHandleCloseDoesNotThrow() + { + using (FileStream fs = new FileStream(GetTestFilePath(), FileMode.Create)) + { + var bufferedStream = new BufferedStream(fs, 100); + + fs.SafeFileHandle.Dispose(); + + bufferedStream.Dispose(); // must not throw + } + } + [Fact] public void AccessFlushesFileClosesHandle() { diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs index bc1ea76470377..f72907fd6bb4e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/BufferedStream.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -51,6 +52,7 @@ public sealed class BufferedStream : Stream private Stream? _stream; // Underlying stream. Close sets _stream to null. private byte[]? _buffer; // Shared read/write buffer. Alloc on first use. private readonly int _bufferSize; // Length of internal buffer (not counting the shadow buffer). + private readonly bool _allowForZeroByteReads; private int _readPos; // Read pointer within shared buffer. private int _readLen; // Number of bytes read in buffer from _stream. private int _writePos; // Write pointer within shared buffer. @@ -82,10 +84,17 @@ public BufferedStream(Stream stream, int bufferSize) throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed); } + internal BufferedStream(Stream stream, int bufferSize, bool allowForZeroByteReads) : this(stream, bufferSize) + { + _allowForZeroByteReads = allowForZeroByteReads; + } + private void EnsureNotClosed() { if (_stream == null) - throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed); + Throw(); + + static void Throw() => throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed); } private void EnsureCanSeek() @@ -93,7 +102,9 @@ private void EnsureCanSeek() Debug.Assert(_stream != null); if (!_stream.CanSeek) - throw new NotSupportedException(SR.NotSupported_UnseekableStream); + Throw(); + + static void Throw() => throw new NotSupportedException(SR.NotSupported_UnseekableStream); } private void EnsureCanRead() @@ -101,7 +112,9 @@ private void EnsureCanRead() Debug.Assert(_stream != null); if (!_stream.CanRead) - throw new NotSupportedException(SR.NotSupported_UnreadableStream); + Throw(); + + static void Throw() => throw new NotSupportedException(SR.NotSupported_UnreadableStream); } private void EnsureCanWrite() @@ -109,7 +122,9 @@ private void EnsureCanWrite() Debug.Assert(_stream != null); if (!_stream.CanWrite) - throw new NotSupportedException(SR.NotSupported_UnwritableStream); + Throw(); + + static void Throw() => throw new NotSupportedException(SR.NotSupported_UnwritableStream); } private void EnsureShadowBufferAllocated() @@ -127,13 +142,24 @@ private void EnsureShadowBufferAllocated() _buffer = shadowBuffer; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void EnsureBufferAllocated() { Debug.Assert(_bufferSize > 0); // BufferedStream is not intended for multi-threaded use, so no worries about the get/set race on _buffer. if (_buffer == null) + { _buffer = new byte[_bufferSize]; + +#if TARGET_WINDOWS + // it's quite ugly, but I currently don't have a better idea + if (_stream is AsyncWindowsFileStreamStrategy asyncWindowsFileStreamStrategy) + { + asyncWindowsFileStreamStrategy.OnBufferAllocated(_buffer); + } +#endif + } } public Stream UnderlyingStream @@ -184,7 +210,7 @@ public override long Length EnsureNotClosed(); if (_writePos > 0) - FlushWrite(); + FlushWrite(true); return _stream!.Length; } @@ -209,7 +235,7 @@ public override long Position EnsureCanSeek(); if (_writePos > 0) - FlushWrite(); + FlushWrite(true); _readPos = 0; _readLen = 0; @@ -217,6 +243,10 @@ public override long Position } } + internal bool HasSomeDataInTheBuffer => _buffer != null && (_writePos > 0 || _readPos < _readLen); + + internal void DisposeInternal(bool disposing) => Dispose(disposing); + protected override void Dispose(bool disposing) { try @@ -247,7 +277,8 @@ public override async ValueTask DisposeAsync() { try { - if (_stream != null) + Stream? stream = _stream; + if (stream != null) { try { @@ -255,7 +286,7 @@ public override async ValueTask DisposeAsync() } finally { - await _stream.DisposeAsync().ConfigureAwait(false); + await stream.DisposeAsync().ConfigureAwait(false); } } } @@ -266,16 +297,24 @@ public override async ValueTask DisposeAsync() } } - public override void Flush() + public override void Flush() => Flush(true); + + internal void Flush(bool performActualFlush) { EnsureNotClosed(); // Has write data in the buffer: if (_writePos > 0) { - FlushWrite(); - Debug.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0); - return; + // EnsureNotClosed does not guarantee that the Stream has not been closed + // an example could be a call to fileStream.SafeFileHandle.Dispose() + // so to avoid getting exception here, we just ensure that we can Write before doing it + if (_stream!.CanWrite) + { + FlushWrite(performActualFlush); + Debug.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0); + return; + } } // Has read data in the buffer: @@ -292,7 +331,7 @@ public override void Flush() // User streams may have opted to throw from Flush if CanWrite is false (although the abstract Stream does not do so). // However, if we do not forward the Flush to the underlying stream, we may have problems when chaining several streams. // Let us make a best effort attempt: - if (_stream.CanWrite) + if (performActualFlush && _stream.CanWrite) _stream.Flush(); // If the Stream was seekable, then we should have called FlushRead which resets _readPos & _readLen. @@ -301,7 +340,7 @@ public override void Flush() } // We had no data in the buffer, but we still need to tell the underlying stream to flush. - if (_stream!.CanWrite) + if (performActualFlush && _stream!.CanWrite) _stream.Flush(); _writePos = _readPos = _readLen = 0; @@ -384,6 +423,7 @@ private void FlushRead() /// /// Called by Write methods to clear the Read Buffer /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ClearReadBufferBeforeWrite() { Debug.Assert(_stream != null); @@ -403,12 +443,14 @@ private void ClearReadBufferBeforeWrite() // However, since the user did not call a method that is intuitively expected to seek, a better message is in order. // Ideally, we would throw an InvalidOperation here, but for backward compat we have to stick with NotSupported. if (!_stream.CanSeek) - throw new NotSupportedException(SR.NotSupported_CannotWriteToBufferedStreamIfReadBufferCannotBeFlushed); + Throw(); FlushRead(); + + static void Throw() => throw new NotSupportedException(SR.NotSupported_CannotWriteToBufferedStreamIfReadBufferCannotBeFlushed); } - private void FlushWrite() + private void FlushWrite(bool performActualFlush) { Debug.Assert(_stream != null); Debug.Assert(_readPos == 0 && _readLen == 0, @@ -418,7 +460,11 @@ private void FlushWrite() _stream.Write(_buffer, 0, _writePos); _writePos = 0; - _stream.Flush(); + + if (performActualFlush) + { + _stream.Flush(); + } } private async ValueTask FlushWriteAsync(CancellationToken cancellationToken) @@ -434,6 +480,7 @@ private async ValueTask FlushWriteAsync(CancellationToken cancellationToken) await _stream.FlushAsync(cancellationToken).ConfigureAwait(false); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private int ReadFromBuffer(byte[] buffer, int offset, int count) { int readbytes = _readLen - _readPos; @@ -493,7 +540,7 @@ public override int Read(byte[] buffer, int offset, int count) // BUT - this is a breaking change. // So: If we could not read all bytes the user asked for from the buffer, we will try once from the underlying // stream thus ensuring the same blocking behaviour as if the underlying stream was not wrapped in this BufferedStream. - if (bytesFromBuffer == count) + if (bytesFromBuffer == count && (count > 0 || !_allowForZeroByteReads)) return bytesFromBuffer; int alreadySatisfied = bytesFromBuffer; @@ -509,7 +556,7 @@ public override int Read(byte[] buffer, int offset, int count) // If there was anything in the write buffer, clear it. if (_writePos > 0) - FlushWrite(); + FlushWrite(true); // If the requested read is larger than buffer size, avoid the buffer and still use a single read: if (count >= _bufferSize) @@ -519,7 +566,7 @@ public override int Read(byte[] buffer, int offset, int count) // Ok. We can fill the buffer: EnsureBufferAllocated(); - _readLen = _stream.Read(_buffer!, 0, _bufferSize); + _readLen = _stream.Read(_buffer!, 0, _buffer!.Length); bytesFromBuffer = ReadFromBuffer(buffer, offset, count); @@ -541,7 +588,7 @@ public override int Read(Span destination) // Try to read from the buffer. int bytesFromBuffer = ReadFromBuffer(destination); - if (bytesFromBuffer == destination.Length) + if (bytesFromBuffer == destination.Length && (destination.Length > 0 || !_allowForZeroByteReads)) { // We got as many bytes as were asked for; we're done. return bytesFromBuffer; @@ -561,7 +608,7 @@ public override int Read(Span destination) // If there was anything in the write buffer, clear it. if (_writePos > 0) { - FlushWrite(); + FlushWrite(true); } if (destination.Length >= _bufferSize) @@ -574,7 +621,7 @@ public override int Read(Span destination) { // Otherwise, fill the buffer, then read from that. EnsureBufferAllocated(); - _readLen = _stream.Read(_buffer!, 0, _bufferSize); + _readLen = _stream.Read(_buffer!, 0, _buffer!.Length); return ReadFromBuffer(destination) + bytesFromBuffer; } } @@ -609,13 +656,13 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel // an Async operation. SemaphoreSlim sem = EnsureAsyncActiveSemaphoreInitialized(); Task semaphoreLockTask = sem.WaitAsync(cancellationToken); - if (semaphoreLockTask.IsCompletedSuccessfully) + bool locked = semaphoreLockTask.IsCompletedSuccessfully; + if (locked) { - bool completeSynchronously = true; - try + // hot path #1: there is data in the buffer + if (_readLen - _readPos > 0 || (count == 0 && !_allowForZeroByteReads)) { - Exception? error; - bytesFromBuffer = ReadFromBuffer(buffer, offset, count, out error); + bytesFromBuffer = ReadFromBuffer(buffer, offset, count, out Exception? error); // If we satisfied enough data from the buffer, we can complete synchronously. // Reading again for more data may cause us to block if we're using a device with no clear end of file, @@ -624,27 +671,35 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel // BUT - this is a breaking change. // So: If we could not read all bytes the user asked for from the buffer, we will try once from the underlying // stream thus ensuring the same blocking behaviour as if the underlying stream was not wrapped in this BufferedStream. - completeSynchronously = (bytesFromBuffer == count || error != null); - - if (completeSynchronously) + if (bytesFromBuffer == count || error != null) { + // if the above is false, we will be entering ReadFromUnderlyingStreamAsync and releasing there. + sem.Release(); return (error == null) - ? LastSyncCompletedReadTask(bytesFromBuffer) - : Task.FromException(error); + ? LastSyncCompletedReadTask(bytesFromBuffer) + : Task.FromException(error); } } - finally + // hot path #2: there is nothing to Flush and buffering would not be beneficial + else if (_writePos == 0 && count >= _bufferSize) { - if (completeSynchronously) // if this is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there. - sem.Release(); + _readPos = _readLen = 0; + + // start the async operation + ValueTask result = _stream!.ReadAsync(new Memory(buffer, offset, count), cancellationToken); + + // release the lock (we are not using shared state anymore) + sem.Release(); + + return result.AsTask(); } } // Delegate to the async implementation. return ReadFromUnderlyingStreamAsync( new Memory(buffer, offset + bytesFromBuffer, count - bytesFromBuffer), - cancellationToken, bytesFromBuffer, semaphoreLockTask).AsTask(); + cancellationToken, bytesFromBuffer, semaphoreLockTask, locked).AsTask(); } public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) @@ -660,38 +715,51 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken int bytesFromBuffer = 0; SemaphoreSlim sem = EnsureAsyncActiveSemaphoreInitialized(); Task semaphoreLockTask = sem.WaitAsync(cancellationToken); - if (semaphoreLockTask.IsCompletedSuccessfully) + bool locked = semaphoreLockTask.IsCompletedSuccessfully; + if (locked) { - bool completeSynchronously = true; - try + // hot path #1: there is data in the buffer + if (_readLen - _readPos > 0 || (buffer.Length == 0 && !_allowForZeroByteReads)) { bytesFromBuffer = ReadFromBuffer(buffer.Span); - completeSynchronously = bytesFromBuffer == buffer.Length; - if (completeSynchronously) + + if (bytesFromBuffer == buffer.Length) { + // if above is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there. + sem.Release(); + // If we satisfied enough data from the buffer, we can complete synchronously. return new ValueTask(bytesFromBuffer); } + + buffer = buffer.Slice(bytesFromBuffer); } - finally + // hot path #2: there is nothing to Flush and buffering would not be beneficial + else if (_writePos == 0 && buffer.Length >= _bufferSize) { - if (completeSynchronously) // if this is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there. - { - sem.Release(); - } + _readPos = _readLen = 0; + + // start the async operation + ValueTask result = _stream!.ReadAsync(buffer, cancellationToken); + + // release the lock (we are not using shared state anymore) + sem.Release(); + + return result; } } // Delegate to the async implementation. - return ReadFromUnderlyingStreamAsync(buffer.Slice(bytesFromBuffer), cancellationToken, bytesFromBuffer, semaphoreLockTask); + return ReadFromUnderlyingStreamAsync(buffer, cancellationToken, bytesFromBuffer, semaphoreLockTask, locked); } /// BufferedStream should be as thin a wrapper as possible. We want ReadAsync to delegate to /// ReadAsync of the underlying _stream rather than calling the base Stream which implements the one in terms of the other. /// This allows BufferedStream to affect the semantics of the stream it wraps as little as possible. /// -2 if _bufferSize was set to 0 while waiting on the semaphore; otherwise num of bytes read. + /// TODO: this method could benefit from IValueTaskSource (all the awaits allocate and we can take advantage of having single read op at a time) private async ValueTask ReadFromUnderlyingStreamAsync( - Memory buffer, CancellationToken cancellationToken, int bytesAlreadySatisfied, Task semaphoreLockTask) + Memory buffer, CancellationToken cancellationToken, int bytesAlreadySatisfied, Task semaphoreLockTask, bool locked) { // Same conditions validated with exceptions in ReadAsync: Debug.Assert(_stream != null); @@ -700,22 +768,32 @@ private async ValueTask ReadFromUnderlyingStreamAsync( Debug.Assert(_asyncActiveSemaphore != null); Debug.Assert(semaphoreLockTask != null); - // Employ async waiting based on the same synchronization used in BeginRead of the abstract Stream. - await semaphoreLockTask.ConfigureAwait(false); + if (!locked) + { + // Employ async waiting based on the same synchronization used in BeginRead of the abstract Stream. + await semaphoreLockTask.ConfigureAwait(false); + } + try { - // The buffer might have been changed by another async task while we were waiting on the semaphore. - // Check it now again. - int bytesFromBuffer = ReadFromBuffer(buffer.Span); - if (bytesFromBuffer == buffer.Length) - { - return bytesAlreadySatisfied + bytesFromBuffer; - } + int bytesFromBuffer = 0; - if (bytesFromBuffer > 0) + // we have already tried to read it from the buffer + if (!locked && (buffer.Length > 0 || !_allowForZeroByteReads)) { - buffer = buffer.Slice(bytesFromBuffer); - bytesAlreadySatisfied += bytesFromBuffer; + // The buffer might have been changed by another async task while we were waiting on the semaphore. + // Check it now again. + bytesFromBuffer = ReadFromBuffer(buffer.Span); + if (bytesFromBuffer == buffer.Length) + { + return bytesAlreadySatisfied + bytesFromBuffer; + } + + if (bytesFromBuffer > 0) + { + buffer = buffer.Slice(bytesFromBuffer); + bytesAlreadySatisfied += bytesFromBuffer; + } } Debug.Assert(_readLen == _readPos); @@ -773,7 +851,7 @@ private int ReadByteSlow() Debug.Assert(_stream != null); if (_writePos > 0) - FlushWrite(); + FlushWrite(true); EnsureBufferAllocated(); _readLen = _stream.Read(_buffer!, 0, _bufferSize); @@ -1035,19 +1113,21 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo // Try to satisfy the request from the buffer synchronously. SemaphoreSlim sem = EnsureAsyncActiveSemaphoreInitialized(); Task semaphoreLockTask = sem.WaitAsync(cancellationToken); - if (semaphoreLockTask.IsCompletedSuccessfully) + bool locked = semaphoreLockTask.IsCompletedSuccessfully; + if (locked) { bool completeSynchronously = true; + try { if (_writePos == 0) { - ClearReadBufferBeforeWrite(); + ClearReadBufferBeforeWrite(); // Seeks, but does not perform sync IO } Debug.Assert(_writePos < _bufferSize); - // If the write completely fits into the buffer, we can complete synchronously: + // hot path #1 If the write completely fits into the buffer, we can complete synchronously: completeSynchronously = buffer.Length < _bufferSize - _writePos; if (completeSynchronously) { @@ -1061,10 +1141,20 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo if (completeSynchronously) // if this is FALSE, we will be entering WriteToUnderlyingStreamAsync and releasing there. sem.Release(); } + + // hot path #2: there is nothing to Flush and buffering would not be beneficial + if (_writePos == 0 && buffer.Length >= _bufferSize) + { + ValueTask result = _stream!.WriteAsync(buffer, cancellationToken); + + sem.Release(); + + return result; + } } // Delegate to the async implementation. - return WriteToUnderlyingStreamAsync(buffer, cancellationToken, semaphoreLockTask); + return WriteToUnderlyingStreamAsync(buffer, cancellationToken, semaphoreLockTask, locked); } /// BufferedStream should be as thin a wrapper as possible. We want WriteAsync to delegate to @@ -1073,7 +1163,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo /// little as possible. /// private async ValueTask WriteToUnderlyingStreamAsync( - ReadOnlyMemory buffer, CancellationToken cancellationToken, Task semaphoreLockTask) + ReadOnlyMemory buffer, CancellationToken cancellationToken, Task semaphoreLockTask, bool locked) { Debug.Assert(_stream != null); Debug.Assert(_stream.CanWrite); @@ -1083,14 +1173,23 @@ private async ValueTask WriteToUnderlyingStreamAsync( // See the LARGE COMMENT in Write(..) for the explanation of the write buffer algorithm. - await semaphoreLockTask.ConfigureAwait(false); + if (!locked) + { + await semaphoreLockTask.ConfigureAwait(false); + } + try { - // The buffer might have been changed by another async task while we were waiting on the semaphore. - // However, note that if we recalculate the sync completion condition to TRUE, then useBuffer will also be TRUE. + if (!locked) + { + // The buffer might have been changed by another async task while we were waiting on the semaphore. + // However, note that if we recalculate the sync completion condition to TRUE, then useBuffer will also be TRUE. - if (_writePos == 0) - ClearReadBufferBeforeWrite(); + if (_writePos == 0) + { + ClearReadBufferBeforeWrite(); + } + } int totalUserBytes; bool useBuffer; @@ -1105,7 +1204,7 @@ private async ValueTask WriteToUnderlyingStreamAsync( { buffer = buffer.Slice(WriteToBuffer(buffer.Span)); - if (_writePos < _bufferSize) + if (_writePos < _buffer!.Length) { Debug.Assert(buffer.Length == 0); return; @@ -1164,6 +1263,18 @@ public override void EndWrite(IAsyncResult asyncResult) => TaskToApm.End(asyncResult); public override void WriteByte(byte value) + { + if (_writePos > 0 && _writePos < _bufferSize - 1) + { + _buffer![_writePos++] = value; + } + else + { + WriteByteSlow(value); + } + } + + private void WriteByteSlow(byte value) { EnsureNotClosed(); @@ -1176,7 +1287,7 @@ public override void WriteByte(byte value) // We should not be flushing here, but only writing to the underlying stream, but previous version flushed, so we keep this. if (_writePos >= _bufferSize - 1) - FlushWrite(); + FlushWrite(true); _buffer![_writePos++] = value; @@ -1194,7 +1305,7 @@ public override long Seek(long offset, SeekOrigin origin) { // We should be only writing the buffer and not flushing, // but the previous version did flush and we stick to it for back-compat reasons. - FlushWrite(); + FlushWrite(true); return _stream.Seek(offset, origin); } @@ -1267,7 +1378,7 @@ public override void CopyTo(Stream destination, int bufferSize) else if (_writePos > 0) { // If there's write data in the buffer, flush it back to the underlying stream, as does ReadAsync. - FlushWrite(); + FlushWrite(true); } // Our buffer is now clear. Copy data directly from the source stream to the destination stream. diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/SemaphoreSlim.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/SemaphoreSlim.cs index b9a28f2e4c469..e6fee3a2f4a3d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/SemaphoreSlim.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/SemaphoreSlim.cs @@ -920,8 +920,10 @@ private void CheckDispose() { if (m_lockObjAndDisposed.Value) { - throw new ObjectDisposedException(null, SR.SemaphoreSlim_Disposed); + Throw(); } + + static void Throw() => throw new ObjectDisposedException(null, SR.SemaphoreSlim_Disposed); } #endregion } From f305723a784d747956171e586cf68af6dc2023f9 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:44:17 +0100 Subject: [PATCH 07/10] previously accessing Length was not causing a Flush of the internal buffer after using BufferedStream by FileStream iternally, now it does TODO: verify with the reviewers if such change is OK --- .../System.IO.FileSystem/tests/FileStream/Flush.cs | 6 +++++- .../System.IO.FileSystem/tests/FileStream/FlushAsync.cs | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/Flush.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/Flush.cs index 10e522185726e..a5c2ba288b29d 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/Flush.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/Flush.cs @@ -105,11 +105,15 @@ public void FlushWriteWithOtherClient(bool? flushToDisk) using (FileStream fsr = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) { fs.Write(TestBuffer, 0, TestBuffer.Length); - Assert.Equal(TestBuffer.Length, fs.Length); // Make sure that we've actually buffered it, read handle won't see any changes Assert.Equal(0, fsr.Length); + // previously accessing Length was not causing a Flush of the internal buffer + // after using BufferedStream by FileStream iternally, now it does + // TODO: verify with the reviewers if such change is OK + Assert.Equal(TestBuffer.Length, fs.Length); + // This should cause a write, after it completes the two handles should be in sync Flush(fs, flushToDisk); Assert.Equal(TestBuffer.Length, fsr.Length); diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/FlushAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/FlushAsync.cs index 327f942a9a7a4..ea1b1ac800393 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/FlushAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/FlushAsync.cs @@ -37,11 +37,15 @@ public async Task FlushAsyncWriteWithOtherClient() using (FileStream fsr = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) { fs.Write(TestBuffer, 0, TestBuffer.Length); - Assert.Equal(TestBuffer.Length, fs.Length); // Make sure that we've actually buffered it, read handle won't see any changes Assert.Equal(0, fsr.Length); + // previously accessing Length was not causing a Flush of the internal buffer + // after using BufferedStream by FileStream iternally, now it does + // TODO: verify with the reviewers if such change is OK + Assert.Equal(TestBuffer.Length, fs.Length); + // This should cause a write, after it completes the two handles should be in sync await fs.FlushAsync(); Assert.Equal(TestBuffer.Length, fsr.Length); From 62302d8139f23036ad22d299ece6de3e22e13b34 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:46:42 +0100 Subject: [PATCH 08/10] the throwing in this example is not synchronous anymore I hope it's not a big deal --- .../tests/FileStream/SafeFileHandle.cs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs index 2c5b428e644ba..92ad0ff503bdc 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/SafeFileHandle.cs @@ -117,14 +117,7 @@ private async Task ThrowWhenHandlePositionIsChanged(bool useAsync) fs.WriteByte(0); fsr.Position++; - if (useAsync && OperatingSystem.IsWindows()) // Async I/O behaviors differ due to kernel-based implementation on Windows - { - Assert.Throws(() => FSAssert.CompletesSynchronously(fs.ReadAsync(new byte[1], 0, 1))); - } - else - { - await Assert.ThrowsAsync(() => fs.ReadAsync(new byte[1], 0, 1)); - } + await Assert.ThrowsAsync(() => fs.ReadAsync(new byte[1], 0, 1)); fs.WriteByte(0); fsr.Position++; From 24ecc64ea838673539f19949fdc75130cf4efc5f Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:47:14 +0100 Subject: [PATCH 09/10] breaking change: parallel reads|writes don't always update Position before being awaited --- .../System.IO.FileSystem/tests/FileStream/WriteAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs index 6ed4d1db2c4cc..948c58dbe2db7 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs @@ -220,7 +220,7 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop( { writes[i] = WriteAsync(fs, expectedData, i * writeSize, writeSize, cancellationToken); Assert.Null(writes[i].Exception); - if (useAsync) + if (useAsync && writes[i].IsCompletedSuccessfully) // TODO: breaking change, verify with reviewers { Assert.Equal((i + 1) * writeSize, fs.Position); } From f2ddd38b125d0fc842843ddf291d88ee9df20020 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 18 Feb 2021 10:59:14 +0100 Subject: [PATCH 10/10] remove outdated comments and unused usings --- .../IO/AsyncWindowsFileStreamStrategy.cs | 19 ------------------ .../IO/SyncWindowsFileStreamStrategy.cs | 20 ------------------- 2 files changed, 39 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs index e1ede8b489aed..74ed4905288a2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/AsyncWindowsFileStreamStrategy.cs @@ -3,7 +3,6 @@ using System.Buffers; using System.Diagnostics; -using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; @@ -17,24 +16,6 @@ * and we have to deal with this pain. Stream has implementations of * the sync methods in terms of the async ones, so we'll * call through to our base class to get those methods when necessary. - * - * Also buffering is added into Win32FileStream as well. Folded in the - * code from BufferedStream, so all the comments about it being mostly - * aggressive (and the possible perf improvement) apply to Win32FileStream as - * well. Also added some buffering to the async code paths. - * - * Class Invariants: - * The class has one buffer, shared for reading & writing. It can only be - * used for one or the other at any point in time - not both. The following - * should be true: - * 0 <= _readPos <= _readLen < _bufferSize - * 0 <= _writePos < _bufferSize - * _readPos == _readLen && _readPos > 0 implies the read buffer is valid, - * but we're at the end of the buffer. - * _readPos == _readLen == 0 means the read buffer contains garbage. - * Either _writePos can be greater than 0, or _readLen & _readPos can be - * greater than zero, but neither can be greater than zero at the same time. - * */ namespace System.IO diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs index 456ac18ed4c1f..1ac5ebb9697f2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/SyncWindowsFileStreamStrategy.cs @@ -1,13 +1,11 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Buffers; using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; -using System.Runtime.CompilerServices; /* * Win32FileStream supports different modes of accessing the disk - async mode @@ -17,24 +15,6 @@ * and we have to deal with this pain. Stream has implementations of * the sync methods in terms of the async ones, so we'll * call through to our base class to get those methods when necessary. - * - * Also buffering is added into Win32FileStream as well. Folded in the - * code from BufferedStream, so all the comments about it being mostly - * aggressive (and the possible perf improvement) apply to Win32FileStream as - * well. Also added some buffering to the async code paths. - * - * Class Invariants: - * The class has one buffer, shared for reading & writing. It can only be - * used for one or the other at any point in time - not both. The following - * should be true: - * 0 <= _readPos <= _readLen < _bufferSize - * 0 <= _writePos < _bufferSize - * _readPos == _readLen && _readPos > 0 implies the read buffer is valid, - * but we're at the end of the buffer. - * _readPos == _readLen == 0 means the read buffer contains garbage. - * Either _writePos can be greater than 0, or _readLen & _readPos can be - * greater than zero, but neither can be greater than zero at the same time. - * */ namespace System.IO