Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncEnumerable #276

Merged
merged 8 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Package/Core/Cancelations/CancelationToken.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ partial struct CancelationToken : IRetainable, IEquatable<CancelationToken>
/// <summary>
/// Returns an empty <see cref="CancelationToken"/>.
/// </summary>
public static CancelationToken None { get { return default(CancelationToken); } }
public static CancelationToken None
{
[MethodImpl(Internal.InlineOption)]
get { return default(CancelationToken); }
}

/// <summary>
/// FOR INTERNAL USE ONLY!
Expand Down
8 changes: 8 additions & 0 deletions Package/Core/Linq.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

171 changes: 171 additions & 0 deletions Package/Core/Linq/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
using Proto.Promises.Async.CompilerServices;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;

#pragma warning disable IDE0090 // Use 'new(...)'

namespace Proto.Promises.Linq
{
#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER
/// <summary>
/// Provides helper functions to create <see cref="AsyncEnumerable{T}"/> async streams.
/// </summary>
#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
public static class AsyncEnumerable
{
// We use AsyncEnumerableMethod instead of Promise so it can specially handle early-exits (`break` keyword).

/// <summary>
/// Create a new <see cref="AsyncEnumerable{T}"/> async stream from the specified <paramref name="asyncIterator"/> function.
/// </summary>
public static AsyncEnumerable<T> Create<T>(Func<AsyncStreamWriter<T>, CancelationToken, AsyncEnumerableMethod> asyncIterator)
{
var enumerable = Internal.AsyncEnumerableImpl<T, Internal.AsyncIterator<T>>.GetOrCreate(new Internal.AsyncIterator<T>(asyncIterator));
return new AsyncEnumerable<T>(enumerable);
}

/// <summary>
/// Create a new <see cref="AsyncEnumerable{T}"/> async stream from the specified <paramref name="captureValue"/> and <paramref name="asyncIterator"/> function.
/// </summary>
public static AsyncEnumerable<T> Create<T, TCapture>(TCapture captureValue, Func<TCapture, AsyncStreamWriter<T>, CancelationToken, AsyncEnumerableMethod> asyncIterator)
{
var enumerable = Internal.AsyncEnumerableImpl<T, Internal.AsyncIterator<T, TCapture>>.GetOrCreate(new Internal.AsyncIterator<T, TCapture>(captureValue, asyncIterator));
return new AsyncEnumerable<T>(enumerable);
}
}

/// <summary>
/// Exposes an enumerator that provides asynchronous iteration over values of a specified type.
/// An instance of this type may only be consumed once.
/// </summary>
#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
public readonly struct AsyncEnumerable<T> : IAsyncEnumerable<T>
{
private readonly Internal.PromiseRefBase.AsyncEnumerableBase<T> _target;
private readonly int _id;

/// <summary>
/// Gets whether this instance is valid for enumeration. Once enumeration has begun, this will return false.
/// </summary>
public bool IsValid
{
get
{
var target = _target;
return target != null && target.EnumerableId == _id;
}
}

[MethodImpl(Internal.InlineOption)]
internal AsyncEnumerable(Internal.PromiseRefBase.AsyncEnumerableBase<T> target)
{
_target = target;
_id = _target.EnumerableId;
}

/// <summary>
/// Returns an enumerator that iterates asynchronously through the collection.
/// </summary>
[MethodImpl(Internal.InlineOption)]
public AsyncEnumerator<T> GetAsyncEnumerator(CancelationToken cancelationToken)
=> _target.GetAsyncEnumerator(_id, cancelationToken);

/// <summary>
/// Returns an enumerator that iterates asynchronously through the collection.
/// </summary>
[MethodImpl(Internal.InlineOption)]
public AsyncEnumerator<T> GetAsyncEnumerator() => GetAsyncEnumerator(CancelationToken.None);

IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator(CancellationToken cancellationToken) => GetAsyncEnumerator(cancellationToken.ToCancelationToken());

/// <summary>
/// Sets the <see cref="CancelationToken"/> to be passed to <see cref="AsyncEnumerable{T}.GetAsyncEnumerator(CancelationToken)"/> when iterating.
/// </summary>
/// <param name="cancelationToken">The cancelation token to use.</param>
/// <returns>The configured enumerable.</returns>
public ConfiguredAsyncEnumerable<T> WithCancelation(CancelationToken cancelationToken)
=> new ConfiguredAsyncEnumerable<T>(this, cancelationToken, Internal.SynchronizationOption.Synchronous, null, false);

/// <summary>
/// Configures how awaits on the promises returned from an async iteration will be performed.
/// </summary>
/// <param name="synchronizationOption">On which context the continuations will be executed.</param>
/// <param name="forceAsync">If true, forces the continuations to be invoked asynchronously. If <paramref name="synchronizationOption"/> is <see cref="SynchronizationOption.Synchronous"/>, this value will be ignored.</param>
/// <returns>The configured enumerable.</returns>
public ConfiguredAsyncEnumerable<T> ConfigureAwait(SynchronizationOption synchronizationOption, bool forceAsync = false)
=> new ConfiguredAsyncEnumerable<T>(this, CancelationToken.None, (Internal.SynchronizationOption) synchronizationOption, null, forceAsync);

/// <summary>
/// Configures how awaits on the promises returned from an async iteration will be performed.
/// </summary>
/// <param name="synchronizationContext">The context on which the continuations will be executed.</param>
/// <param name="forceAsync">If true, forces the continuations to be invoked asynchronously.</param>
/// <returns>The configured enumerable.</returns>
public ConfiguredAsyncEnumerable<T> ConfigureAwait(SynchronizationContext synchronizationContext, bool forceAsync = false)
=> new ConfiguredAsyncEnumerable<T>(this, CancelationToken.None, Internal.SynchronizationOption.Explicit, synchronizationContext, forceAsync);

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
// These methods exist to hide the built-in IAsyncEnumerable extension methods, and use promise-optimized implementations instead if they are used.
[EditorBrowsable(EditorBrowsableState.Never)]
public ConfiguredAsyncEnumerable<T> WithCancellation(CancellationToken cancellationToken)
=> WithCancelation(cancellationToken.ToCancelationToken());

[EditorBrowsable(EditorBrowsableState.Never)]
public ConfiguredAsyncEnumerable<T> ConfigureAwait(bool continueOnCapturedContext)
=> continueOnCapturedContext ? ConfigureAwait(SynchronizationContext.Current) : ConfigureAwait(SynchronizationOption.Background);
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
}

/// <summary>
/// Supports a simple asynchronous iteration over a generic collection.
/// An instance of this type may only be consumed once.
/// </summary>
#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
public readonly struct AsyncEnumerator<T> : IAsyncEnumerator<T>
{
private readonly Internal.PromiseRefBase.AsyncEnumerableBase<T> _target;
private readonly int _id;

[MethodImpl(Internal.InlineOption)]
internal AsyncEnumerator(Internal.PromiseRefBase.AsyncEnumerableBase<T> target, int id)
{
_target = target;
_id = id;
}

/// <summary>
/// Gets the element in the collection at the current position of the enumerator.
/// </summary>
public T Current
{
[MethodImpl(Internal.InlineOption)]
get { return _target.GetCurrent(_id); }
}

/// <summary>
/// Advances the enumerator asynchronously to the next element of the collection.
/// </summary>
public Promise<bool> MoveNextAsync()
=> _target.MoveNextAsync(_id);

/// <summary>
/// Asynchronously releases resources used by this enumerator.
/// </summary>
public Promise DisposeAsync()
=> _target.DisposeAsync(_id);

System.Threading.Tasks.ValueTask<bool> IAsyncEnumerator<T>.MoveNextAsync() => MoveNextAsync();
System.Threading.Tasks.ValueTask IAsyncDisposable.DisposeAsync() => DisposeAsync();
}
#endif
}
11 changes: 11 additions & 0 deletions Package/Core/Linq/AsyncEnumerable.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Package/Core/Linq/CompilerServices.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 123 additions & 0 deletions Package/Core/Linq/CompilerServices/AsyncEnumerableMethodBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Security;

