diff --git a/Package/Core/Cancelations/CancelationToken.cs b/Package/Core/Cancelations/CancelationToken.cs index 99823b9a..6fa4ddc7 100644 --- a/Package/Core/Cancelations/CancelationToken.cs +++ b/Package/Core/Cancelations/CancelationToken.cs @@ -43,7 +43,11 @@ partial struct CancelationToken : IRetainable, IEquatable /// /// Returns an empty . /// - public static CancelationToken None { get { return default(CancelationToken); } } + public static CancelationToken None + { + [MethodImpl(Internal.InlineOption)] + get { return default(CancelationToken); } + } /// /// FOR INTERNAL USE ONLY! diff --git a/Package/Core/Linq.meta b/Package/Core/Linq.meta new file mode 100644 index 00000000..71ed5c3b --- /dev/null +++ b/Package/Core/Linq.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 3b9fed9eba029d049a58939052ecb7e0 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/AsyncEnumerable.cs b/Package/Core/Linq/AsyncEnumerable.cs new file mode 100644 index 00000000..81bbfdc3 --- /dev/null +++ b/Package/Core/Linq/AsyncEnumerable.cs @@ -0,0 +1,171 @@ +using Proto.Promises.Async.CompilerServices; +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; + +#pragma warning disable IDE0090 // Use 'new(...)' + +namespace Proto.Promises.Linq +{ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + /// + /// Provides helper functions to create async streams. + /// +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + public static class AsyncEnumerable + { + // We use AsyncEnumerableMethod instead of Promise so it can specially handle early-exits (`break` keyword). + + /// + /// Create a new async stream from the specified function. + /// + public static AsyncEnumerable Create(Func, CancelationToken, AsyncEnumerableMethod> asyncIterator) + { + var enumerable = Internal.AsyncEnumerableImpl>.GetOrCreate(new Internal.AsyncIterator(asyncIterator)); + return new AsyncEnumerable(enumerable); + } + + /// + /// Create a new async stream from the specified and function. + /// + public static AsyncEnumerable Create(TCapture captureValue, Func, CancelationToken, AsyncEnumerableMethod> asyncIterator) + { + var enumerable = Internal.AsyncEnumerableImpl>.GetOrCreate(new Internal.AsyncIterator(captureValue, asyncIterator)); + return new AsyncEnumerable(enumerable); + } + } + + /// + /// Exposes an enumerator that provides asynchronous iteration over values of a specified type. + /// An instance of this type may only be consumed once. + /// +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + public readonly struct AsyncEnumerable : IAsyncEnumerable + { + private readonly Internal.PromiseRefBase.AsyncEnumerableBase _target; + private readonly int _id; + + /// + /// Gets whether this instance is valid for enumeration. Once enumeration has begun, this will return false. + /// + public bool IsValid + { + get + { + var target = _target; + return target != null && target.EnumerableId == _id; + } + } + + [MethodImpl(Internal.InlineOption)] + internal AsyncEnumerable(Internal.PromiseRefBase.AsyncEnumerableBase target) + { + _target = target; + _id = _target.EnumerableId; + } + + /// + /// Returns an enumerator that iterates asynchronously through the collection. + /// + [MethodImpl(Internal.InlineOption)] + public AsyncEnumerator GetAsyncEnumerator(CancelationToken cancelationToken) + => _target.GetAsyncEnumerator(_id, cancelationToken); + + /// + /// Returns an enumerator that iterates asynchronously through the collection. + /// + [MethodImpl(Internal.InlineOption)] + public AsyncEnumerator GetAsyncEnumerator() => GetAsyncEnumerator(CancelationToken.None); + + IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken cancellationToken) => GetAsyncEnumerator(cancellationToken.ToCancelationToken()); + + /// + /// Sets the to be passed to when iterating. + /// + /// The cancelation token to use. + /// The configured enumerable. + public ConfiguredAsyncEnumerable WithCancelation(CancelationToken cancelationToken) + => new ConfiguredAsyncEnumerable(this, cancelationToken, Internal.SynchronizationOption.Synchronous, null, false); + + /// + /// Configures how awaits on the promises returned from an async iteration will be performed. + /// + /// On which context the continuations will be executed. + /// If true, forces the continuations to be invoked asynchronously. If is , this value will be ignored. + /// The configured enumerable. + public ConfiguredAsyncEnumerable ConfigureAwait(SynchronizationOption synchronizationOption, bool forceAsync = false) + => new ConfiguredAsyncEnumerable(this, CancelationToken.None, (Internal.SynchronizationOption) synchronizationOption, null, forceAsync); + + /// + /// Configures how awaits on the promises returned from an async iteration will be performed. + /// + /// The context on which the continuations will be executed. + /// If true, forces the continuations to be invoked asynchronously. + /// The configured enumerable. + public ConfiguredAsyncEnumerable ConfigureAwait(SynchronizationContext synchronizationContext, bool forceAsync = false) + => new ConfiguredAsyncEnumerable(this, CancelationToken.None, Internal.SynchronizationOption.Explicit, synchronizationContext, forceAsync); + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + // These methods exist to hide the built-in IAsyncEnumerable extension methods, and use promise-optimized implementations instead if they are used. + [EditorBrowsable(EditorBrowsableState.Never)] + public ConfiguredAsyncEnumerable WithCancellation(CancellationToken cancellationToken) + => WithCancelation(cancellationToken.ToCancelationToken()); + + [EditorBrowsable(EditorBrowsableState.Never)] + public ConfiguredAsyncEnumerable ConfigureAwait(bool continueOnCapturedContext) + => continueOnCapturedContext ? ConfigureAwait(SynchronizationContext.Current) : ConfigureAwait(SynchronizationOption.Background); +#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member + } + + /// + /// Supports a simple asynchronous iteration over a generic collection. + /// An instance of this type may only be consumed once. + /// +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + public readonly struct AsyncEnumerator : IAsyncEnumerator + { + private readonly Internal.PromiseRefBase.AsyncEnumerableBase _target; + private readonly int _id; + + [MethodImpl(Internal.InlineOption)] + internal AsyncEnumerator(Internal.PromiseRefBase.AsyncEnumerableBase target, int id) + { + _target = target; + _id = id; + } + + /// + /// Gets the element in the collection at the current position of the enumerator. + /// + public T Current + { + [MethodImpl(Internal.InlineOption)] + get { return _target.GetCurrent(_id); } + } + + /// + /// Advances the enumerator asynchronously to the next element of the collection. + /// + public Promise MoveNextAsync() + => _target.MoveNextAsync(_id); + + /// + /// Asynchronously releases resources used by this enumerator. + /// + public Promise DisposeAsync() + => _target.DisposeAsync(_id); + + System.Threading.Tasks.ValueTask IAsyncEnumerator.MoveNextAsync() => MoveNextAsync(); + System.Threading.Tasks.ValueTask IAsyncDisposable.DisposeAsync() => DisposeAsync(); + } +#endif +} \ No newline at end of file diff --git a/Package/Core/Linq/AsyncEnumerable.cs.meta b/Package/Core/Linq/AsyncEnumerable.cs.meta new file mode 100644 index 00000000..61341323 --- /dev/null +++ b/Package/Core/Linq/AsyncEnumerable.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 616c179fd4ead7d439fe3a494deb9a5b +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/CompilerServices.meta b/Package/Core/Linq/CompilerServices.meta new file mode 100644 index 00000000..a4fd0816 --- /dev/null +++ b/Package/Core/Linq/CompilerServices.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: a95ca65e149bac64393f669b1cc01bf2 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs b/Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs new file mode 100644 index 00000000..2b983f69 --- /dev/null +++ b/Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs @@ -0,0 +1,123 @@ +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Security; + +// AsyncMethodBuilderAttribute +#pragma warning disable 0436 // Type conflicts with imported type + +namespace Proto.Promises.Async.CompilerServices +{ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + /// + /// Type used to create an with . + /// + /// This type is intended for compiler use rather than use directly in code. +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + [AsyncMethodBuilder(typeof(AsyncEnumerableMethodBuilder))] + public readonly struct AsyncEnumerableMethod + { + internal readonly Promise _promise; + + [MethodImpl(Internal.InlineOption)] + internal AsyncEnumerableMethod(Promise promise) + => _promise = promise; + } + + /// + /// Awaitable type used to wait for the consumer to move the async iterator forward. + /// + /// This type is intended for compiler use rather than use directly in code. +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + public struct AsyncEnumerableMethodBuilder + { + private PromiseMethodBuilder _builder; + + [MethodImpl(Internal.InlineOption)] + private AsyncEnumerableMethodBuilder(PromiseMethodBuilder builder) + => _builder = builder; + + /// Gets the for this builder. + /// The representing the builder's asynchronous operation. + public AsyncEnumerableMethod Task + { + [MethodImpl(Internal.InlineOption)] + get { return new AsyncEnumerableMethod(_builder.Task); } + } + + /// Initializes a new . + /// The initialized . + [MethodImpl(Internal.InlineOption)] + public static AsyncEnumerableMethodBuilder Create() + => new AsyncEnumerableMethodBuilder(PromiseMethodBuilder.Create()); + + /// + /// Completes the in the Rejected state with the specified exception. + /// + /// The to use to reject the promise. + public void SetException(Exception exception) + { + if (exception == AsyncEnumerableDisposedException.s_instance) + { + // The await foreach loop was stopped with a `break`. + SetResult(); + } + else + { + _builder.SetException(exception); + } + } + + /// + /// Completes the in the Resolved state. + /// + [MethodImpl(Internal.InlineOption)] + public void SetResult() + => _builder.SetResult(); + + /// + /// Schedules the specified state machine to be pushed forward when the specified awaiter completes. + /// + /// Specifies the type of the awaiter. + /// Specifies the type of the state machine. + /// The awaiter. + /// The state machine. + [MethodImpl(Internal.InlineOption)] + public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) + where TAwaiter : INotifyCompletion + where TStateMachine : IAsyncStateMachine + => _builder.AwaitOnCompleted(ref awaiter, ref stateMachine); + + /// + /// Schedules the specified state machine to be pushed forward when the specified awaiter completes. + /// + /// Specifies the type of the awaiter. + /// Specifies the type of the state machine. + /// The awaiter. + /// The state machine. + [SecuritySafeCritical] + [MethodImpl(Internal.InlineOption)] + public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) + where TAwaiter : ICriticalNotifyCompletion + where TStateMachine : IAsyncStateMachine + => _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); + + /// Initiates the builder's execution with the associated state machine. + /// Specifies the type of the state machine. + /// The state machine instance, passed by reference. + [MethodImpl(Internal.InlineOption)] + public void Start(ref TStateMachine stateMachine) + where TStateMachine : IAsyncStateMachine + => _builder.Start(ref stateMachine); + + /// Does nothing. + /// The heap-allocated state machine object. + [MethodImpl(Internal.InlineOption)] + public void SetStateMachine(IAsyncStateMachine stateMachine) { } + } +#endif +} \ No newline at end of file diff --git a/Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs.meta b/Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs.meta new file mode 100644 index 00000000..15e3fecc --- /dev/null +++ b/Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a4bec1d312dbac440a10ffa611cc1750 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/CompilerServices/AsyncStreamWriter.cs b/Package/Core/Linq/CompilerServices/AsyncStreamWriter.cs new file mode 100644 index 00000000..12b5e3f2 --- /dev/null +++ b/Package/Core/Linq/CompilerServices/AsyncStreamWriter.cs @@ -0,0 +1,101 @@ +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; + +namespace Proto.Promises.Async.CompilerServices +{ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + /// + /// Type that allows writing to the async stream created from . + /// +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + public readonly struct AsyncStreamWriter + { + private readonly Internal.PromiseRefBase.AsyncEnumerableBase _target; + private readonly int _id; + + [MethodImpl(Internal.InlineOption)] + internal AsyncStreamWriter(Internal.PromiseRefBase.AsyncEnumerableBase target, int id) + { + _target = target; + _id = id; + } + + /// + /// Asynchronously writes the to the stream. + /// the returned to pause execution until the reader has requested the async iterator to move forward. + /// + [MethodImpl(Internal.InlineOption)] + public AsyncStreamYielder YieldAsync(T value) + => _target.YieldAsync(value, _id); + } + + /// + /// Awaitable type used to wait for the consumer to move the async iterator forward. + /// + /// This type is intended for compiler use rather than use directly in code. +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + public readonly partial struct AsyncStreamYielder : ICriticalNotifyCompletion, Internal.IPromiseAwaiter + { + private readonly Internal.PromiseRefBase.AsyncEnumerableBase _target; + private readonly int _enumerableId; + + [MethodImpl(Internal.InlineOption)] + internal AsyncStreamYielder(Internal.PromiseRefBase.AsyncEnumerableBase target, int enumerableId) + { + _target = target; + _enumerableId = enumerableId; + CreateOverride(); + } + + static partial void CreateOverride(); + +#if !NETCOREAPP + // Fix for IL2CPP not invoking the static constructor. +#if ENABLE_IL2CPP + [MethodImpl(Internal.InlineOption)] + static partial void CreateOverride() +#else + static AsyncStreamYielder() +#endif + { + Internal.AwaitOverriderImpl>.Create(); + } +#endif + + /// + /// Returns this. + /// + [MethodImpl(Internal.InlineOption)] + public AsyncStreamYielder GetAwaiter() => this; + + /// Gets whether the reader has requested the async iterator to move forward. + /// This property is intended for compiler use rather than use directly in code. + public bool IsCompleted + { + [MethodImpl(Internal.InlineOption)] + get { return false; } + } + + /// Ends the await. + /// This method is intended for compiler use rather than use directly in code. + [MethodImpl(Internal.InlineOption)] + public void GetResult() + => _target.GetResultForAsyncStreamYielder(_enumerableId); + + [MethodImpl(Internal.InlineOption)] + void Internal.IPromiseAwaiter.AwaitOnCompletedInternal(Internal.PromiseRefBase asyncPromiseRef, ref Internal.PromiseRefBase.AsyncPromiseFields asyncFields) + => _target.AwaitOnCompletedForAsyncStreamYielder(asyncPromiseRef, _enumerableId); + + void INotifyCompletion.OnCompleted(Action continuation) + => throw new InvalidOperationException("AsyncStreamYielder must only be used in AsyncEnumerable methods."); + + void ICriticalNotifyCompletion.UnsafeOnCompleted(Action continuation) + => throw new InvalidOperationException("AsyncStreamYielder must only be used in AsyncEnumerable methods."); + } +#endif +} \ No newline at end of file diff --git a/Package/Core/Linq/CompilerServices/AsyncStreamWriter.cs.meta b/Package/Core/Linq/CompilerServices/AsyncStreamWriter.cs.meta new file mode 100644 index 00000000..3c4306ac --- /dev/null +++ b/Package/Core/Linq/CompilerServices/AsyncStreamWriter.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 842286b66c99b914696343d9bcd0e0d2 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/CompilerServices/ConfiguredAsyncEnumerable.cs b/Package/Core/Linq/CompilerServices/ConfiguredAsyncEnumerable.cs new file mode 100644 index 00000000..b7423e32 --- /dev/null +++ b/Package/Core/Linq/CompilerServices/ConfiguredAsyncEnumerable.cs @@ -0,0 +1,109 @@ +using Proto.Promises.Linq; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Proto.Promises.Async.CompilerServices +{ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + /// + /// Provides an awaitable async enumerable that enables cancelable iteration and configured awaits. + /// + /// The type of values to enumerate. + public readonly struct ConfiguredAsyncEnumerable + { + private readonly AsyncEnumerable _enumerable; + private readonly CancelationToken _cancelationToken; + private readonly SynchronizationContext _synchronizationContext; + private readonly Internal.SynchronizationOption _synchronizationOption; + private readonly bool _forceAsync; + + internal ConfiguredAsyncEnumerable(AsyncEnumerable enumerable, + CancelationToken cancelationToken, + Internal.SynchronizationOption synchronizationOption, + SynchronizationContext synchronizationContext, + bool forceAsync) + { + _enumerable = enumerable; + _cancelationToken = cancelationToken; + _synchronizationOption = synchronizationOption; + _synchronizationContext = synchronizationContext; + _forceAsync = forceAsync; + } + + /// + /// Sets the to be passed to when iterating. + /// + /// The cancelation token to use. + /// The configured enumerable. + public ConfiguredAsyncEnumerable WithCancelation(CancelationToken cancelationToken) + => new ConfiguredAsyncEnumerable(_enumerable, cancelationToken, _synchronizationOption, _synchronizationContext, _forceAsync); + + /// + /// Configures how awaits on the promises returned from an async iteration will be performed. + /// + /// On which context the continuations will be executed. + /// If true, forces the continuations to be invoked asynchronously. If is , this value will be ignored. + /// The configured enumerable. + public ConfiguredAsyncEnumerable ConfigureAwait(SynchronizationOption synchronizationOption, bool forceAsync = false) + => new ConfiguredAsyncEnumerable(_enumerable, _cancelationToken, (Internal.SynchronizationOption) synchronizationOption, null, forceAsync); + + /// + /// Configures how awaits on the promises returned from an async iteration will be performed. + /// + /// The context on which the continuations will be executed. + /// If true, forces the continuations to be invoked asynchronously. + /// The configured enumerable. + public ConfiguredAsyncEnumerable ConfigureAwait(SynchronizationContext synchronizationContext, bool forceAsync = false) + => new ConfiguredAsyncEnumerable(_enumerable, _cancelationToken, Internal.SynchronizationOption.Explicit, synchronizationContext, forceAsync); + + /// + /// Returns an enumerator that iterates asynchronously through collections that enables cancelable iteration and configured awaits. + /// + /// An enumerator for the . + public Enumerator GetAsyncEnumerator() + => new Enumerator(_enumerable.GetAsyncEnumerator(_cancelationToken), _synchronizationContext, _synchronizationOption, _forceAsync); + + /// + /// Provides an awaitable async enumerator that enables cancelable iteration and configured awaits. + /// + public readonly struct Enumerator + { + private readonly AsyncEnumerator _enumerator; + private readonly SynchronizationContext _synchronizationContext; + private readonly Internal.SynchronizationOption _synchronizationOption; + private readonly bool _forceAsync; + + internal Enumerator(AsyncEnumerator enumerator, SynchronizationContext synchronizationContext, Internal.SynchronizationOption synchronizationOption, bool forceAsync) + { + _enumerator = enumerator; + _synchronizationContext = synchronizationContext; + _synchronizationOption = synchronizationOption; + _forceAsync = forceAsync; + } + + /// + /// Gets the element in the collection at the current position of the enumerator. + /// + public T Current + { + [MethodImpl(Internal.InlineOption)] + get { return _enumerator.Current; } + } + + /// + /// Advances the enumerator asynchronously to the next element of the collection. + /// + [MethodImpl(Internal.InlineOption)] + public Promise MoveNextAsync() + => Internal.PromiseRefBase.CallbackHelperVoid.WaitAsync(_enumerator.MoveNextAsync(), _synchronizationOption, _synchronizationContext, _forceAsync, CancelationToken.None); + + /// + /// Asynchronously releases resources used by the . + /// + [MethodImpl(Internal.InlineOption)] + public Promise DisposeAsync() + => Internal.PromiseRefBase.CallbackHelperVoid.WaitAsync(_enumerator.DisposeAsync(), _synchronizationOption, _synchronizationContext, _forceAsync, CancelationToken.None); + } + } +#endif +} \ No newline at end of file diff --git a/Package/Core/Linq/CompilerServices/ConfiguredAsyncEnumerable.cs.meta b/Package/Core/Linq/CompilerServices/ConfiguredAsyncEnumerable.cs.meta new file mode 100644 index 00000000..7c584ca7 --- /dev/null +++ b/Package/Core/Linq/CompilerServices/ConfiguredAsyncEnumerable.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: e037ecc641981e64faf2cf7df896ab02 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/Internal.meta b/Package/Core/Linq/Internal.meta new file mode 100644 index 00000000..46581396 --- /dev/null +++ b/Package/Core/Linq/Internal.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 2e8d90d875307694b89809911ef76f79 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/Internal/AsyncEnumerableDisposedException.cs b/Package/Core/Linq/Internal/AsyncEnumerableDisposedException.cs new file mode 100644 index 00000000..aa02c9ee --- /dev/null +++ b/Package/Core/Linq/Internal/AsyncEnumerableDisposedException.cs @@ -0,0 +1,16 @@ +using System; + +#pragma warning disable IDE0090 // Use 'new(...)' + +namespace Proto.Promises.Async.CompilerServices +{ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + internal sealed class AsyncEnumerableDisposedException : Exception + { + // We can use a singleton instance since we never care about the stack trace. + internal static readonly AsyncEnumerableDisposedException s_instance = new AsyncEnumerableDisposedException(); + + private AsyncEnumerableDisposedException() : base("This is a special exception used for async enumerables. It should never be caught by user code!") { } + } +#endif +} \ No newline at end of file diff --git a/Package/Core/Linq/Internal/AsyncEnumerableDisposedException.cs.meta b/Package/Core/Linq/Internal/AsyncEnumerableDisposedException.cs.meta new file mode 100644 index 00000000..4a72854e --- /dev/null +++ b/Package/Core/Linq/Internal/AsyncEnumerableDisposedException.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 18aa94999009f5d4caa27dfe57256118 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Linq/Internal/AsyncEnumerableInternal.cs b/Package/Core/Linq/Internal/AsyncEnumerableInternal.cs new file mode 100644 index 00000000..324c1650 --- /dev/null +++ b/Package/Core/Linq/Internal/AsyncEnumerableInternal.cs @@ -0,0 +1,380 @@ +#if PROTO_PROMISE_DEBUG_ENABLE || (!PROTO_PROMISE_DEBUG_DISABLE && DEBUG) +#define PROMISE_DEBUG +#else +#undef PROMISE_DEBUG +#endif + +using Proto.Promises.Async.CompilerServices; +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Proto.Promises +{ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + partial class Internal + { + internal interface IAsyncIterator + { + AsyncEnumerableMethod Start(AsyncStreamWriter streamWriter, CancelationToken cancelationToken); + bool IsNull { get; } + } + +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + internal readonly struct AsyncIterator : IAsyncIterator + { + private readonly Func, CancelationToken, AsyncEnumerableMethod> _func; + + public bool IsNull + { + [MethodImpl(InlineOption)] + get { return _func == null; } + } + + [MethodImpl(InlineOption)] + internal AsyncIterator(Func, CancelationToken, AsyncEnumerableMethod> func) + => _func = func; + + [MethodImpl(InlineOption)] + public AsyncEnumerableMethod Start(AsyncStreamWriter streamWriter, CancelationToken cancelationToken) + => _func.Invoke(streamWriter, cancelationToken); + } + +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + internal readonly struct AsyncIterator : IAsyncIterator + { + private readonly TCapture _capturedValue; + private readonly Func, CancelationToken, AsyncEnumerableMethod> _func; + + public bool IsNull + { + [MethodImpl(InlineOption)] + get { return _func == null; } + } + + [MethodImpl(InlineOption)] + internal AsyncIterator(TCapture captureValue, Func, CancelationToken, AsyncEnumerableMethod> func) + { + _capturedValue = captureValue; + _func = func; + } + + [MethodImpl(InlineOption)] + public AsyncEnumerableMethod Start(AsyncStreamWriter streamWriter, CancelationToken cancelationToken) + => _func.Invoke(_capturedValue, streamWriter, cancelationToken); + } + + partial class PromiseRefBase + { +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + internal abstract class AsyncEnumerableBase : PromiseSingleAwait + { + // This is used as the backing reference to 3 different awaiters. MoveNextAsync (Promise), DisposeAsync (Promise), and YieldAsync (AsyncStreamYielder). + // We use `Interlocked.CompareExchange(ref _enumerableId` to enforce only 1 awaiter uses it at a time, in the correct order. + // We use a separate field for AsyncStreamYielder continuation, because using _next for 2 separate async functions (the iterator and the consumer) proves problematic. + protected PromiseRefBase _iteratorPromiseRef; + private T _current; + private int _iteratorCompleteExpectedId; + private int _iteratorCompleteId; + protected int _enumerableId = 1; // Start with Id 1 instead of 0 to reduce risk of false positives. + protected bool _disposed; + protected CancelationToken _cancelationToken; + + internal int EnumerableId + { + [MethodImpl(InlineOption)] + get { return _enumerableId; } + } + + ~AsyncEnumerableBase() + { + try + { + if (!_disposed) + { + string message = "An AsyncEnumerable's resources were garbage collected without it being disposed. You must call DisposeAsync on the AsyncEnumerator."; + ReportRejection(new UnreleasedObjectException(message), this); + } + } + catch (Exception e) + { + // This should never happen. + ReportRejection(e, this); + } + } + + internal Linq.AsyncEnumerator GetAsyncEnumerator(int id, CancelationToken cancelationToken) + { + int newId = id + 1; + if (Interlocked.CompareExchange(ref _enumerableId, newId, id) != id) + { + throw new InvalidOperationException("AsyncEnumerable.GetAsyncEnumerator: instance is not valid. AsyncEnumerable may only be used once.", GetFormattedStacktrace(2)); + } + _cancelationToken = cancelationToken; + return new Linq.AsyncEnumerator(this, newId); + } + + [MethodImpl(InlineOption)] + internal T GetCurrent(int id) + { + if (_enumerableId != id) + { + throw new InvalidOperationException("AsyncEnumerable.GetCurrent: instance is not valid, or the MoveNextAsync operation is still pending.", GetFormattedStacktrace(2)); + } + return _current; + } + + [MethodImpl(InlineOption)] + internal Promise MoveNextAsync(int id) + { + // We increment by 1 when MoveNextAsync, then decrement by 1 when YieldAsync. + int newId = id + 1; + // When the async iterator function completes, we set it to the id + 2 so we can detect that case. + int iteratorCompleteId = id + 2; + int oldId = Interlocked.CompareExchange(ref _enumerableId, newId, id); + if (oldId != id) + { + if (oldId == iteratorCompleteId) + { + // The async iterator function is already complete. Return a resolved promise with `false` result. + return Promise.Resolved(false); + } + throw new InvalidOperationException("AsyncEnumerable.MoveNextAsync: instance is not valid, or the previous MoveNextAsync operation is still pending.", GetFormattedStacktrace(2)); + } + ThrowIfInPool(this); + _current = default; + _iteratorCompleteExpectedId = newId; + _iteratorCompleteId = iteratorCompleteId; + _result = false; + StartOrMoveNext(newId); + return new Promise(this, Id, 0); + } + + [MethodImpl(InlineOption)] + internal AsyncStreamYielder YieldAsync(in T value, int id) + { + int newId = id - 1; + if (Interlocked.CompareExchange(ref _enumerableId, newId, id) != id) + { + throw new InvalidOperationException("AsyncStreamWriter.YieldAsync: instance is not valid. This must only be called from the iterator method, and not within any catch or finally blocks.", GetFormattedStacktrace(2)); + } + ThrowIfInPool(this); + _current = value; + _iteratorCompleteExpectedId = newId; + // When the async iterator function completes, we set it to the original id + 2 so we can detect that case. + _iteratorCompleteId = newId + 2; + return new AsyncStreamYielder(this, newId); + } + + [MethodImpl(InlineOption)] + internal Promise DisposeAsync(int id) + { + int newId = id + 3; + // When the async iterator function completes before DisposeAsync is called, it's set to id + 2. + int iteratorCompleteId = id + 2; + // Common case is DisposeAsync is called after the async iterator function is complete. + int oldId = Interlocked.CompareExchange(ref _enumerableId, newId, iteratorCompleteId); + if (oldId == iteratorCompleteId) + { + // The async iterator function is already complete, dispose this and return a resolved promise. + _disposed = true; + DisposeAndReturnToPool(); + return Promise.Resolved(); + } + + // Otherwise, DisposeAsync was likely called to stop iterating early (`break` keyword in `await foreach` loop). + // We do another CompareExchange in case it was an invalid call. + oldId = Interlocked.CompareExchange(ref _enumerableId, newId, id); + if (oldId != id) + { + if (oldId == id + 1) + { + throw new InvalidOperationException("AsyncEnumerable.DisposeAsync: the previous MoveNextAsync operation is still pending.", GetFormattedStacktrace(2)); + } + // IAsyncDisposable.DisposeAsync must not throw if it's called multiple times, according to MSDN documentation. + return Promise.Resolved(); + } + + ThrowIfInPool(this); + _disposed = true; + var iteratorPromise = InterlockedExchange(ref _iteratorPromiseRef, null); + if (iteratorPromise == null) + { + // DisposeAsync was called before MoveNextAsync, the async iterator function never started. + // Dispose this and return a resolved promise. + State = Promise.State.Resolved; + DisposeAndReturnToPool(); + return Promise.Resolved(); + } + + // The async iterator function is not already complete, we move the async state machine forward. + // Once that happens, GetResultForAsyncStreamYielder will be called which throws the special exception. + _current = default; + _iteratorCompleteExpectedId = newId; + _iteratorCompleteId = newId; + // Invalidate the previous awaiter. + IncrementPromiseIdAndClearPrevious(); + // Reset for the next awaiter. + ResetWithoutStacktrace(); + iteratorPromise.Handle(this, null, Promise.State.Resolved); + return new Promise(this, Id, 0); + } + + internal override void Handle(PromiseRefBase handler, object rejectContainer, Promise.State state) + { + // This is called when the async iterator function completes. + ThrowIfInPool(this); + handler.SetCompletionState(rejectContainer, state); + if (Interlocked.CompareExchange(ref _enumerableId, _iteratorCompleteId, _iteratorCompleteExpectedId) != _iteratorCompleteExpectedId) + { + handler.MaybeReportUnhandledAndDispose(rejectContainer, state); + rejectContainer = CreateRejectContainer(new InvalidOperationException("AsyncEnumerable.Create iterator function completed invalidly. Did you YieldAsync without await?"), int.MinValue, null, this); + state = Promise.State.Rejected; + } + else + { + handler.SuppressRejection = true; + handler.MaybeDispose(); + } + HandleNextInternal(rejectContainer, state); + } + + protected void HandleFromSynchronouslyCompletedIterator() + { + ThrowIfInPool(this); + object rejectContainer = null; + Promise.State state = Promise.State.Resolved; + if (Interlocked.CompareExchange(ref _enumerableId, _iteratorCompleteId, _iteratorCompleteExpectedId) != _iteratorCompleteExpectedId) + { + rejectContainer = CreateRejectContainer(new InvalidOperationException("AsyncEnumerable.Create iterator function completed invalidly. Did you YieldAsync without await?"), int.MinValue, null, this); + state = Promise.State.Rejected; + } + HandleNextInternal(rejectContainer, state); + } + + internal void GetResultForAsyncStreamYielder(int enumerableId) + { + int enumId = _enumerableId; + // We add 1 because MoveNextAsync is expected to be called before this. + if (enumId != enumerableId + 1) + { + // If it wasn't MoveNextAsync, then it should've been DisposeAsync. + if (enumId == enumerableId + 3) + { + // DisposeAsync was called early (before the async iterator function completed). + // Reset in case the async iterator function completes synchronously from Start. + ResetWithoutStacktrace(); + // Throw this special exception so that the async iterator function will run any finally blocks and complete. + throw AsyncEnumerableDisposedException.s_instance; + } + throw new InvalidOperationException("AsyncStreamYielder.GetResult: instance is not valid. This should only be called from the iterator method, and it may only be called once.", GetFormattedStacktrace(2)); + } + // Reset in case the async iterator function completes synchronously from Start. + ResetWithoutStacktrace(); + } + + [MethodImpl(InlineOption)] + internal void AwaitOnCompletedForAsyncStreamYielder(PromiseRefBase asyncPromiseRef, int enumerableId) + { + if (_enumerableId != enumerableId || Interlocked.CompareExchange(ref _iteratorPromiseRef, asyncPromiseRef, null) != null) + { + throw new InvalidOperationException("AsyncStreamYielder: invalid await. Only one await is allowed.", GetFormattedStacktrace(2)); + } + // Complete the MoveNextAsync promise. + _result = true; + HandleNextInternal(null, Promise.State.Resolved); + } + + protected abstract void StartOrMoveNext(int enumerableId); + + protected abstract void DisposeAndReturnToPool(); + } // class AsyncEnumerableBase + } // class PromiseRefBase + +#if !PROTO_PROMISE_DEVELOPER_MODE + [DebuggerNonUserCode, StackTraceHidden] +#endif + internal sealed class AsyncEnumerableImpl : PromiseRefBase.AsyncEnumerableBase + where TIterator : IAsyncIterator + { + private TIterator _iterator; + + private AsyncEnumerableImpl() { } + + [MethodImpl(InlineOption)] + private static AsyncEnumerableImpl GetOrCreate() + { + var obj = ObjectPool.TryTakeOrInvalid>(); + return obj == InvalidAwaitSentinel.s_instance + ? new AsyncEnumerableImpl() + : obj.UnsafeAs>(); + } + + [MethodImpl(InlineOption)] + internal static AsyncEnumerableImpl GetOrCreate(in TIterator iterator) + { + var enumerable = GetOrCreate(); + enumerable.Reset(); + enumerable._iterator = iterator; + enumerable._disposed = false; + return enumerable; + } + + protected override void DisposeAndReturnToPool() + { + Dispose(); + _cancelationToken = default; + ObjectPool.MaybeRepool(this); + } + + internal override void MaybeDispose() + { + // This is called on every MoveNextAsync, we only fully dispose and return to pool after DisposeAsync is called. + if (_disposed) + { + DisposeAndReturnToPool(); + } + } + + protected override void StartOrMoveNext(int enumerableId) + { + if (_iterator.IsNull) + { + // Invalidate the previous awaiter. + IncrementPromiseIdAndClearPrevious(); + // Reset for the next awaiter. + ResetWithoutStacktrace(); + // Handle iterator promise to move the async state machine forward. + InterlockedExchange(ref _iteratorPromiseRef, null).Handle(this, null, Promise.State.Resolved); + return; + } + + var iterator = _iterator; + _iterator = default; + var iteratorPromise = iterator.Start(new AsyncStreamWriter(this, enumerableId), _cancelationToken)._promise; + if (iteratorPromise._ref == null) + { + // Already complete. This can happen if no awaits occurred in the async iterator function. + HandleFromSynchronouslyCompletedIterator(); + return; + } + + // We only set _previous to support circular await detection. + // We don't set _rejectContainerOrPreviousOrLink to prevent progress subscriptions from going down the chain, because progress is meaningless for AsyncEnumerable. +#if PROMISE_DEBUG + _previous = iteratorPromise._ref; +#endif + // We hook this up directly to the returned promise so we can know when the iteration is complete, and use this for the DisposeAsync promise. + iteratorPromise._ref.HookupExistingWaiter(iteratorPromise._id, this); + } + } // class AsyncEnumerableImpl + } // class Internal +#endif +} // namespace Proto.Promises \ No newline at end of file diff --git a/Package/Core/Linq/Internal/AsyncEnumerableInternal.cs.meta b/Package/Core/Linq/Internal/AsyncEnumerableInternal.cs.meta new file mode 100644 index 00000000..40ee93b7 --- /dev/null +++ b/Package/Core/Linq/Internal/AsyncEnumerableInternal.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 5a63e545109e9d14da0ca1d342d23616 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Core/Promises/CompilerServices/PromiseMethodBuilders.cs b/Package/Core/Promises/CompilerServices/PromiseMethodBuilders.cs index 4dc65724..6affe96c 100644 --- a/Package/Core/Promises/CompilerServices/PromiseMethodBuilders.cs +++ b/Package/Core/Promises/CompilerServices/PromiseMethodBuilders.cs @@ -88,8 +88,8 @@ namespace Async.CompilerServices { /// /// Provides a builder for asynchronous methods that return . - /// This type is intended for compiler use only. /// + /// This type is intended for compiler use rather than use directly in code. #if !PROTO_PROMISE_DEVELOPER_MODE [DebuggerNonUserCode, StackTraceHidden] #endif @@ -144,8 +144,8 @@ public void SetStateMachine(IAsyncStateMachine stateMachine) { } /// /// Provides a builder for asynchronous methods that return . - /// This type is intended for compiler use only. /// + /// This type is intended for compiler use rather than use directly in code. #if !PROTO_PROMISE_DEVELOPER_MODE [DebuggerNonUserCode, StackTraceHidden] #endif diff --git a/Package/Core/Promises/Internal/PromiseInternal.cs b/Package/Core/Promises/Internal/PromiseInternal.cs index b67f0918..f7fe255d 100644 --- a/Package/Core/Promises/Internal/PromiseInternal.cs +++ b/Package/Core/Promises/Internal/PromiseInternal.cs @@ -393,13 +393,18 @@ protected PromiseRefBase() { } } [MethodImpl(InlineOption)] - protected void Reset() + protected void ResetWithoutStacktrace() { _next = PendingAwaitSentinel.s_instance; _state = Promise.State.Pending; _wasAwaitedorForgotten = false; _suppressRejection = false; + } + [MethodImpl(InlineOption)] + protected void Reset() + { + ResetWithoutStacktrace(); SetCreatedStacktrace(this, 3); } @@ -422,6 +427,12 @@ private void Dispose() throw new System.InvalidOperationException("Promise disposed while pending: " + this); } #endif + IncrementPromiseIdAndClearPrevious(); + } + + [MethodImpl(InlineOption)] + protected void IncrementPromiseIdAndClearPrevious() + { IncrementPromiseId(); #if PROMISE_DEBUG _previous = null; diff --git a/Package/Tests/CoreTests/APIs/Linq.meta b/Package/Tests/CoreTests/APIs/Linq.meta new file mode 100644 index 00000000..619a3e2a --- /dev/null +++ b/Package/Tests/CoreTests/APIs/Linq.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 75782efb025199f4194a96e0d9bd93a7 +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Package/Tests/CoreTests/APIs/Linq/AsyncEnumerableTests.cs b/Package/Tests/CoreTests/APIs/Linq/AsyncEnumerableTests.cs new file mode 100644 index 00000000..0bf428d6 --- /dev/null +++ b/Package/Tests/CoreTests/APIs/Linq/AsyncEnumerableTests.cs @@ -0,0 +1,509 @@ +#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER + +using NUnit.Framework; +using Proto.Promises; +using Proto.Promises.Linq; +using System; +using System.Threading; + +namespace ProtoPromiseTests.APIs +{ + public class AsyncEnumerableTests + { + [SetUp] + public void Setup() + { + TestHelper.Setup(); + } + + [TearDown] + public void Teardown() + { + TestHelper.Cleanup(); + } + + [Test] + public void AsyncEnumerableCompletesSynchronouslyWithValues( + [Values(0, 1, 2, 10)] int yieldCount) + { + var enumerable = AsyncEnumerable.Create(async (writer, _) => + { + for (int i = 0; i < yieldCount; i++) + { + await writer.YieldAsync(i); + } + }); + + Promise.Run(async () => + { + int count = 0; + + await foreach (var item in enumerable) + { + Assert.AreEqual(count, item); + ++count; + } + + Assert.AreEqual(yieldCount, count); + }, SynchronizationOption.Synchronous) + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + } + + [Test] + public void AsyncEnumerableProducesCorrectValues( + [Values(0, 1, 2, 10)] int yieldCount, + [Values] bool iteratorIsAsync, + [Values] bool consumerIsAsync) + { + var deferred = Promise.NewDeferred(); + bool runnerIsComplete = false; + + var enumerable = AsyncEnumerable.Create(async (writer, _) => + { + if (iteratorIsAsync) + { + await deferred.Promise; + } + for (int i = 0; i < yieldCount; i++) + { + await writer.YieldAsync(i); + if (iteratorIsAsync) + { + await deferred.Promise; + } + } + }); + + int count = 0; + + var runner = Promise.Run(async () => + { + await foreach (var item in enumerable) + { + Assert.AreEqual(count, item); + ++count; + if (consumerIsAsync) + { + await deferred.Promise; + } + } + if (consumerIsAsync) + { + await deferred.Promise; + } + + Assert.AreEqual(yieldCount, count); + runnerIsComplete = true; + }, SynchronizationOption.Synchronous); + + Assert.AreNotEqual(iteratorIsAsync || consumerIsAsync, runnerIsComplete); + int awaitCount = iteratorIsAsync && consumerIsAsync ? yieldCount * 2 + : iteratorIsAsync || consumerIsAsync ? yieldCount + : 0; + for (int i = 0; i < awaitCount; i++) + { + var def = deferred; + deferred = Promise.NewDeferred(); + def.Resolve(); + } + + if (iteratorIsAsync) + { + Assert.False(runnerIsComplete); + var def = deferred; + deferred = Promise.NewDeferred(); + def.Resolve(); + } + Assert.AreNotEqual(consumerIsAsync, runnerIsComplete); + deferred.Resolve(); + Assert.True(runnerIsComplete); + + runner + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + + if (deferred.IsValid) + { + deferred.Promise.Forget(); + } + } + + [Test] + public void AsyncEnumerableSynchronousEarlyExit( + [Values(0, 1, 2, 10)] int yieldCount) + { + bool didRunFinallyBlock = false; + var enumerable = AsyncEnumerable.Create(async (writer, _) => + { + try + { + for (int i = 0; i < yieldCount; i++) + { + await writer.YieldAsync(i); + } + } + finally + { + didRunFinallyBlock = true; + } + }); + + Promise.Run(async () => + { + int count = 0; + + await foreach (var item in enumerable) + { + Assert.AreEqual(count, item); + ++count; + break; + } + + Assert.LessOrEqual(count, 1); + }, SynchronizationOption.Synchronous) + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + + Assert.True(didRunFinallyBlock); + } + + [Test] + public void AsyncEnumerableAsynchronousEarlyExit( + [Values(0, 1, 2, 10)] int yieldCount) + { + var deferred = Promise.NewDeferred(); + bool didStartFinallyBlock = false; + bool didCompleteFinallyBlock = false; + bool runnerIsComplete = false; + + var enumerable = AsyncEnumerable.Create(async (writer, _) => + { + try + { + await deferred.Promise; + for (int i = 0; i < yieldCount; i++) + { + await writer.YieldAsync(i); + await deferred.Promise; + } + } + finally + { + didStartFinallyBlock = true; + await deferred.Promise; + didCompleteFinallyBlock = true; + } + }); + + int count = 0; + + var runner = Promise.Run(async () => + { + await foreach (var item in enumerable) + { + Assert.AreEqual(count, item); + ++count; + break; + } + + Assert.LessOrEqual(count, 1); + runnerIsComplete = true; + }, SynchronizationOption.Synchronous); + + Assert.False(runnerIsComplete); + Assert.False(didStartFinallyBlock); + var def = deferred; + deferred = Promise.NewDeferred(); + def.Resolve(); + + Assert.False(runnerIsComplete); + Assert.False(didCompleteFinallyBlock); + Assert.True(didStartFinallyBlock); + deferred.Resolve(); + Assert.True(runnerIsComplete); + + runner + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + + Assert.True(didCompleteFinallyBlock); + } + + [Test] + public void AsyncEnumerableDisposeAsyncEnumeratorWithoutIterating() + { + bool didIterate = false; + var enumerable = AsyncEnumerable.Create(async (writer, _) => + { + didIterate = true; + await writer.YieldAsync(42); + }); + + bool runnerIsComplete = false; + int count = 0; + + var runner = Promise.Run(async () => + { + var enumerator = enumerable.GetAsyncEnumerator(); + await enumerator.DisposeAsync(); + + Assert.AreEqual(0, count); + runnerIsComplete = true; + }, SynchronizationOption.Synchronous); + + Assert.True(runnerIsComplete); + Assert.False(didIterate); + + runner + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + } + + [Test] + public void AsyncEnumerableRespectsCancelationToken( + [Values] bool iteratorIsAsync, + [Values] bool consumerIsAsync) + { + const int yieldCount = 10; + var cancelationSource = CancelationSource.New(); + var deferred = Promise.NewDeferred(); + bool runnerIsComplete = false; + + var enumerable = AsyncEnumerable.Create(async (writer, cancelationToken) => + { + cancelationToken.ThrowIfCancelationRequested(); + if (iteratorIsAsync) + { + await deferred.Promise.WaitAsync(cancelationToken); + } + for (int i = 0; i < yieldCount; i++) + { + await writer.YieldAsync(i); + cancelationToken.ThrowIfCancelationRequested(); + if (iteratorIsAsync) + { + await deferred.Promise.WaitAsync(cancelationToken); + } + } + }); + + int count = 0; + + var runner = Promise.Run(async () => + { + try + { + await foreach (var item in enumerable.WithCancelation(cancelationSource.Token)) + { + Assert.AreEqual(count, item); + ++count; + if (consumerIsAsync) + { + await deferred.Promise; + } + if (count == 2) + { + cancelationSource.Cancel(); + } + } + } + catch (OperationCanceledException) + { + if (consumerIsAsync) + { + await deferred.Promise; + } + Assert.AreEqual(2, count); + runnerIsComplete = true; + } + }, SynchronizationOption.Synchronous); + + Assert.AreNotEqual(iteratorIsAsync || consumerIsAsync, runnerIsComplete); + int awaitCount = iteratorIsAsync && consumerIsAsync ? 4 + : iteratorIsAsync || consumerIsAsync ? 2 + : 0; + for (int i = 0; i < awaitCount; i++) + { + var def = deferred; + deferred = Promise.NewDeferred(); + def.Resolve(); + } + + Assert.AreNotEqual(consumerIsAsync, runnerIsComplete); + deferred.Resolve(); + Assert.True(runnerIsComplete); + + runner + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + + if (deferred.IsValid) + { + deferred.Promise.Forget(); + } + cancelationSource.Dispose(); + } + + [Test] + public void AsyncEnumerableRespectsConfigureAwait( + [Values(0, 1, 2, 10)] int yieldCount, + [Values] bool iteratorIsAsync, + [Values] bool consumerIsAsync, + [Values] SynchronizationType synchronizationType) + { + var foregroundThread = Thread.CurrentThread; + + bool didAwaitDeferred = false; + var deferred = Promise.NewDeferred(); + bool runnerIsComplete = false; + + var enumerable = AsyncEnumerable.Create(async (writer, _) => + { + if (iteratorIsAsync) + { + var promise = deferred.Promise; + didAwaitDeferred = true; + await promise; + } + if (synchronizationType != SynchronizationType.Synchronous) + { + await Promise.SwitchToContextAwait(synchronizationType == SynchronizationType.Background ? TestHelper._foregroundContext : TestHelper._backgroundContext); + } + for (int i = 0; i < yieldCount; i++) + { + await writer.YieldAsync(i); + if (iteratorIsAsync) + { + var promise = deferred.Promise; + didAwaitDeferred = true; + await promise; + } + if (synchronizationType != SynchronizationType.Synchronous) + { + await Promise.SwitchToContextAwait(synchronizationType == SynchronizationType.Background ? TestHelper._foregroundContext : TestHelper._backgroundContext); + } + } + }); + + int count = 0; + + var runner = Promise.Run(async () => + { + var configuredEnumerable = synchronizationType == SynchronizationType.Explicit + ? enumerable.ConfigureAwait(TestHelper._foregroundContext) + : enumerable.ConfigureAwait((SynchronizationOption) synchronizationType); + await foreach (var item in configuredEnumerable) + { + TestHelper.AssertCallbackContext(synchronizationType, SynchronizationType.Foreground, foregroundThread); + Assert.AreEqual(count, item); + ++count; + if (consumerIsAsync) + { + var promise = deferred.Promise; + didAwaitDeferred = true; + await promise; + } + } + TestHelper.AssertCallbackContext(synchronizationType, SynchronizationType.Foreground, foregroundThread); + if (consumerIsAsync) + { + var promise = deferred.Promise; + didAwaitDeferred = true; + await promise; + } + + Assert.AreEqual(yieldCount, count); + runnerIsComplete = true; + }, SynchronizationOption.Synchronous); + + if (iteratorIsAsync || consumerIsAsync) + { + if (!SpinWait.SpinUntil(() => + { + TestHelper.ExecuteForegroundCallbacks(); + return didAwaitDeferred; + }, TimeSpan.FromSeconds(1))) + { + throw new TimeoutException(); + } + Assert.False(runnerIsComplete); + } + else + { + if (!SpinWait.SpinUntil(() => + { + TestHelper.ExecuteForegroundCallbacks(); + return runnerIsComplete; + }, TimeSpan.FromSeconds(1))) + { + throw new TimeoutException(); + } + } + int awaitCount = iteratorIsAsync && consumerIsAsync ? yieldCount * 2 + : iteratorIsAsync || consumerIsAsync ? yieldCount + : 0; + for (int i = 0; i < awaitCount; i++) + { + if (!SpinWait.SpinUntil(() => + { + TestHelper.ExecuteForegroundCallbacks(); + return didAwaitDeferred; + }, TimeSpan.FromSeconds(1))) + { + throw new TimeoutException(); + } + didAwaitDeferred = false; + var def = deferred; + deferred = Promise.NewDeferred(); + def.Resolve(); + } + + if (iteratorIsAsync) + { + if (!SpinWait.SpinUntil(() => + { + TestHelper.ExecuteForegroundCallbacks(); + return didAwaitDeferred; + }, TimeSpan.FromSeconds(1))) + { + throw new TimeoutException(); + } + didAwaitDeferred = false; + Assert.False(runnerIsComplete); + var def = deferred; + deferred = Promise.NewDeferred(); + def.Resolve(); + } + if (consumerIsAsync) + { + if (!SpinWait.SpinUntil(() => + { + TestHelper.ExecuteForegroundCallbacks(); + return didAwaitDeferred; + }, TimeSpan.FromSeconds(1))) + { + throw new TimeoutException(); + } + Assert.False(runnerIsComplete); + } + else + { + if (!SpinWait.SpinUntil(() => + { + TestHelper.ExecuteForegroundCallbacks(); + return runnerIsComplete; + }, TimeSpan.FromSeconds(1))) + { + throw new TimeoutException(); + } + } + deferred.Resolve(); + Assert.True(runnerIsComplete); + + runner + .WaitWithTimeout(TimeSpan.FromSeconds(1)); + + if (deferred.IsValid) + { + deferred.Promise.Forget(); + } + } + } +} + +#endif \ No newline at end of file diff --git a/Package/Tests/CoreTests/APIs/Linq/AsyncEnumerableTests.cs.meta b/Package/Tests/CoreTests/APIs/Linq/AsyncEnumerableTests.cs.meta new file mode 100644 index 00000000..c2df887b --- /dev/null +++ b/Package/Tests/CoreTests/APIs/Linq/AsyncEnumerableTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: c948913bd492440439af3fc1620e2bf6 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: