Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/31 readcommitedlock #39

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

- Target Framework supported: netstandard2.0, net462
- Updated System.Data.SqlClient 4.8.5
- Added Azure SQL support to MsSqlDialect [#31](https://github.com/NEventStore/NEventStore.Persistence.SQL/issues/31)
- Fix: NEventStore constraint failed with MySql 8.x (works with 5.7) [#487](https://github.com/NEventStore/NEventStore/issues/487)

### Breaking Change

- The fix for [#487](https://github.com/NEventStore/NEventStore/issues/487) changed how the `Commits` table is created for MySql 8.x:
to update an existing database in order to run on 8.x you need to manually update the `Commits` table schema and change the constraint of the `CommitId` column
from: `CommitId binary(16) NOT NULL CHECK (CommitId != 0)` to: `CommitId binary(16) NOT NULL CHECK (CommitId <> 0x00)`.
- MsSqlDialects now have an `useAzureSql` parameter, if set to `true` the statement `WITH (READCOMMITTEDLOCK)` will be added to any `FROM Commits` query.
Copy link

@fschmied fschmied Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could even be made a default. It does no harm outside of Azure SQL. In fact, it will also make NEventStore more robust with local SQL Server, too, when someone enabled the READ COMMITTED SNAPSHOT option on the database.


## 9.0.1

Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
using NEventStore.Persistence.Sql.Tests;

namespace NEventStore.Persistence.AcceptanceTests
{
namespace NEventStore.Persistence.AcceptanceTests {
using NEventStore.Persistence.Sql;
using NEventStore.Persistence.Sql.SqlDialects;
using NEventStore.Serialization;
using System.Data.SqlClient;
using System.Transactions;

public partial class PersistenceEngineFixture
{
public partial class PersistenceEngineFixture {
public ISqlDialect SqlDialect { get; set; } = new MsSqlDialect();

/// <summary>
/// this mimic the current NEventStore default values which is run outside any transaction (creates a scope that
/// suppresses any transaction)
/// </summary>
public TransactionScopeOption? ScopeOption { get; set; } = null; // the old default: TransactionScopeOption.Suppress;
public TransactionScopeOption? ScopeOption { get; set; } // the old default: TransactionScopeOption.Suppress;

public PersistenceEngineFixture()
{
#if NET462
_createPersistence = pageSize =>
new SqlPersistenceFactory(new EnviromentConnectionFactory("MsSql", "System.Data.SqlClient"),
new BinarySerializer(),
new MsSqlDialect(),
SqlDialect,
pageSize: pageSize,
scopeOption: ScopeOption
).Build();
#else
_createPersistence = pageSize =>
new SqlPersistenceFactory(new EnviromentConnectionFactory("MsSql", SqlClientFactory.Instance),
new BinarySerializer(),
new MsSqlDialect(),
SqlDialect,
pageSize: pageSize,
scopeOption: ScopeOption
).Build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ namespace NEventStore.Persistence.AcceptanceTests
using Xunit.Should;
#endif

// The following tests actually works well only for MsSqlServer
// we'll need some refactoring to have initialization work
// for different of kinds of databases.

public enum TransactionScopeConcern
{
NoTransaction = 0,
Expand Down Expand Up @@ -319,11 +323,11 @@ public class Unsupported_Multiple_Completing_TransactionScopes_When_EnlistInAmbi
public Unsupported_Multiple_Completing_TransactionScopes_When_EnlistInAmbientTransaction_is_and_IsolationLevel_is(
TransactionScopeConcern enlistInAmbientTransaction,
IsolationLevel transationIsolationLevel
) : base(enlistInAmbientTransaction, transationIsolationLevel, true)
) : base(enlistInAmbientTransaction, transationIsolationLevel, completeTransaction: true)
{ }

[Fact]
public void should_throw_an_Exception_only_if_no_transaction_or_enlist_in_ambient_transaction_and_IsolationLevel_is_Serializable()
public void Should_throw_an_Exception_only_if_no_transaction_or_enlist_in_ambient_transaction_and_IsolationLevel_is_Serializable()
{
_thrown.Should().BeOfType<AggregateException>();
_thrown.InnerException.Should().BeOfType<StorageException>();
Expand Down
105 changes: 56 additions & 49 deletions src/NEventStore.Persistence.Sql/SqlDialects/MsSqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,77 @@
using System.Transactions;
using IsolationLevel = System.Data.IsolationLevel;

namespace NEventStore.Persistence.Sql.SqlDialects
{
namespace NEventStore.Persistence.Sql.SqlDialects {
using System;
using System.Data.SqlClient;

public class MsSqlDialect : CommonSqlDialect
{
public class MsSqlDialect : CommonSqlDialect {
private const int UniqueIndexViolation = 2601;
private const int UniqueKeyViolation = 2627;

public override string InitializeStorage
{
/// <summary>
/// Add "WITH (READCOMMITTEDLOCK)" hint to any "FROM Commits" clause
/// (#31) Make MsSqlDialect compatible with AzureSql and READ COMMITTED SNAPSHOT
/// </summary>
private readonly bool _addReadCommittedLockToFromCommits;

/// <summary>
/// Constructor
/// </summary>
/// <param name="addReadCommittedLockToFromCommits">Add "WITH (READCOMMITTEDLOCK)" hint to any "FROM Commits" clause, can have an impact on multiple transactions scenarios</param>
public MsSqlDialect(bool addReadCommittedLockToFromCommits = false) {
_addReadCommittedLockToFromCommits = addReadCommittedLockToFromCommits;
}

public override string InitializeStorage {
get { return MsSqlStatements.InitializeStorage; }
}

public override string GetSnapshot
{
public override string GetSnapshot {
get { return "SET ROWCOUNT 1;\n" + base.GetSnapshot.Replace("LIMIT 1;", ";"); }
}

public override string GetCommitsFromStartingRevision
{
get { return NaturalPaging(base.GetCommitsFromStartingRevision); }
public override string GetCommitsFromStartingRevision {
get { return AddReadCommittedLockToFromCommits(NaturalPaging(base.GetCommitsFromStartingRevision)); }
}

public override string GetCommitsFromInstant
{
get { return CommonTableExpressionPaging(base.GetCommitsFromInstant); }
public override string GetCommitsFromInstant {
get { return AddReadCommittedLockToFromCommits(CommonTableExpressionPaging(base.GetCommitsFromInstant)); }
}

public override string GetCommitsFromToInstant
{
get { return CommonTableExpressionPaging(base.GetCommitsFromToInstant); }
public override string GetCommitsFromToInstant {
get { return AddReadCommittedLockToFromCommits(CommonTableExpressionPaging(base.GetCommitsFromToInstant)); }
}

public override string PersistCommit
{
public override string PersistCommit {
get { return MsSqlStatements.PersistCommits; }
}

public override string GetCommitsFromCheckpoint
{
get { return CommonTableExpressionPaging(base.GetCommitsFromCheckpoint); }
public override string GetCommitsFromCheckpoint {
get { return AddReadCommittedLockToFromCommits(CommonTableExpressionPaging(base.GetCommitsFromCheckpoint)); }
}

public override string GetCommitsFromToCheckpoint
{
get { return CommonTableExpressionPaging(base.GetCommitsFromToCheckpoint); }
public override string GetCommitsFromToCheckpoint {
get { return AddReadCommittedLockToFromCommits(CommonTableExpressionPaging(base.GetCommitsFromToCheckpoint)); }
}

public override string GetCommitsFromBucketAndCheckpoint
{
get { return CommonTableExpressionPaging(base.GetCommitsFromBucketAndCheckpoint); }
public override string GetCommitsFromBucketAndCheckpoint {
get { return AddReadCommittedLockToFromCommits(CommonTableExpressionPaging(base.GetCommitsFromBucketAndCheckpoint)); }
}

public override string GetCommitsFromToBucketAndCheckpoint
{
get { return CommonTableExpressionPaging(base.GetCommitsFromToBucketAndCheckpoint); }
public override string GetCommitsFromToBucketAndCheckpoint {
get { return AddReadCommittedLockToFromCommits(CommonTableExpressionPaging(base.GetCommitsFromToBucketAndCheckpoint)); }
}

public override string GetStreamsRequiringSnapshots
{
public override string GetStreamsRequiringSnapshots {
get { return NaturalPaging(base.GetStreamsRequiringSnapshots); }
}

private static string NaturalPaging(string query)
{
private static string NaturalPaging(string query) {
return "SET ROWCOUNT @Limit;\n" + RemovePaging(query);
}

private static string CommonTableExpressionPaging(string query)
{
private static string CommonTableExpressionPaging(string query) {
query = RemovePaging(query);
int orderByIndex = query.IndexOf("ORDER BY");
string orderBy = query.Substring(orderByIndex).Replace(";", string.Empty);
Expand All @@ -83,37 +82,45 @@ private static string CommonTableExpressionPaging(string query)
string from = query.Substring(fromIndex);
string select = query.Substring(0, fromIndex);

string value = MsSqlStatements.PagedQueryFormat.FormatWith(select, orderBy, from);
return value;
return MsSqlStatements.PagedQueryFormat.FormatWith(select, orderBy, from);
}

private static string RemovePaging(string query)
{
private static string RemovePaging(string query) {
return query
.Replace("\n LIMIT @Limit OFFSET @Skip;", ";")
.Replace("\n LIMIT @Limit;", ";");
}

public override bool IsDuplicate(Exception exception)
{
public override bool IsDuplicate(Exception exception) {
var dbException = exception as SqlException;
return dbException != null
&& (dbException.Number == UniqueIndexViolation || dbException.Number == UniqueKeyViolation);
}

public override IDbTransaction OpenTransaction(IDbConnection connection)
{
public override IDbTransaction OpenTransaction(IDbConnection connection) {
if (Transaction.Current == null)
return connection.BeginTransaction(IsolationLevel.ReadCommitted);

return base.OpenTransaction(connection);
}

/// <summary>
/// (#31) Add 'WITH (READCOMMITTEDLOCK)' to all 'FROM Commits' statements
/// </summary>
/// <param name="query"></param>
private string AddReadCommittedLockToFromCommits(string query) {
if (!_addReadCommittedLockToFromCommits) {
return query;
}
return query.Replace("FROM Commits", "FROM Commits WITH (READCOMMITTEDLOCK)");
}
}

public class MsSql2005Dialect : MsSqlDialect
{
public override DbType GetDateTimeDbType()
{
public class MsSql2005Dialect : MsSqlDialect {
public MsSql2005Dialect(bool addReadCommittedLockToFromCommits = false) : base(addReadCommittedLockToFromCommits) {
}

public override DbType GetDateTimeDbType() {
return DbType.DateTime;
}
}
Expand Down