Skip to content

Commit

Permalink
Feature: Add FilterOnObservable for Cache ChangeSets (#735)
Browse files Browse the repository at this point in the history
Implemented FilterOnObservable and added extension methods.
  • Loading branch information
dwcullop authored Oct 19, 2023
1 parent 0be0d95 commit 61900ef
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 1 deletion.
208 changes: 208 additions & 0 deletions src/DynamicData.Tests/Cache/FilterOnObservableFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using FluentAssertions;
using Microsoft.Reactive.Testing;
using Xunit;

namespace DynamicData.Tests.Cache;

/// <summary>
/// Test Fixture for the FilterOnObservable extension method.
/// </summary>
public class FilterOnObservableFixture : IDisposable
{
private const int MagicNumber = 37;

private readonly ChangeSetAggregator<Person, string> _sourceResults;

private readonly ISourceCache<Person, string> _source;

public FilterOnObservableFixture()
{
_source = new SourceCache<Person, string>(p => p.Name);
_sourceResults = _source.Connect().AsAggregator();
}

[Fact]
public void FactoryIsInvoked()
{
// having
var invoked = false;
var val = -1;
IObservable<bool> factory(Person p)
{
invoked = true;
val = p.Age;
return Observable.Return(true);
}
using var sub = _source.Connect().FilterOnObservable(factory).Subscribe();

// when
AddPerson(MagicNumber);

// then
_sourceResults.Data.Count.Should().Be(1);
invoked.Should().BeTrue();
val.Should().Be(MagicNumber, "Was value added to cache");
Assert.Throws<ArgumentNullException>(() => _source.Connect().FilterOnObservable((Func<Person, IObservable<bool>>)null!));
}

[Fact]
public void FactoryWithKeyIsInvoked()
{
// having
var invoked = false;
var val = -1;
IObservable<bool> factory(Person p, string name)
{
invoked = true;
val = p.Age;
return Observable.Return(true);
}
using var sub = _source.Connect().FilterOnObservable(factory).Subscribe();

// when
AddPerson(MagicNumber);

// then
_sourceResults.Data.Count.Should().Be(1);
invoked.Should().BeTrue();
val.Should().Be(MagicNumber, "Was value added to cache");
Assert.Throws<ArgumentNullException>(() => _source.Connect().FilterOnObservable((Func<Person, string, IObservable<bool>>)null!));
Assert.Throws<ArgumentNullException>(() => ObservableCacheEx.FilterOnObservable(null!, (Func<Person, string, IObservable<bool>>)null!));
}

[Fact]
public void FilteredOutIfNoObservableValue()
{
// having
using var filterStats = _source.Connect().FilterOnObservable(p => Observable.Never<bool>()).AsAggregator();

// when
AddPeople(MagicNumber);

// then
_sourceResults.Data.Count.Should().Be(MagicNumber);
_sourceResults.Messages[0].Adds.Should().Be(MagicNumber);
_sourceResults.Messages.Count.Should().Be(1, "Should have all been added at once");
filterStats.Messages.Count.Should().Be(0, "All items should be filtered out");
}

[Fact]
public void ObservableFilterUsedToDetermineInclusion()
{
// having
Predicate<Person> predicate = p => p.Age % 2 == 0;
Func<Person, IObservable<bool>> filterFactory = p => Observable.Return(predicate(p));
var passCount = 0;
var failCount = 0;
using var filterStats = _source.Connect().FilterOnObservable(filterFactory).AsAggregator();

// when
AddPeople(MagicNumber).ForEach(p => _ = predicate(p) ? passCount++ : failCount++);

// then
_sourceResults.Data.Count.Should().Be(passCount + failCount);
filterStats.Data.Count.Should().Be(passCount);
}

[Fact]
public void ObservableFilterTriggersAddAndRemove()
{
// having
ISubject<bool> filterSubject = new Subject<bool>();

using var filterStats = _source.Connect().FilterOnObservable(_ => filterSubject).AsAggregator();

AddPeople(MagicNumber);

// when
filterSubject.OnNext(true);
filterSubject.OnNext(false);

// then
_sourceResults.Data.Count.Should().Be(MagicNumber);
_sourceResults.Messages.Count.Should().Be(1, "Should have all been added at once");
filterStats.Data.Count.Should().Be(0);
filterStats.Messages.Count.Should().Be(MagicNumber*2, "Each should be added and removed");
filterStats.Summary.Overall.Adds.Should().Be(MagicNumber);
filterStats.Summary.Overall.Removes.Should().Be(MagicNumber);
}

[Fact]
public void ObservableFilterDuplicateValuesHaveNoEffect()
{
// having
ISubject<bool> filterSubject = new Subject<bool>();

using var filterStats = _source.Connect().FilterOnObservable(_ => filterSubject).AsAggregator();

AddPeople(MagicNumber);

// when
filterSubject.OnNext(false);
filterSubject.OnNext(false);
filterSubject.OnNext(false);
filterSubject.OnNext(true);
filterSubject.OnNext(true);
filterSubject.OnNext(true);

// then
_sourceResults.Data.Count.Should().Be(MagicNumber);
_sourceResults.Messages.Count.Should().Be(1, "Should have all been added at once");
filterStats.Data.Count.Should().Be(MagicNumber);
filterStats.Messages.Count.Should().Be(MagicNumber, "Each should be added individually");
filterStats.Summary.Overall.Adds.Should().Be(MagicNumber);
}

[Fact]
public void ObservableFilterChangesCanBeBuffered()
{
// having
TestScheduler? scheduler = new TestScheduler();
ISubject<bool> filterSubject = new Subject<bool>();

using var filterStats = _source.Connect().FilterOnObservable(_ => filterSubject, TimeSpan.FromSeconds(1), scheduler).AsAggregator();

AddPeople(MagicNumber);

// when
filterSubject.OnNext(true);
scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);

// then
_sourceResults.Data.Count.Should().Be(MagicNumber);
_sourceResults.Messages.Count.Should().Be(1, "Should have all been added at once");
filterStats.Data.Count.Should().Be(MagicNumber);
filterStats.Messages.Count.Should().Be(1, "Should have all been added at once");
filterStats.Summary.Overall.Adds.Should().Be(MagicNumber);
}

private static Person NewPerson(int n) => new Person("Name" + n, n);

private IEnumerable<Person> AddPeople(int count)
{
var people = Enumerable.Range(0, count).Select(NewPerson).ToArray();
_source.AddOrUpdate(people);
return people;
}

private Person AddPerson(int n)
{
var p = NewPerson(n);
_source.AddOrUpdate(p);
return p;
}

public void Dispose()
{
_source.Dispose();
_sourceResults.Dispose();
}
}
49 changes: 49 additions & 0 deletions src/DynamicData/Cache/Internal/FilterOnObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace DynamicData.Cache.Internal;

