Skip to content

Commit

Permalink
Add GetRefDataText by identifier. (#49)
Browse files Browse the repository at this point in the history
Improve ReferenceDataOrchestrator.
  • Loading branch information
chullybun authored Dec 9, 2022
1 parent 859404f commit 5dcb481
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 76 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Represents the **NuGet** versions.

## v2.1.0
- *Enhancement:* Added additional `ReferenceDataBaseEx.GetRefDataText` method overload with a parameter of `id`; as an alternative to the pre-existing `code`.
- *Enhancement:* `ReferenceDataOrchestrator` caching supports virtual `OnGetCacheKey` to enable overridding classes to further define the key characteristics.

## v2.0.0
- *Enhancement:* Added support for [`MySQL`](https://dev.mysql.com/) through introduction of `MySqlDatabase` that supports similar pattern to `SqlServerDatabase`.
- *Enhancement:* Added new `EncodedStringToDateTimeConverter` to simulate row versioning from a `DateTime` (timestamp) as an alternative.
Expand Down
2 changes: 1 addition & 1 deletion Common.targets
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>2.0.0</Version>
<Version>2.1.0</Version>
<LangVersion>preview</LangVersion>
<Authors>Avanade</Authors>
<Company>Avanade</Company>
Expand Down
29 changes: 0 additions & 29 deletions CoreEx.Abstractions/CoreEx.Abstractions.csproj

This file was deleted.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Package | Status | Source & documentation
`CoreEx` | [![NuGet version](https://badge.fury.io/nu/CoreEx.svg)](https://badge.fury.io/nu/CoreEx) | [Link](./src/CoreEx)
`CoreEx.AutoMapper` | [![NuGet version](https://badge.fury.io/nu/CoreEx.AutoMapper.svg)](https://badge.fury.io/nu/CoreEx.AutoMapper) | [Link](./src/CoreEx.AutoMapper)
`CoreEx.Azure` | [![NuGet version](https://badge.fury.io/nu/CoreEx.Azure.svg)](https://badge.fury.io/nu/CoreEx.Azure) | [Link](./src/CoreEx.Azure)
`CoreEx.Cosmos` | [![NuGet version](https://badge.fury.io/nu/CoreEx.Cosmos.svg)](https://badge.fury.io/nu/CoreEx.Cosmos) | [Link](./src/CoreEx.Cosmos)
`CoreEx.Database` | [![NuGet version](https://badge.fury.io/nu/CoreEx.Database.svg)](https://badge.fury.io/nu/CoreEx.Database) | [Link](./src/CoreEx.Database)
`CoreEx.Database.SqlServer` | [![NuGet version](https://badge.fury.io/nu/CoreEx.Database.SqlServer.svg)](https://badge.fury.io/nu/CoreEx.Database.SqlServer) | [Link](./src/CoreEx.Database.SqlServer)
`CoreEx.Database.MySql` | [![NuGet version](https://badge.fury.io/nu/CoreEx.Database.MySql.svg)](https://badge.fury.io/nu/CoreEx.Database.MySql) | [Link](./src/CoreEx.Database.MySql)
Expand Down
8 changes: 8 additions & 0 deletions src/CoreEx/RefData/Extended/ReferenceDataBaseEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ protected override IEnumerable<IPropertyValue> GetPropertyValues()
/// <remarks>This is intended to be consumed by classes that wish to provide an opt-in serialization of corresponding <see cref="IReferenceData.Text"/>.</remarks>
public static string? GetRefDataText(string? code) => code != null && ExecutionContext.HasCurrent && ExecutionContext.Current.IsTextSerializationEnabled ? ConvertFromCode(code)?.Text : null;

/// <summary>
/// Gets the corresponding <see cref="IReferenceData.Text"/> for the specified <paramref name="id"/> where <see cref="ExecutionContext.IsTextSerializationEnabled"/> is <c>true</c>.
/// </summary>
/// <param name="id">The <see cref="IReferenceData"/> <see cref="IIdentifier{TId}.Id"/>.</param>
/// <returns>The <see cref="IReferenceData.Text"/>.</returns>
/// <remarks>This is intended to be consumed by classes that wish to provide an opt-in serialization of corresponding <see cref="IReferenceData.Text"/>.</remarks>
public static string? GetRefDataText(object? id) => id != null && ExecutionContext.HasCurrent && ExecutionContext.Current.IsTextSerializationEnabled ? ConvertFromId((TId)id)?.Text : null;

#region MappingsDictionary

/// <summary>
Expand Down
89 changes: 43 additions & 46 deletions src/CoreEx/RefData/ReferenceDataOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ namespace CoreEx.RefData
/// Provides the centralized reference data orchestration. Primarily responsible for the management of one or more <see cref="IReferenceDataProvider"/> instances.
/// </summary>
/// <remarks>Provides <i>cached</i> access to the underlying reference data collections via the likes of <see cref="GetByTypeAsync{TRef}"/>, <see cref="GetByTypeAsync(Type, CancellationToken)"/> or <see cref="GetByNameAsync(string, CancellationToken)"/>.
/// <para>To improve performance the reference data should be cached where possible. The <see cref="ReferenceDataOrchestrator"/> supports a default <see cref="IMemoryCache"/> implementation; however, this can be overridden and/or extended by
/// overriding the <see cref="OnGetOrCreateAsync(Type, Func{Type, CancellationToken, Task{IReferenceDataCollection}}, CancellationToken)"/> method. The <i>create</i> is executed in the context of a <see cref="ServiceProviderServiceExtensions.CreateScope(IServiceProvider)"/>
/// to limit/minimize any impact on the processing of the current request by isolating all scoped services.</para></remarks>
/// By default, will include <see cref="ExecutionContext.Current"/> <see cref="ExecutionContext.TenantId"/> as part of the cache keu
/// <para>To improve performance the reference data is cached. The <see cref="ReferenceDataOrchestrator"/> enables this via an <see cref="IMemoryCache"/> implementation; default is <see cref="MemoryCache"/> where not explicitly specified.
/// The underlying reference data loading is executed in the context of a <see cref="ServiceProviderServiceExtensions.CreateScope(IServiceProvider)"/> to limit/minimize any impact on the processing of the current request by isolating all scoped services.</para></remarks>
public class ReferenceDataOrchestrator
{
/// <summary>
Expand All @@ -36,9 +36,8 @@ public class ReferenceDataOrchestrator
private readonly object _lock = new();
private readonly ConcurrentDictionary<Type, Type> _typeToProvider = new();
private readonly ConcurrentDictionary<string, Type> _nameToType = new(StringComparer.OrdinalIgnoreCase);
private readonly IMemoryCache? _cache;
private readonly SemaphoreSlim _primarySemaphore = new(1, 1);
private readonly ConcurrentDictionary<Type, SemaphoreSlim> _semaphores = new();
private readonly ConcurrentDictionary<object, SemaphoreSlim> _semaphores = new();
private readonly TimeSpan _absoluteExpirationRelativeToNow = TimeSpan.FromHours(2);
private readonly TimeSpan _slidingExpiration = TimeSpan.FromMinutes(30);
private readonly Lazy<ILogger> _logger;
Expand All @@ -53,11 +52,11 @@ public class ReferenceDataOrchestrator
/// Initializes a new instance of the <see cref="ReferenceDataOrchestrator"/> class.
/// </summary>
/// <param name="serivceProvider">The <see cref="IServiceProvider"/> needed to instantiated the registered providers.</param>
/// <param name="cache">The optional <see cref="IMemoryCache"/> to be used where not specifically overridden by <see cref="OnGetOrCreateAsync(Type, Func{Type, CancellationToken, Task{IReferenceDataCollection}}, CancellationToken)"/>.</param>
protected ReferenceDataOrchestrator(IServiceProvider serivceProvider, IMemoryCache? cache)
/// <param name="cache">The <see cref="IMemoryCache"/>.</param>
protected ReferenceDataOrchestrator(IServiceProvider serivceProvider, IMemoryCache cache)
{
ServiceProvider = serivceProvider ?? throw new ArgumentNullException(nameof(serivceProvider));
_cache = cache;
Cache = cache ?? throw new ArgumentNullException(nameof(cache));
_logger = new Lazy<ILogger>(ServiceProvider.GetRequiredService<ILogger<ReferenceDataOrchestrator>>);
}

Expand All @@ -81,6 +80,11 @@ protected ReferenceDataOrchestrator(IServiceProvider serivceProvider, IMemoryCac
/// </summary>
public IServiceProvider ServiceProvider { get; }

/// <summary>
/// Gets the underlying <see cref="IMemoryCache"/>.
/// </summary>
protected IMemoryCache Cache { get; }

/// <summary>
/// Gets the <see cref="ILogger"/>.
/// </summary>
Expand Down Expand Up @@ -177,7 +181,7 @@ public ReferenceDataOrchestrator Register<TProvider>() where TProvider : IRefere
/// </summary>
/// <param name="type">The <see cref="IReferenceData"/> <see cref="Type"/>.</param>
/// <returns>The corresponding <see cref="IReferenceDataCollection"/> where found; otherwise, <c>null</c>.</returns>
public IReferenceDataCollection? GetByType(Type type) => _cache.TryGetValue(type, out IReferenceDataCollection coll) ? coll : Invokers.Invoker.RunSync(() => GetByTypeAsync(type));
public IReferenceDataCollection? GetByType(Type type) => Cache.TryGetValue(OnGetCacheKey(type), out IReferenceDataCollection coll) ? coll : Invokers.Invoker.RunSync(() => GetByTypeAsync(type));

/// <summary>
/// Gets the <see cref="IReferenceDataCollection"/> for the specified <see cref="IReferenceData"/> <see cref="Type"/> (will throw <see cref="InvalidOperationException"/> where not found).
Expand All @@ -191,7 +195,7 @@ public ReferenceDataOrchestrator Register<TProvider>() where TProvider : IRefere
/// </summary>
/// <param name="type">The <see cref="IReferenceData"/> <see cref="Type"/>.</param>
/// <returns>The corresponding <see cref="IReferenceDataCollection"/> where found; otherwise, will throw an <see cref="InvalidOperationException"/>.</returns>
public IReferenceDataCollection GetByTypeRequired(Type type) => _cache.TryGetValue(type, out IReferenceDataCollection coll) ? coll : Invokers.Invoker.RunSync(() => GetByTypeRequiredAsync(type));
public IReferenceDataCollection GetByTypeRequired(Type type) => Cache.TryGetValue(OnGetCacheKey(type), out IReferenceDataCollection coll) ? coll : Invokers.Invoker.RunSync(() => GetByTypeRequiredAsync(type));

/// <summary>
/// Gets the <see cref="IReferenceDataCollection"/> for the specified <see cref="IReferenceData"/> <see cref="Type"/>.
Expand Down Expand Up @@ -256,48 +260,41 @@ public async Task<IReferenceDataCollection> GetByTypeRequiredAsync(Type type, Ca
/// sufficient. The <paramref name="getCollAsync"/> contains the logic to invoke the underlying <see cref="IReferenceDataProvider.GetAsync(Type, System.Threading.CancellationToken)"/>; this is executed in the context of a
/// <see cref="ServiceProviderServiceExtensions.CreateScope(IServiceProvider)"/> to limit/minimize any impact on the processing of the current request by isolating all scoped services. Additionally, semaphore locks are used to
/// manage concurrency to ensure cache loading is thread-safe.</remarks>
protected async virtual Task<IReferenceDataCollection> OnGetOrCreateAsync(Type type, Func<Type, CancellationToken, Task<IReferenceDataCollection>> getCollAsync, CancellationToken cancellationToken = default)
private async Task<IReferenceDataCollection> OnGetOrCreateAsync(Type type, Func<Type, CancellationToken, Task<IReferenceDataCollection>> getCollAsync, CancellationToken cancellationToken = default)
{
if (_cache == null)
return await getCollAsync(type, cancellationToken).ConfigureAwait(false);
else
{
// Try and get as most likely already in the cache; where exists then exit fast.
if (_cache.TryGetValue(type, out IReferenceDataCollection coll))
return coll;

// As the GetOrAdd is not thread-safe a primary semaphore is used to get the key-based semaphore.
SemaphoreSlim semaphore;
await _primarySemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
semaphore = _semaphores.GetOrAdd(type, _ => new SemaphoreSlim(1, 1));
}
finally
{
_primarySemaphore.Release();
}
// Try and get as most likely already in the cache; where exists then exit fast.
var key = OnGetCacheKey(type);
if (Cache.TryGetValue(key, out IReferenceDataCollection coll))
return coll;

// Where not found use a semaphore for the key (type) to ensure only a single task is retrieving.
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Does a get or create as it may have been added as we went to lock.
coll = await _cache.GetOrCreateAsync(type, async entry =>
{
OnCreateCacheEntry(entry);
return await getCollAsync(type, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
// Get or add a new semaphore for the cache key so we can manage single concurrency for that key only.
SemaphoreSlim semaphore = _semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));

return coll;
// Use the semaphore to manage a single thread to perform the "expensive" get operation.
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Does a get or create as it may have been added as we went to lock.
return await Cache.GetOrCreateAsync(key, async entry =>
{
OnCreateCacheEntry(entry);
return await getCollAsync(type, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}

/// <summary>
/// Gets the cache key to be used (defaults to <paramref name="type"/>).
/// </summary>
/// <param name="type">The <see cref="IReferenceData"/> <see cref="Type"/>.</param>
/// <returns>The cache key.</returns>
/// <remarks>To support the likes of multi-tenancy caching then the resulting cache key should be overridden to include the both the <see cref="ITenantId.TenantId"/> and <paramref name="type"/>.</remarks>
protected virtual object OnGetCacheKey(Type type) => type;

/// <summary>
/// Provides an opportunity to the maintain the <see cref="ICacheEntry"/> data prior to the cache <i>create</i> function being invoked (as a result of <see cref="OnGetOrCreateAsync(Type, Func{Type, CancellationToken, Task{IReferenceDataCollection}}, CancellationToken)"/>).
/// </summary>
Expand Down
55 changes: 55 additions & 0 deletions tests/CoreEx.Test/Framework/RefData/ReferenceDataTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,40 @@ public async Task Caching_Prefetch()
Assert.IsTrue(c!.ContainsId("BB"));
}

[Test]
public async Task Caching_Concurrency()
{
IServiceCollection sc = new ServiceCollection();
sc.AddLogging();
sc.AddJsonSerializer();
sc.AddExecutionContext();
sc.AddScoped<RefDataConcurrencyProvider>();
sc.AddReferenceDataOrchestrator<RefDataConcurrencyProvider>();
var sp = sc.BuildServiceProvider();

using var scope = sp.CreateScope();
var ec = scope.ServiceProvider.GetService<ExecutionContext>();

var colls = new IReferenceDataCollection?[5];

var tasks = new Task[5];
tasks[0] = Task.Run(() => colls[0] = ReferenceDataOrchestrator.Current.GetByType<RefData>());
tasks[1] = Task.Run(() => colls[1] = ReferenceDataOrchestrator.Current.GetByType<RefData>());
tasks[2] = Task.Run(() => colls[2] = ReferenceDataOrchestrator.Current.GetByType<RefData>());
tasks[3] = Task.Run(() => colls[3] = ReferenceDataOrchestrator.Current.GetByType<RefData>());
tasks[4] = Task.Run(() => colls[4] = ReferenceDataOrchestrator.Current.GetByType<RefData>());

await Task.WhenAll(tasks).ConfigureAwait(false);

for (int i = 0; i < colls.Length; i++)
{
Assert.IsNotNull(colls[i]);
Assert.AreEqual(2, colls[i]!.Count);
}

Assert.AreSame(colls[0], colls[4]); // First and last should be same object ref.
}

[Test]
public void Serialization_STJ_NoOrchestrator()
{
Expand Down Expand Up @@ -943,6 +977,27 @@ public async Task<IReferenceDataCollection> GetAsync(Type type, CancellationToke
}
}

public class RefDataConcurrencyProvider : IReferenceDataProvider
{
private int _count;

public Type[] Types => new Type[] { typeof(RefData) };

public async Task<IReferenceDataCollection> GetAsync(Type type, CancellationToken cancellationToken = default)
{
System.Diagnostics.Debug.WriteLine($"GetAsync=>Enter({_count})");
Interlocked.Increment(ref _count);
if (_count > 1)
//throw new InvalidOperationException("ReferenceData has loaded already; this should not occur as the ReferenceDataOrchestrator should ensure multi-load under concurrency does not occur.");
Assert.Fail("ReferenceData has loaded already; this should not occur as the ReferenceDataOrchestrator should ensure multi-load under concurrency does not occur.");

var coll = new RefDataCollection() { new RefData { Id = 1, Code = "A" }, new RefData { Id = 2, Code = "B" } };
await Task.Delay(100).ConfigureAwait(false);
System.Diagnostics.Debug.WriteLine($"GetAsync=>Exit({_count})");
return coll;
}
}

public class TestData : CoreEx.Entities.Extended.EntityBase
{
private int _id;
Expand Down

0 comments on commit 5dcb481

Please sign in to comment.