Skip to content

Commit

Permalink
Add dedicated watch stream to track cancellations
Browse files Browse the repository at this point in the history
  • Loading branch information
glopesdev committed Sep 3, 2024
1 parent b6a2eaf commit 3416317
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 58 deletions.
77 changes: 63 additions & 14 deletions Bonsai.Core/Expressions/InspectBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Bonsai.Expressions
{
/// <summary>
/// Represents an expression builder that replays the latest notification from all the
/// Represents an expression builder that monitors the notifications from all the
/// subscriptions made to its decorated builder.
/// </summary>
public sealed class InspectBuilder : ExpressionBuilder, INamedElement
Expand All @@ -23,7 +23,7 @@ public sealed class InspectBuilder : ExpressionBuilder, INamedElement
/// specified expression builder.
/// </summary>
/// <param name="builder">
/// The expression builder whose notifications will be replayed by this inspector.
/// The expression builder whose notifications will be monitored by this inspector.
/// </param>
public InspectBuilder(ExpressionBuilder builder)
: base(builder, decorator: false)
Expand Down Expand Up @@ -101,6 +101,12 @@ internal IReadOnlyList<VisualizerMapping> VisualizerMappings
/// </summary>
public IObservable<Exception> ErrorEx { get; private set; }

/// <summary>
/// Gets an observable sequence that multicasts watch notifications from all
/// the subscriptions made to the output of the decorated expression builder.
/// </summary>
public IObservable<IObservable<WatchNotification>> Watch { get; private set; }

/// <summary>
/// Gets the range of input arguments that the decorated expression builder accepts.
/// </summary>
Expand Down Expand Up @@ -142,6 +148,7 @@ public override Expression Build(IEnumerable<Expression> arguments)
if (VisualizerElement != null)
{
Output = VisualizerElement.Output;
Watch = VisualizerElement.Watch;
ErrorEx = Observable.Empty<Exception>();
VisualizerElement = BuildVisualizerElement(VisualizerElement, VisualizerMappings);
return source;
Expand All @@ -162,6 +169,7 @@ public override Expression Build(IEnumerable<Expression> arguments)
else
{
Output = Observable.Empty<IObservable<object>>();
Watch = Observable.Empty<IObservable<WatchNotification>>();
ErrorEx = Observable.Empty<Exception>();
if (VisualizerElement != null)
{
Expand Down Expand Up @@ -241,17 +249,15 @@ methodCall.Arguments[0] is MethodCallExpression lazy &&
return null;
}

ReplaySubject<IObservable<TSource>> CreateInspectorSubject<TSource>()
ReplaySubject<Inspector<TSource>> CreateInspectorSubject<TSource>()
{
var subject = new ReplaySubject<IObservable<TSource>>(1);
Output = subject.Select(ys => ys.Select(xs => (object)xs));
var subject = new ReplaySubject<Inspector<TSource>>(1);
Output = subject.Select(ys => ys.Output);
#pragma warning disable CS0612 // Type or member is obsolete
Error = subject.Merge().IgnoreElements().Select(xs => Unit.Default);
Error = subject.SelectMany(ys => ys.Error);
#pragma warning restore CS0612 // Type or member is obsolete
ErrorEx = subject.SelectMany(xs => xs
.IgnoreElements()
.Select(x => default(Exception))
.Catch<Exception, Exception>(ex => Observable.Return(ex)));
ErrorEx = subject.SelectMany(xs => xs.ErrorEx);
Watch = subject.Select(xs => xs.Watch);
return subject;
}

Expand All @@ -260,13 +266,13 @@ IObservable<TSource> Process<TSource>(IObservable<TSource> source)
return source;
}

IObservable<TSource> Process<TSource>(IObservable<TSource> source, ReplaySubject<IObservable<TSource>> subject)
IObservable<TSource> Process<TSource>(IObservable<TSource> source, ReplaySubject<Inspector<TSource>> subject)
{
return Observable.Create<TSource>(observer =>
{
var sourceInspector = new Subject<TSource>();
var sourceInspector = new Inspector<TSource>();
subject.OnNext(sourceInspector);
var subscription = source.Do(sourceInspector).SubscribeSafe(observer);
var subscription = source.Do(sourceInspector.Subject).SubscribeSafe(observer);
return Disposable.Create(() =>
{
try { subscription.Dispose(); }
Expand All @@ -275,11 +281,54 @@ IObservable<TSource> Process<TSource>(IObservable<TSource> source, ReplaySubject
{
throw new WorkflowRuntimeException(ex.Message, this, ex);
}
finally { sourceInspector.OnCompleted(); }
finally
{
if (!sourceInspector.Subject.HasTerminated)
{
sourceInspector.IsCanceled = true;
sourceInspector.Subject.OnCompleted();
}
}
});
});
}

class Inspector<T>
{
public InspectSubject<T> Subject { get; } = new();

public bool IsCanceled { get; internal set; }

public IObservable<object> Output => Subject.Select(value => (object)value);

public IObservable<Unit> Error => Subject.IgnoreElements().Select(xs => Unit.Default);

public IObservable<Exception> ErrorEx => Subject
.IgnoreElements()
.Select(x => default(Exception))
.Catch<Exception, Exception>(ex => Observable.Return(ex));

public IObservable<WatchNotification> Watch =>
Observable.Create<WatchNotification>(observer =>
{
observer.OnNext(WatchNotification.Subscribe);
var notificationObserver = Observer.Create<T>(
value => observer.OnNext(WatchNotification.OnNext),
error =>
{
observer.OnNext(WatchNotification.OnError);
observer.OnNext(WatchNotification.Unsubscribe);
},
() =>
{
if (!IsCanceled)
observer.OnNext(WatchNotification.OnCompleted);
observer.OnNext(WatchNotification.Unsubscribe);
});
return Subject.SubscribeSafe(notificationObserver);
});
}

class VisualizerMappingList
{
readonly SortedList<int, VisualizerMapping> localMappings = new SortedList<int, VisualizerMapping>();
Expand Down
Loading

0 comments on commit 3416317

Please sign in to comment.