internal class FilterOnObservable<TObject, TKey>
where TObject : notnull
where TKey : notnull
{
private readonly Func<TObject, TKey, IObservable<bool>> _filterFactory;
private readonly IObservable<IChangeSet<TObject, TKey>> _source;
private readonly TimeSpan? _buffer;
private readonly IScheduler? _scheduler;

public FilterOnObservable(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<bool>> filterFactory, TimeSpan? buffer = null, IScheduler? scheduler = null)
{
_source = source ?? throw new ArgumentNullException(nameof(source));
_filterFactory = filterFactory ?? throw new ArgumentNullException(nameof(filterFactory));
_buffer = buffer;
_scheduler = scheduler;
}

public IObservable<IChangeSet<TObject, TKey>> Run()
{
return _source.Transform((val, key) => new FilterProxy(val, _filterFactory(val, key)))
.AutoRefreshOnObservable(proxy => proxy.FilterObservable, _buffer, _scheduler)
.Filter(proxy => proxy.PassesFilter)
.Transform(proxy => proxy.Value);
}

private class FilterProxy
{
public FilterProxy(TObject obj, IObservable<bool> observable)
{
Value = obj;
FilterObservable = observable.DistinctUntilChanged().Do(filterValue => PassesFilter = filterValue);
}

public IObservable<bool> FilterObservable { get; }

public TObject Value { get; }

public bool PassesFilter { get; private set; }
}
}
41 changes: 41 additions & 0 deletions src/DynamicData/Cache/ObservableCacheEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,47 @@ public static IObservable<IChangeSet<TObject, TKey>> Filter<TObject, TKey>(this
return new DynamicFilter<TObject, TKey>(source, predicateChanged, reapplyFilter, suppressEmptyChangeSets).Run();
}

/// <summary>
/// Filters the stream of changes according to an Observable bool that is created for each item using the specified factory function.
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <param name="source">The source.</param>
/// <param name="filterFactory">Factory function used to create the observable bool that controls whether that given item passes the filter or not.</param>
/// <param name="buffer">Optional time to buffer changes from the observable bools.</param>
/// <param name="scheduler">Optional scheduler to use when buffering the changes.</param>
/// <returns>An observable changeset that only contains items whose corresponding observable bool has emitted true as its most recent value.</returns>
/// <exception cref="ArgumentNullException">One of the given parameters was null.</exception>
public static IObservable<IChangeSet<TObject, TKey>> FilterOnObservable<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<bool>> filterFactory, TimeSpan? buffer = null, IScheduler? scheduler = null)
where TObject : notnull
where TKey : notnull
{
if (source is null) throw new ArgumentNullException(nameof(source));
if (filterFactory is null) throw new ArgumentNullException(nameof(filterFactory));

return new FilterOnObservable<TObject, TKey>(source, filterFactory, buffer, scheduler).Run();
}

/// <summary>
/// Filters the stream of changes according to an Observable bool that is created for each item using the specified factory function.
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <param name="source">The source.</param>
/// <param name="filterFactory">Factory function used to create the observable bool that controls whether that given item passes the filter or not.</param>
/// <param name="buffer">Optional time to buffer changes from the observable bools.</param>
/// <param name="scheduler">Optional scheduler to use when buffering the changes.</param>
/// <returns>An observable changeset that only contains items whose corresponding observable bool has emitted true as its most recent value.</returns>
/// <exception cref="ArgumentNullException">One of the given parameters was null.</exception>
public static IObservable<IChangeSet<TObject, TKey>> FilterOnObservable<TObject, TKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, IObservable<bool>> filterFactory, TimeSpan? buffer = null, IScheduler? scheduler = null)
where TObject : notnull
where TKey : notnull
{
if (filterFactory is null) throw new ArgumentNullException(nameof(filterFactory));

return source.FilterOnObservable((obj, _) => filterFactory(obj), buffer, scheduler);
}

/// <summary>
/// Filters source on the specified property using the specified predicate.
/// The filter will automatically reapply when a property changes.
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "8.0",
"version": "8.1",
"publicReleaseRefSpec": [
"^refs/heads/main$", // we release out of master
"^refs/heads/preview/.*", // we release previews
Expand Down

0 comments on commit 61900ef

Please sign in to comment.