Skip to content

Commit

Permalink
Initial logic to configure and add partitioning for archiving events
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Jul 24, 2024
1 parent fe9eed1 commit 624360d
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 3 deletions.
6 changes: 6 additions & 0 deletions src/EventSourcingTests/EventGraphTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public void build_event()
@event.DotNetTypeName.ShouldBe(mapping.DotNetTypeName);
}

[Fact]
public void archive_partitioning_is_off_by_default()
{
theGraph.UseArchivedStreamPartitioning.ShouldBeFalse();
}

[Fact]
public void enable_unique_index_on_event_id_is_false_by_default()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Linq;
using Marten;
using Marten.Events;
using Marten.Events.Archiving;
using Marten.Events.Schema;
using Marten.Testing.Harness;
using Shouldly;
using Weasel.Postgresql.Tables.Partitioning;
using Xunit;

namespace EventSourcingTests;

public class building_events_and_streams_table_based_on_partitioning
{
private readonly EventGraph theGraph = new EventGraph(new StoreOptions());

[Fact]
public void no_partitioning_by_default()
{
new EventsTable(theGraph).Partitioning.ShouldBeNull();
new StreamsTable(theGraph).Partitioning.ShouldBeNull();
}

[Fact]
public void events_table_build_partitioning_when_active()
{
theGraph.UseArchivedStreamPartitioning = true;

var table = new EventsTable(theGraph);
var partitioning = table.Partitioning.ShouldBeOfType<ListPartitioning>();
partitioning.Columns.Single().ShouldBe(IsArchivedColumn.ColumnName);
partitioning.Partitions.Single().ShouldBe(new ListPartition("archived", "TRUE"));

table.PrimaryKeyColumns.ShouldContain(IsArchivedColumn.ColumnName);
}

[Fact]
public void streams_table_build_partitioning_when_active()
{
theGraph.UseArchivedStreamPartitioning = true;

var table = new StreamsTable(theGraph);
var partitioning = table.Partitioning.ShouldBeOfType<ListPartitioning>();
partitioning.Columns.Single().ShouldBe(IsArchivedColumn.ColumnName);
partitioning.Partitions.Single().ShouldBe(new ListPartition("archived", "TRUE"));

table.PrimaryKeyColumns.ShouldContain(IsArchivedColumn.ColumnName);
}
}
12 changes: 12 additions & 0 deletions src/Marten.sln
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventAppenderPerfTester", "
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StressTests", "StressTests\StressTests.csproj", "{C9D33381-3AD3-4005-B854-F04F10EA837F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Weasel.Core", "..\..\weasel\src\Weasel.Core\Weasel.Core.csproj", "{F1524942-FA31-4A11-A5DD-EE92B7806511}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Weasel.Postgresql", "..\..\weasel\src\Weasel.Postgresql\Weasel.Postgresql.csproj", "{0042C325-5957-4F76-AC4C-B1C417AAF93D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -282,6 +286,14 @@ Global
{C9D33381-3AD3-4005-B854-F04F10EA837F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C9D33381-3AD3-4005-B854-F04F10EA837F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C9D33381-3AD3-4005-B854-F04F10EA837F}.Release|Any CPU.Build.0 = Release|Any CPU
{F1524942-FA31-4A11-A5DD-EE92B7806511}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F1524942-FA31-4A11-A5DD-EE92B7806511}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F1524942-FA31-4A11-A5DD-EE92B7806511}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F1524942-FA31-4A11-A5DD-EE92B7806511}.Release|Any CPU.Build.0 = Release|Any CPU
{0042C325-5957-4F76-AC4C-B1C417AAF93D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0042C325-5957-4F76-AC4C-B1C417AAF93D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0042C325-5957-4F76-AC4C-B1C417AAF93D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0042C325-5957-4F76-AC4C-B1C417AAF93D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ internal EventGraph(StoreOptions options)
_aggregateTypeByName = new Cache<string, Type>(findAggregateType);
}

/// <summary>
/// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits
/// *if* you are also aggressively using event stream archiving
/// </summary>
public bool UseArchivedStreamPartitioning { get; set; }

internal NpgsqlDbType StreamIdDbType { get; private set; }

internal StoreOptions Options { get; }
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/IEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public interface IEventStoreOptions

public EventAppendMode AppendMode { get; set; }

/// <summary>
/// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits
/// *if* you are also aggressively using event stream archiving
/// </summary>
public bool UseArchivedStreamPartitioning { get; set; }

/// <summary>
/// Register an event type with Marten. This isn't strictly necessary for normal usage,
/// but can help Marten with asynchronous projections where Marten hasn't yet encountered
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/IReadOnlyEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,10 @@ public interface IReadOnlyEventStoreOptions
/// Do not use this if you manually alter the fetched aggregate from FetchForWriting() outside of Marten
/// </summary>
bool UseIdentityMapForInlineAggregates { get; set; }

/// <summary>
/// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits
/// *if* you are also aggressively using event stream archiving
/// </summary>
bool UseArchivedStreamPartitioning { get; set; }
}
6 changes: 5 additions & 1 deletion src/Marten/Events/Schema/EventsTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public EventsTable(EventGraph events): base(new PostgresqlObjectName(events.Data
});
}

AddColumn<IsArchivedColumn>();
var archiving = AddColumn<IsArchivedColumn>();
if (events.UseArchivedStreamPartitioning)
{
archiving.PartitionByListValues().AddPartition("archived", true);
}
}

internal IList<IEventTableColumn> SelectColumns()
Expand Down
6 changes: 5 additions & 1 deletion src/Marten/Events/Schema/StreamsTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ public StreamsTable(EventGraph events): base(new PostgresqlObjectName(events.Dat
AddColumn<TenantIdColumn>();
}

AddColumn<IsArchivedColumn>();
var archiving = AddColumn<IsArchivedColumn>();
if (events.UseArchivedStreamPartitioning)
{
archiving.PartitionByListValues().AddPartition("archived", true);
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/Marten/Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<PackageReference Include="Polly.Core" Version="8.3.1" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="8.0.0" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
<PackageReference Include="Weasel.Postgresql" Version="7.4.1" />
<!--<PackageReference Include="Weasel.Postgresql" Version="7.4.1" />-->
</ItemGroup>

<!--SourceLink specific settings-->
Expand All @@ -69,5 +69,8 @@
<ItemGroup>
<Folder Include="Internal\OpenTelemetry\" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\weasel\src\Weasel.Postgresql\Weasel.Postgresql.csproj" />
</ItemGroup>
<Import Project="../../Analysis.Build.props" />
</Project>

0 comments on commit 624360d

Please sign in to comment.