Skip to content

Commit

Permalink
Add support for NpgsqlMultiHostDataSource
Browse files Browse the repository at this point in the history
  • Loading branch information
Hawxy authored and jeremydmiller committed May 8, 2024
1 parent ac3ecd7 commit c4f8294
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 8 deletions.
34 changes: 32 additions & 2 deletions docs/configuration/hostbuilder.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ services.AddMarten()
.UseLightweightSessions()
.UseNpgsqlDataSource();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L291-L299' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_usenpgsqldatasource' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L292-L300' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_usenpgsqldatasource' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

If you're on .NET 8 (and above), you can also use a dedicated [keyed registration](https://learn.microsoft.com/en-us/dotnet/core/whats-new/dotnet-8#keyed-di-services). This can be useful for scenarios where you need more than one data source registered:
Expand All @@ -143,9 +143,39 @@ services.AddMarten()
.UseLightweightSessions()
.UseNpgsqlDataSource();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L291-L299' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_usenpgsqldatasource' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L292-L300' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_usenpgsqldatasource' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Using a Multi-Host Data Source <Badge type="tip" text="7.11" />

Marten includes support for `NpgsqlMultiHostDataSource`, allowing you to spread queries over your read replicas, potentially improving throughput in read-heavy applications. To get started, your connection string should specify your primary host along a list of replicas, per [Npgsql documentation](https://www.npgsql.org/doc/failover-and-load-balancing.html).

Configuring `NpgsqlMultiHostDataSource` is very similar to a normal data source, simply swapping it for `AddMultiHostNpgsqlDataSource`. Marten will always use the primary node for queries with a `NpgsqlMultiHostDataSource` unless you explicitly opt to use the standby nodes. You can adjust what type of node Marten uses for querying via the `MultiHostSettings` store options:

<!-- snippet: sample_using_UseNpgsqlDataSourceMultiHost -->
<a id='snippet-sample_using_usenpgsqldatasourcemultihost'></a>
```cs
services.AddMultiHostNpgsqlDataSource(ConnectionSource.ConnectionString);

services.AddMarten(x =>
{
// Will prefer standby nodes for querying.
x.Advanced.MultiHostSettings.ReadSessionPreference = TargetSessionAttributes.PreferStandby;
})
.UseLightweightSessions()
.UseNpgsqlDataSource();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L314-L326' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_usenpgsqldatasourcemultihost' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: warning
Marten will only use your read node preference with user queries (using IQuerySession) that are using a Marten-managed lifetime.

Internal queries, including the async daemon, will always use your primary node for reliability.

Ensure your replication delay is acceptable as you risk returning outdated queries.
:::

## Composite Configuration with ConfigureMarten()

The `AddMarten()` mechanism assumes that you are expressing all of the Marten configuration in one place and "know" what that configuration is upfront. Consider these possibilities where that isn't necessarily possible or desirable:
Expand Down
2 changes: 1 addition & 1 deletion docs/schema/migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ services.AddMarten(opts =>
// database changes on application startup
.ApplyAllDatabaseChangesOnStartup();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L149-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_applyalldatabasechangesonstartup' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CoreTests/MartenServiceCollectionExtensionsTests.cs#L150-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_applyalldatabasechangesonstartup' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

In the option above, Marten is calling the same functionality within an `IHostedService` background task.
Expand Down
31 changes: 30 additions & 1 deletion src/CoreTests/MartenServiceCollectionExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Weasel.Core.Migrations;
using Xunit;
Expand Down Expand Up @@ -138,7 +139,7 @@ public void no_error_if_IHostEnvironment_does_not_exist()
var rules = store.Options.CreateGenerationRules();
rules.ApplicationAssembly.ShouldBe(store.Options.ApplicationAssembly);
}

[Fact]
public async Task apply_changes_on_startup()
{
Expand Down Expand Up @@ -305,6 +306,34 @@ public async Task use_npgsql_data_source()
Call(session).ShouldNotThrow();
}

[Fact]
public async Task use_npgsql_multi_host_data_source()
{
var services = new ServiceCollection();

#region sample_using_UseNpgsqlDataSourceMultiHost

services.AddMultiHostNpgsqlDataSource(ConnectionSource.ConnectionString);

services.AddMarten(x =>
{
// Will prefer standby nodes for querying.
x.Advanced.MultiHostSettings.ReadSessionPreference = TargetSessionAttributes.PreferStandby;
})
.UseLightweightSessions()
.UseNpgsqlDataSource();

#endregion

var serviceProvider = services.BuildServiceProvider();

await using var session = serviceProvider.GetService<IDocumentSession>();
Func<bool> Call(IDocumentSession s) => () => s.Query<Target>().Any();
Call(session).ShouldNotThrow();
}



#if NET8_0
[Fact]
public async Task use_npgsql_data_source_with_keyed_registration()
Expand Down
8 changes: 4 additions & 4 deletions src/Marten/Internal/Sessions/AutoClosingLifetime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public DbDataReader ExecuteReader(NpgsqlCommand command)
Logger.OnBeforeExecute(command);

// Do NOT use a using block here because we're returning the reader
var conn = _database.CreateConnection();
var conn = _database.CreateConnection(ConnectionUsage.Read);
conn.Open();

try
Expand All @@ -125,7 +125,7 @@ public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlCommand command, Cancel
Logger.OnBeforeExecute(command);

// Do NOT use a using block here because we're returning the reader
var conn = _database.CreateConnection();
var conn = _database.CreateConnection(ConnectionUsage.Read);
await conn.OpenAsync(token).ConfigureAwait(false);

try
Expand All @@ -149,7 +149,7 @@ public DbDataReader ExecuteReader(NpgsqlBatch batch)
Logger.OnBeforeExecute(batch);

// Do NOT use a using block here because we're returning the reader
var conn = _database.CreateConnection();
var conn = _database.CreateConnection(ConnectionUsage.Read);
conn.Open();

try
Expand All @@ -172,7 +172,7 @@ public async Task<DbDataReader> ExecuteReaderAsync(NpgsqlBatch batch, Cancellati
Logger.OnBeforeExecute(batch);

// Do NOT use a using block here because we're returning the reader
var conn = _database.CreateConnection();
var conn = _database.CreateConnection(ConnectionUsage.Read);
await conn.OpenAsync(token).ConfigureAwait(false);

try
Expand Down
7 changes: 7 additions & 0 deletions src/Marten/Storage/IMartenDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ Task<IReadOnlyList<ShardState>> AllProjectionProgress(
Task<long> ProjectionProgressFor(ShardName name,
CancellationToken token = default);

NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite);

/// <summary>
/// Find the position of the event store sequence just below the supplied timestamp. Will
Expand All @@ -119,3 +120,9 @@ Task<long> ProjectionProgressFor(ShardName name,
/// <returns></returns>
Task<long?> FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token);
}

public enum ConnectionUsage
{
Read,
ReadWrite
}
12 changes: 12 additions & 0 deletions src/Marten/Storage/MartenDatabase.Execution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using Marten.Services;
using Npgsql;

namespace Marten.Storage;

Expand Down Expand Up @@ -67,4 +68,15 @@ public async Task SingleCommit(DbCommand command, CancellationToken cancellation

await command.ExecuteNonQueryAsync(cancellation).ConfigureAwait(false);
}

public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite)
{
if (connectionUsage == ConnectionUsage.Read)
{
return CreateConnection(Options.Advanced.MultiHostSettings.ReadSessionPreference);
}

return CreateConnection(Options.Advanced.MultiHostSettings.WriteSessionPreference);
}

}
5 changes: 5 additions & 0 deletions src/Marten/Storage/StandinDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ public Task<long> ProjectionProgressFor(ShardName name, CancellationToken token
throw new NotImplementedException();
}

public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite)
{
throw new NotImplementedException();
}

public void Dispose()
{
((IDisposable)Tracker)?.Dispose();
Expand Down
19 changes: 19 additions & 0 deletions src/Marten/StoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,20 @@ public interface IReadOnlyAdvancedOptions
/// Option to enable or disable usage of default tenant when using multi-tenanted documents
/// </summary>
bool DefaultTenantUsageEnabled { get; }

}

public sealed class MultiHostSettings
{
/// <summary>
/// Sets the target session attributes for read-only sessions. Defaults to <see cref="TargetSessionAttributes.Primary"/>
/// </summary>
public TargetSessionAttributes ReadSessionPreference { get; set; } = TargetSessionAttributes.Primary;

/// <summary>
/// Sets the target session attributes for write sessions. Defaults to <see cref="TargetSessionAttributes.Primary"/>
/// </summary>
public TargetSessionAttributes WriteSessionPreference { get; set; } = TargetSessionAttributes.Primary;
}

public class AdvancedOptions: IReadOnlyAdvancedOptions
Expand Down Expand Up @@ -1010,6 +1024,11 @@ public void ModifySerializer(Action<ISerializer> configure)
/// </summary>
public PostgresqlMigrator Migrator { get; } = new();

/// <summary>
/// Configuration options when using a <see cref="NpgsqlMultiHostDataSource"/>
/// </summary>
public MultiHostSettings MultiHostSettings { get; } = new();

/// <summary>
/// Decides if `timestamp without time zone` database type should be used for `DateTime` DuplicatedField.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/MultiHostTests/00_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'replicator_password';
SELECT pg_create_physical_replication_slot('replication_slot');
109 changes: 109 additions & 0 deletions src/MultiHostTests/MultiHostConfigurationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using Marten;
using Marten.Internal.CodeGeneration;
using Marten.Testing.Harness;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;

namespace MultiHostTests
{
/// <summary>
/// Use this if the tests in a fixture are going to use
/// all custom StoreOptions configuration
/// </summary>
[Collection("OneOffs")]
public abstract class MultiHostConfigurationContext: IDisposable
{
protected readonly string _schemaName;
private DocumentStore _store;
private IDocumentSession _session;
protected readonly IList<IDisposable> _disposables = new List<IDisposable>();
private readonly string ConnectionString = "Host=localhost:5440,localhost:5441;Database=marten_testing;Username=user;password=password;Command Timeout=5";

public string SchemaName => _schemaName;

protected MultiHostConfigurationContext()
{
_schemaName = GetType().Name.ToLower().Sanitize();
}

public IList<IDisposable> Disposables => _disposables;

protected DocumentStore StoreOptions(Action<StoreOptions> configure, bool cleanAll = true)
{
var options = new StoreOptions();
var host = new NpgsqlDataSourceBuilder(ConnectionString).BuildMultiHost();
options.Connection(host);

options.Advanced.MultiHostSettings.ReadSessionPreference = TargetSessionAttributes.Standby;
options.Advanced.MultiHostSettings.WriteSessionPreference = TargetSessionAttributes.Primary;

// Can be overridden
options.AutoCreateSchemaObjects = AutoCreate.All;
options.NameDataLength = 100;
options.DatabaseSchemaName = _schemaName;

configure(options);

if (cleanAll)
{
using var conn = host.CreateConnection(TargetSessionAttributes.Primary);
conn.Open();
conn.CreateCommand($"drop schema if exists {_schemaName} cascade")
.ExecuteNonQuery();
}

_store = new DocumentStore(options);

_disposables.Add(_store);
_disposables.Add(host);

return _store;
}

protected DocumentStore theStore
{
get
{
if (_store == null)
{
StoreOptions(_ => { });
}

return _store;
}
set
{
_store = value;
}
}

protected IDocumentSession theSession
{
get
{
if (_session != null)
return _session;

_session = theStore.LightweightSession();
_disposables.Add(_session);

return _session;
}
}

public void Dispose()
{
foreach (var disposable in _disposables)
{
disposable.Dispose();
}
}

protected Task AppendEvent(Guid streamId, params object[] events)
{
theSession.Events.Append(streamId, events);
return theSession.SaveChangesAsync();
}
}
}
30 changes: 30 additions & 0 deletions src/MultiHostTests/MultiHostTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Npgsql;

namespace MultiHostTests;

public class MultiHostTests : MultiHostConfigurationContext
{

[Fact]
public async Task QueryHitsReplicaWhenConfigured()
{
var result = await theSession.QueryAsync<bool>("SELECT pg_is_in_recovery();");
var isReplica = result[0];

Assert.True(isReplica);
}

[Fact]
public async Task QueryHitsPrimaryWhenConfigured()
{
StoreOptions(x =>
{
x.Advanced.MultiHostSettings.ReadSessionPreference = TargetSessionAttributes.Primary;
});

var result = await theSession.QueryAsync<bool>("SELECT pg_is_in_recovery();");
var isReplica = result[0];

Assert.False(isReplica);
}
}
Loading

0 comments on commit c4f8294

Please sign in to comment.