Skip to content

Commit

Permalink
Thread safety in ObservableCache and SourceList. Fixes #376. (#377)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmg23 authored Jun 27, 2020
1 parent d213d2e commit 63960b0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 38 deletions.
70 changes: 36 additions & 34 deletions src/DynamicData/Cache/ObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ internal sealed class ObservableCache<TObject, TKey> : IObservableCache<TObject,
private readonly ReaderWriter<TObject, TKey> _readerWriter;
private readonly IDisposable _cleanUp;
private readonly object _locker = new object();
private readonly object _writeLock = new object();

private int _editLevel; // The level of recursion in editing.

Expand Down Expand Up @@ -80,7 +79,7 @@ internal void UpdateFromIntermediate(Action<ICacheUpdater<TObject, TKey>> update
throw new ArgumentNullException(nameof(updateAction));
}

lock (_writeLock)
lock (_locker)
{
ChangeSet<TObject, TKey> changes = null;

Expand Down Expand Up @@ -111,7 +110,7 @@ internal void UpdateFromSource(Action<ISourceUpdater<TObject, TKey>> updateActio
throw new ArgumentNullException(nameof(updateAction));
}

lock (_writeLock)
lock (_locker)
{
ChangeSet<TObject, TKey> changes = null;

Expand Down Expand Up @@ -162,50 +161,53 @@ private void InvokeNext(ChangeSet<TObject, TKey> changes)
}
}

public IObservable<int> CountChanged => _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
public IObservable<int> CountChanged => Observable.Create<int>(observer =>
{
lock (_locker)
{
var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
return source.SubscribeSafe(observer);
}
});

public IObservable<Change<TObject, TKey>> Watch(TKey key)
public IObservable<Change<TObject, TKey>> Watch(TKey key) => Observable.Create<Change<TObject, TKey>>(observer =>
{
return Observable.Create<Change<TObject, TKey>>
(
observer =>
lock (_locker)
{
var initial = _readerWriter.Lookup(key);
if (initial.HasValue)
{
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
}

return _changes.Finally(observer.OnCompleted).Subscribe(changes =>
{
lock (_locker)
foreach (var change in changes)
{
var initial = _readerWriter.Lookup(key);
if (initial.HasValue)
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
if (match)
{
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
observer.OnNext(change);
}

return _changes.Finally(observer.OnCompleted).Subscribe(changes =>
{
foreach (var change in changes)
{
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
if (match)
{
observer.OnNext(change);
}
}
});
}
});
}
}
});

public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null)
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null) => Observable.Create<IChangeSet<TObject, TKey>>(observer =>
{
return Observable.Defer(() =>
lock (_locker)
{
lock (_locker)
var initial = GetInitialUpdates(predicate);
if (initial.Count != 0)
{
var initial = GetInitialUpdates(predicate);
var changes = Observable.Return(initial).Concat(_changes);

return (predicate == null ? changes : changes.Filter(predicate)).NotEmpty();
observer.OnNext(initial);
}
});
}

var updateSource = (predicate == null ? _changes : _changes.Filter(predicate)).NotEmpty();
return updateSource.SubscribeSafe(observer);
}
});

public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool> predicate = null)
{
Expand Down
14 changes: 10 additions & 4 deletions src/DynamicData/List/SourceList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public sealed class SourceList<T> : ISourceList<T>
private readonly ReaderWriter<T> _readerWriter = new ReaderWriter<T>();
private readonly IDisposable _cleanUp;
private readonly object _locker = new object();
private readonly object _writeLock = new object();

private int _editLevel;

Expand All @@ -50,7 +49,7 @@ public SourceList(IObservable<IChangeSet<T>> source = null)

private IDisposable LoadFromSource(IObservable<IChangeSet<T>> source)
{
return source
return source.Synchronize(_locker)
.Finally(OnCompleted)
.Select(_readerWriter.Write)
.Subscribe(InvokeNext, OnError, OnCompleted);
Expand All @@ -64,7 +63,7 @@ public void Edit([NotNull] Action<IExtendedList<T>> updateAction)
throw new ArgumentNullException(nameof(updateAction));
}

lock (_writeLock)
lock (_locker)
{
IChangeSet<T> changes = null;

Expand Down Expand Up @@ -151,7 +150,14 @@ private void OnError(Exception exception)
public int Count => _readerWriter.Count;

/// <inheritdoc />
public IObservable<int> CountChanged => _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
public IObservable<int> CountChanged => Observable.Create<int>(observer =>
{
lock (_locker)
{
var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
return source.SubscribeSafe(observer);
}
});

/// <inheritdoc />
public IObservable<IChangeSet<T>> Connect(Func<T, bool> predicate = null)
Expand Down

0 comments on commit 63960b0

Please sign in to comment.