// AsyncMethodBuilderAttribute
#pragma warning disable 0436 // Type conflicts with imported type

namespace Proto.Promises.Async.CompilerServices
{
#if NET47_OR_GREATER || NETSTANDARD2_1_OR_GREATER || NETCOREAPP || UNITY_2021_2_OR_NEWER
/// <summary>
/// Type used to create an <see cref="Linq.AsyncEnumerable{T}"/> with <see cref="Linq.AsyncEnumerable.Create{T}(Func{AsyncStreamWriter{T}, CancelationToken, AsyncEnumerableMethod})"/>.
/// </summary>
/// <remarks>This type is intended for compiler use rather than use directly in code.</remarks>
#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
[AsyncMethodBuilder(typeof(AsyncEnumerableMethodBuilder))]
public readonly struct AsyncEnumerableMethod
{
internal readonly Promise _promise;

[MethodImpl(Internal.InlineOption)]
internal AsyncEnumerableMethod(Promise promise)
=> _promise = promise;
}

/// <summary>
/// Awaitable type used to wait for the consumer to move the async iterator forward.
/// </summary>
/// <remarks>This type is intended for compiler use rather than use directly in code.</remarks>
#if !PROTO_PROMISE_DEVELOPER_MODE
[DebuggerNonUserCode, StackTraceHidden]
#endif
public struct AsyncEnumerableMethodBuilder
{
private PromiseMethodBuilder _builder;

[MethodImpl(Internal.InlineOption)]
private AsyncEnumerableMethodBuilder(PromiseMethodBuilder builder)
=> _builder = builder;

/// <summary>Gets the <see cref="AsyncEnumerableMethod"/> for this builder.</summary>
/// <returns>The <see cref="AsyncEnumerableMethod"/> representing the builder's asynchronous operation.</returns>
public AsyncEnumerableMethod Task
{
[MethodImpl(Internal.InlineOption)]
get { return new AsyncEnumerableMethod(_builder.Task); }
}

/// <summary>Initializes a new <see cref="PromiseMethodBuilder"/>.</summary>
/// <returns>The initialized <see cref="PromiseMethodBuilder"/>.</returns>
[MethodImpl(Internal.InlineOption)]
public static AsyncEnumerableMethodBuilder Create()
=> new AsyncEnumerableMethodBuilder(PromiseMethodBuilder.Create());

/// <summary>
/// Completes the <see cref="Promise"/> in the <see cref="Promise.State">Rejected</see> state with the specified exception.
/// </summary>
/// <param name="exception">The <see cref="Exception"/> to use to reject the promise.</param>
public void SetException(Exception exception)
{
if (exception == AsyncEnumerableDisposedException.s_instance)
{
// The await foreach loop was stopped with a `break`.
SetResult();
}
else
{
_builder.SetException(exception);
}
}

/// <summary>
/// Completes the <see cref="Promise"/> in the <see cref="Promise.State">Resolved</see> state.
/// </summary>
[MethodImpl(Internal.InlineOption)]
public void SetResult()
=> _builder.SetResult();

/// <summary>
/// Schedules the specified state machine to be pushed forward when the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">Specifies the type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
[MethodImpl(Internal.InlineOption)]
public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : INotifyCompletion
where TStateMachine : IAsyncStateMachine
=> _builder.AwaitOnCompleted(ref awaiter, ref stateMachine);

/// <summary>
/// Schedules the specified state machine to be pushed forward when the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">Specifies the type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
[SecuritySafeCritical]
[MethodImpl(Internal.InlineOption)]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : ICriticalNotifyCompletion
where TStateMachine : IAsyncStateMachine
=> _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);

/// <summary>Initiates the builder's execution with the associated state machine.</summary>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="stateMachine">The state machine instance, passed by reference.</param>
[MethodImpl(Internal.InlineOption)]
public void Start<TStateMachine>(ref TStateMachine stateMachine)
where TStateMachine : IAsyncStateMachine
=> _builder.Start(ref stateMachine);

/// <summary>Does nothing.</summary>
/// <param name="stateMachine">The heap-allocated state machine object.</param>
[MethodImpl(Internal.InlineOption)]
public void SetStateMachine(IAsyncStateMachine stateMachine) { }
}
#endif
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading