Skip to content

Commit

Permalink
Feature: TransformOnObservable Operator for SourceCache (reactivemarb…
Browse files Browse the repository at this point in the history
…les#841)

* TransformOnObservable operators
- Transforms each item in a changeset to an `IObservable<TOther>` and that observable is used to supply the latest value for the corresponding key in the resulting transformed changeset
* Improvements to OnItem* operators
- Overloads that take an Action that uses the Key
- Now use common implementation
- OnItemRemoved doesn't use special operator unless necessary
  • Loading branch information
dwcullop authored Jan 30, 2024
1 parent dac44e1 commit 38c6a38
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1585,15 +1585,27 @@ namespace DynamicData
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemAdded<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject> addAction)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemAdded<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject, TKey> addAction)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemRefreshed<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject> refreshAction)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemRefreshed<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject, TKey> refreshAction)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemRemoved<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject> removeAction, bool invokeOnUnsubscribe = true)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemRemoved<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject, TKey> removeAction, bool invokeOnUnsubscribe = true)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemUpdated<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject, TObject> updateAction)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemUpdated<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject, TObject, TKey> updateAction)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> Or<TObject, TKey>(this DynamicData.IObservableList<DynamicData.IObservableCache<TObject, TKey>> sources)
where TObject : notnull
where TKey : notnull { }
Expand Down Expand Up @@ -1931,6 +1943,14 @@ namespace DynamicData
where TSource : notnull
where TSourceKey : notnull
where TCollection : System.Collections.Specialized.INotifyCollectionChanged, System.Collections.Generic.IEnumerable<TDestination> { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafe<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TDestination> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Reactive.Unit> forceTransform)
where TDestination : notnull
where TSource : notnull
Expand Down
164 changes: 164 additions & 0 deletions src/DynamicData.Tests/Cache/TransformOnObservableFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;

using FluentAssertions;

using Xunit;

namespace DynamicData.Tests.Cache;

public class TransformOnObservableFixture : IDisposable
{
#if DEBUG
private const int InitialCount = 7;
private const int AddCount = 5;
private const int RemoveCount = 3;
private const int UpdateCount = 2;
#else
private const int InitialCount = 103;
private const int AddCount = 53;
private const int RemoveCount = 37;
private const int UpdateCount = 31;
#endif
private static readonly TimeSpan UpdateTime = TimeSpan.FromMilliseconds(50);

private readonly ISourceCache<Animal, int> _animalCache = new SourceCache<Animal, int>(a => a.Id);
private readonly ChangeSetAggregator<Animal, int> _animalResults;
private readonly Faker<Animal> _animalFaker;
private readonly Randomizer _randomizer = new (0x2112_2112);

public TransformOnObservableFixture()
{
_animalFaker = Fakers.Animal.Clone().WithSeed(_randomizer);
_animalCache.AddOrUpdate(_animalFaker.Generate(InitialCount));
_animalResults = _animalCache.Connect().AsAggregator();
}

[Fact]
public void ResultContainsAllInitialChildren()
{
// Arrange

// Act
using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator();

// Assert
_animalResults.Data.Count.Should().Be(InitialCount);
results.Data.Count.Should().Be(InitialCount);
results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset");
}

[Fact]
public void ResultContainsAddedValues()
{
// Arrange
using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator();

// Act
_animalCache.AddOrUpdate(_animalFaker.Generate(AddCount));

// Assert
_animalResults.Data.Count.Should().Be(InitialCount + AddCount);
results.Data.Count.Should().Be(_animalResults.Data.Count);
results.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message");
}

[Fact]
public void ResultDoesNotContainRemovedValues()
{
// Arrange
using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator();

// Act
_animalCache.RemoveKeys(_randomizer.ListItems(_animalCache.Items.ToList(), RemoveCount).Select(a => a.Id));

// Assert
_animalResults.Data.Count.Should().Be(InitialCount - RemoveCount);
results.Data.Count.Should().Be(_animalResults.Data.Count);
results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
}

[Fact]
public async Task ResultUpdatesOnFutureValues()
{
// Create an observable that fires a wrong value on an interval a fixed number of times
// then fires the expected value before completing
IObservable<string> CreateChildObs(Animal a, int id) =>
Observable.Interval(UpdateTime)
.Select(n => $"{a.Name}-{id}-{n}")
.Take(UpdateCount)
.Concat(Observable.Return(a.Name));

// Arrange
var shared = _animalCache.Connect().TransformOnObservable(CreateChildObs).Publish();
using var results = shared.AsAggregator();
var task = Task.Run(async () => await shared);
using var cleanup = shared.Connect();
_animalCache.Dispose();

// Act
await task;

// Assert
_animalResults.Data.Count.Should().Be(InitialCount);
results.Data.Count.Should().Be(_animalResults.Data.Count);
results.Summary.Overall.Adds.Should().Be(InitialCount);
results.Summary.Overall.Updates.Should().Be(InitialCount * UpdateCount, $"Each item should update {UpdateCount} times");
results.Messages.Count.Should().BeGreaterThanOrEqualTo(1, "The delay may cause the messages to appear as multiple changesets");
_animalCache.Items.ForEach(animal => results.Data.Lookup(animal.Id).Should().Be(Optional.Some(animal.Name)));
}

[Theory]
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public void ResultCompletesOnlyWhenSourceAndAllChildrenComplete(bool completeSource, bool completeChildren)
{
IObservable<string> CreateChildObs(Animal a, int id) =>
completeChildren
? Observable.Return(a.Name)
: Observable.Return(a.Name).Concat(Observable.Never<string>());

// Arrange
using var results = _animalCache.Connect().TransformOnObservable(CreateChildObs).AsAggregator();

// Act
if (completeSource)
{
_animalCache.Dispose();
}

// Assert
_animalResults.IsCompleted.Should().Be(completeSource);
results.IsCompleted.Should().Be(completeSource && completeChildren);
}

[Fact]
public void ResultFailsIfSourceFails()
{
// Arrange
var expectedError = new Exception("Expected");
var throwObservable = Observable.Throw<IChangeSet<Animal, int>>(expectedError);
using var results = _animalCache.Connect().Concat(throwObservable).TransformOnObservable(animal => Observable.Return(animal)).AsAggregator();

// Act
_animalCache.Dispose();

// Assert
results.Error.Should().Be(expectedError);
}

public void Dispose()
{
_animalCache.Dispose();
_animalResults.Dispose();
}
}
12 changes: 4 additions & 8 deletions src/DynamicData/Cache/Internal/OnBeingRemoved.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

namespace DynamicData.Cache.Internal;

internal sealed class OnBeingRemoved<TObject, TKey>(IObservable<IChangeSet<TObject, TKey>> source, Action<TObject> removeAction, bool invokeOnUnsubscribe)
internal sealed class OnBeingRemoved<TObject, TKey>(IObservable<IChangeSet<TObject, TKey>> source, Action<TObject, TKey> removeAction)
where TObject : notnull
where TKey : notnull
{
private readonly Action<TObject> _removeAction = removeAction ?? throw new ArgumentNullException(nameof(removeAction));
private readonly Action<TObject, TKey> _removeAction = removeAction ?? throw new ArgumentNullException(nameof(removeAction));
private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));

public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChangeSet<TObject, TKey>>(
Expand All @@ -30,11 +30,7 @@ public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChange

lock (locker)
{
if (invokeOnUnsubscribe)
{
cache.Items.ForEach(t => _removeAction(t));
}

cache.KeyValues.ForEach(kvp => _removeAction(kvp.Value, kvp.Key));
cache.Clear();
}
});
Expand All @@ -49,7 +45,7 @@ private void RegisterForRemoval(IChangeSet<TObject, TKey> changes, Cache<TObject
{
case ChangeReason.Remove:
// ReSharper disable once InconsistentlySynchronizedField
_removeAction(change.Current);
_removeAction(change.Current, change.Key);
break;
}
});
Expand Down
65 changes: 65 additions & 0 deletions src/DynamicData/Cache/Internal/TransformOnObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;

internal sealed class TransformOnObservable<TSource, TKey, TDestination>(IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, IObservable<TDestination>> transform)
where TSource : notnull
where TKey : notnull
where TDestination : notnull
{
public IObservable<IChangeSet<TDestination, TKey>> Run() => Observable.Create<IChangeSet<TDestination, TKey>>(observer =>
{
var cache = new ChangeAwareCache<TDestination, TKey>();
var locker = new object();
var pendingUpdates = 0;

// Helper to emit any pending changes when all the updates have been handled
void EmitChanges()
{
if (Interlocked.Decrement(ref pendingUpdates) == 0)
{
var changes = cache!.CaptureChanges();
if (changes.Count > 0)
{
observer.OnNext(changes);
}
}
}

// Create the sub-observable that takes the result of the transformation,
// filters out unchanged values, and then updates the cache
IObservable<TDestination> CreateSubObservable(TSource obj, TKey key) =>
transform(obj, key)
.DistinctUntilChanged()
.Do(_ => Interlocked.Increment(ref pendingUpdates))
.Synchronize(locker!)
.Do(val => cache!.AddOrUpdate(val, key));

// Always increment the counter OUTSIDE of the lock to signal any thread currently holding the lock
// to not emit the changeset because more changes are incoming.
var shared = source
.Do(_ => Interlocked.Increment(ref pendingUpdates))
.Synchronize(locker!)
.Publish();

// Use MergeMany because it automatically handles Add/Update/Remove and OnCompleted/OnError correctly
var subMerged = shared
.MergeMany(CreateSubObservable)
.SubscribeSafe(_ => EmitChanges(), observer.OnError, observer.OnCompleted);

// Subscribe to the shared Observable to handle Remove events. MergeMany will unsubscribe from the sub-observable,
// but the corresponding key value needs to be removed from the Cache so the remove is observed downstream.
var subRemove = shared
.OnItemRemoved((_, key) => cache!.Remove(key), invokeOnUnsubscribe: false)
.SubscribeSafe(_ => EmitChanges());

return new CompositeDisposable(shared.Connect(), subMerged, subRemove);
});
}
Loading

0 comments on commit 38c6a38

Please sign in to comment.