Skip to content

Commit

Permalink
Fix npgsql parameter boxing within codegen
Browse files Browse the repository at this point in the history
  • Loading branch information
Hawxy committed Jul 24, 2024
1 parent fe9eed1 commit 8af818b
Show file tree
Hide file tree
Showing 23 changed files with 211 additions and 179 deletions.
13 changes: 13 additions & 0 deletions src/Marten/Events/CodeGeneration/CodeGenerationExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Linq;
using System.Reflection;
using JasperFx.CodeGeneration;
using JasperFx.CodeGeneration.Frames;
using JasperFx.Core.Reflection;
using Marten.Internal;
using Weasel.Postgresql;

namespace Marten.Events.CodeGeneration;

Expand Down Expand Up @@ -42,4 +45,14 @@ public static Type GetEventType(this MethodInfo method, Type aggregateType)

return parameterInfo.ParameterType;
}

public static void AppendSql(this FramesCollection collection, string sql)
{
collection.Code($"{{0}}.{nameof(CommandBuilder.Append)}(\"{sql}\");", Use.Type<ICommandBuilder>());
}

public static void AppendSql(this FramesCollection collection, char sql)
{
collection.Code($"{{0}}.{nameof(CommandBuilder.Append)}('{sql}');", Use.Type<ICommandBuilder>());
}
}
117 changes: 47 additions & 70 deletions src/Marten/Events/CodeGeneration/EventDocumentStorageGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,20 @@ private static GeneratedType buildUpdateStreamVersion(GeneratedType builderType,
{
var operationType = assembly.AddType(UpdateStreamVersionOperationName, typeof(UpdateStreamVersion));

var sql = $"update {graph.DatabaseSchemaName}.mt_streams set version = ? where id = ? and version = ?";
if (graph.TenancyStyle == TenancyStyle.Conjoined)
{
sql += $" and {TenantIdColumn.Name} = ?";
}

var setter = operationType.AddStringConstant("SQL", sql);
var sql = $"update {graph.DatabaseSchemaName}.mt_streams ";

var configureCommand = operationType.MethodFor("ConfigureCommand");
configureCommand.DerivedVariables.Add(
new Variable(typeof(StreamAction), nameof(UpdateStreamVersion.Stream)));

configureCommand.Frames.Code($"var parameters = {{0}}.{nameof(CommandBuilder.AppendWithParameters)}(SQL);",
Use.Type<ICommandBuilder>());
configureCommand.Frames.AppendSql(sql);

configureCommand.Frames.Code($"var parameterBuilder = {{0}}.{nameof(CommandBuilder.CreateGroupedParameterBuilder)}();", Use.Type<ICommandBuilder>());

configureCommand.Frames.AppendSql("set version = ");
configureCommand.SetParameterFromMember<StreamAction>(0, x => x.Version);

configureCommand.Frames.AppendSql("where id = ");
if (graph.StreamIdentity == StreamIdentity.AsGuid)
{
configureCommand.SetParameterFromMember<StreamAction>(1, x => x.Id);
Expand All @@ -135,10 +132,12 @@ private static GeneratedType buildUpdateStreamVersion(GeneratedType builderType,
configureCommand.SetParameterFromMember<StreamAction>(1, x => x.Key);
}

configureCommand.Frames.AppendSql("and version = ");
configureCommand.SetParameterFromMember<StreamAction>(2, x => x.ExpectedVersionOnServer);

if (graph.TenancyStyle == TenancyStyle.Conjoined)
{
configureCommand.Frames.AppendSql($" and {TenantIdColumn.Name} = ");
new TenantIdColumn().As<IStreamTableColumn>().GenerateAppendCode(configureCommand, 3);
}

Expand Down Expand Up @@ -231,31 +230,26 @@ private static GeneratedType buildStreamQueryHandlerType(EventGraph graph, Gener
private static void buildConfigureCommandMethodForStreamState(EventGraph graph,
GeneratedType streamQueryHandlerType)
{
var sql =
$"select id, version, type, timestamp, created as timestamp, is_archived from {graph.DatabaseSchemaName}.mt_streams where id = ?";
if (graph.TenancyStyle == TenancyStyle.Conjoined)
{
streamQueryHandlerType.AllInjectedFields.Add(new InjectedField(typeof(string), "tenantId"));
sql += $" and {TenantIdColumn.Name} = ?";
}

var setter = streamQueryHandlerType.AddStringConstant("SQL", sql);

var configureCommand = streamQueryHandlerType.MethodFor("ConfigureCommand");
configureCommand.Frames.Call<ICommandBuilder>(x => x.AppendWithParameters(""), call =>
{
call.Arguments[0] = setter;
call.ReturnAction = ReturnAction.Initialize;
});
var sql =
$"select id, version, type, timestamp, created as timestamp, is_archived from {graph.DatabaseSchemaName}.mt_streams where id = ";

configureCommand.Frames.AppendSql(sql);

var idDbType = graph.StreamIdentity == StreamIdentity.AsGuid ? DbType.Guid : DbType.String;
configureCommand.Frames.Code("{0}[0].Value = _streamId;", Use.Type<NpgsqlParameter[]>());
configureCommand.Frames.Code("{0}[0].DbType = {1};", Use.Type<NpgsqlParameter[]>(), idDbType);
configureCommand.Frames.Code($"var parameter1 = builder.{nameof(CommandBuilder.AppendParameter)}(_streamId);");
configureCommand.Frames.Code("parameter1.DbType = {0};", idDbType);

if (graph.TenancyStyle == TenancyStyle.Conjoined)
{
configureCommand.Frames.Code("{0}[1].Value = _tenantId;", Use.Type<NpgsqlParameter[]>());
configureCommand.Frames.Code("{0}[1].DbType = {1};", Use.Type<NpgsqlParameter[]>(), DbType.String);
configureCommand.Frames.AppendSql($" and {TenantIdColumn.Name} = ");
configureCommand.Frames.Code($"var parameter2 = builder.{nameof(CommandBuilder.AppendParameter)}(_tenantId);");
configureCommand.Frames.Code("parameter2.DbType = {0};", DbType.String);
}
}

Expand Down Expand Up @@ -286,18 +280,22 @@ private static GeneratedType buildAppendEventOperation(EventGraph graph, Generat
columns.Add(sequence);

var sql =
$"insert into {graph.DatabaseSchemaName}.mt_events ({columns.Select(x => x.Name).Join(", ")}) values ({columns.Select(c => c.ValueSql(graph, mode)).Join(", ")})";
$"insert into {graph.DatabaseSchemaName}.mt_events ({columns.Select(x => x.Name).Join(", ")}) values (";

operationType.AddStringConstant("SQL", sql);
configure.Frames.AppendSql(sql);

configure.Frames.Code($"var parameters = {{0}}.{nameof(CommandBuilder.AppendWithParameters)}(SQL);",
Use.Type<ICommandBuilder>());
configure.Frames.Code($"var parameterBuilder = {{0}}.{nameof(CommandBuilder.CreateGroupedParameterBuilder)}(',');", Use.Type<ICommandBuilder>());

for (var i = 0; i < columns.Count; i++)
{
columns[i].GenerateAppendCode(configure, graph, i, mode);
var valueSql = columns[i].ValueSql(graph, mode);
if (valueSql != "?")
configure.Frames.AppendSql($"{(i > 0 ? "," : string.Empty)}{valueSql}");
}

configure.Frames.AppendSql(')');

return operationType;
}

Expand All @@ -306,68 +304,44 @@ private static GeneratedType buildQuickAppendOperation(EventGraph graph, Generat
var operationType = assembly.AddType("QuickAppendEventsOperation", typeof(QuickAppendEventsOperationBase));

var table = new EventsTable(graph);
var parameterList = "";


var index = 6;
int causationIndex = 0;
int correlationIndex = 0;
int headerIndex = 0;
if (table.Columns.OfType<CausationIdColumn>().Any())
{
parameterList += ", ?";
causationIndex = ++index;
}

if (table.Columns.OfType<CorrelationIdColumn>().Any())
{
parameterList += ", ?";
correlationIndex = ++index;
}

if (table.Columns.OfType<HeadersColumn>().Any())
{
parameterList += ", ?";
headerIndex = ++index;
}

var sql =
$"select {graph.DatabaseSchemaName}.mt_quick_append_events(?, ?, ?, ?, ?, ?, ?{parameterList})";

operationType.AddStringConstant("SQL", sql);
var sql = $"select {graph.DatabaseSchemaName}.mt_quick_append_events(";

var configure = operationType.MethodFor(nameof(QuickAppendEventsOperationBase.ConfigureCommand));
configure.DerivedVariables.Add(new Variable(typeof(StreamAction), nameof(QuickAppendEventsOperationBase.Stream)));

configure.Frames.Code($"var parameters = {{0}}.{nameof(CommandBuilder.AppendWithParameters)}(SQL);",
Use.Type<ICommandBuilder>());
configure.Frames.AppendSql(sql);

configure.Frames.Code($"var parameterBuilder = {{0}}.{nameof(CommandBuilder.CreateGroupedParameterBuilder)}(',');", Use.Type<ICommandBuilder>());

if (graph.StreamIdentity == StreamIdentity.AsGuid)
{
configure.Frames.Code("writeId(parameters);");
configure.Frames.Code("writeId(parameterBuilder);");
}
else
{
configure.Frames.Code("writeKey(parameters);");
configure.Frames.Code("writeKey(parameterBuilder);");
}

configure.Frames.Code("writeBasicParameters(parameters, session);");
configure.Frames.Code("writeBasicParameters(parameterBuilder, session);");

if (causationIndex > 0)
if (table.Columns.OfType<CausationIdColumn>().Any())
{
configure.Frames.Code($"writeCausationIds({causationIndex}, parameters);");
configure.Frames.Code("writeCausationIds(parameterBuilder);");
}

if (correlationIndex > 0)
if (table.Columns.OfType<CorrelationIdColumn>().Any())
{
configure.Frames.Code($"writeCorrelationIds({correlationIndex}, parameters);");
configure.Frames.Code("writeCorrelationIds(parameterBuilder);");
}

if (headerIndex > 0)
if (table.Columns.OfType<HeadersColumn>().Any())
{
configure.Frames.Code($"writeHeaders({headerIndex}, parameters, session);");
configure.Frames.Code("writeHeaders(parameterBuilder, session);");
}

configure.Frames.AppendSql(')');

return operationType;
}

Expand All @@ -383,20 +357,23 @@ private static GeneratedType buildInsertStream(GeneratedType builderType, Genera
.ToArray();

var sql =
$"insert into {graph.DatabaseSchemaName}.mt_streams ({columns.Select(x => x.Name).Join(", ")}) values ({columns.Select(_ => "?").Join(", ")})";
operationType.AddStringConstant("SQL", sql);
$"insert into {graph.DatabaseSchemaName}.mt_streams ({columns.Select(x => x.Name).Join(", ")}) values (";


var configureCommand = operationType.MethodFor("ConfigureCommand");
configureCommand.DerivedVariables.Add(new Variable(typeof(StreamAction), nameof(InsertStreamBase.Stream)));

configureCommand.Frames.Code($"var parameters = {{0}}.{nameof(CommandBuilder.AppendWithParameters)}(SQL);",
Use.Type<ICommandBuilder>());
configureCommand.Frames.AppendSql(sql);

configureCommand.Frames.Code($"var parameterBuilder = {{0}}.{nameof(CommandBuilder.CreateGroupedParameterBuilder)}(',');", Use.Type<ICommandBuilder>());

for (var i = 0; i < columns.Length; i++)
{
columns[i].GenerateAppendCode(configureCommand, i);
}

configureCommand.Frames.AppendSql(')');

builderType.MethodFor(nameof(EventDocumentStorage.InsertStream))
.Frames.ReturnNewGeneratedTypeObject(operationType, "stream");

Expand Down
7 changes: 4 additions & 3 deletions src/Marten/Events/Daemon/Internals/EventTypeFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public EventTypeFilter(EventGraph graph, IReadOnlyCollection<Type> eventTypes)

public void Apply(ICommandBuilder builder)
{
var parameters = builder.AppendWithParameters("d.type = ANY(?)");
parameters[0].NpgsqlDbType = NpgsqlDbType.Varchar | NpgsqlDbType.Array;
parameters[0].Value = _typeNames;
builder.Append("d.type = ANY(");
var parameter = builder.AppendParameter(_typeNames);
parameter.NpgsqlDbType = NpgsqlDbType.Varchar | NpgsqlDbType.Array;
builder.Append(')');
}

}
61 changes: 33 additions & 28 deletions src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,50 +52,55 @@ public void Postprocess(DbDataReader reader, IList<Exception> exceptions)
}
}

protected void writeId(NpgsqlParameter[] parameters)
protected void writeId(IGroupedParameterBuilder builder)
{
parameters[0].NpgsqlDbType = NpgsqlDbType.Uuid;
parameters[0].Value = Stream.Id;
var param = builder.AppendParameter(Stream.Id);
param.NpgsqlDbType = NpgsqlDbType.Uuid;
}

protected void writeKey(NpgsqlParameter[] parameters)
protected void writeKey(IGroupedParameterBuilder builder)
{
parameters[0].NpgsqlDbType = NpgsqlDbType.Varchar;
parameters[0].Value = Stream.Key;
var param = builder.AppendParameter(Stream.Key);
param.NpgsqlDbType = NpgsqlDbType.Varchar;
}

protected void writeBasicParameters(NpgsqlParameter[] parameters, IMartenSession session)
protected void writeBasicParameters(IGroupedParameterBuilder builder, IMartenSession session)
{
parameters[1].NpgsqlDbType = NpgsqlDbType.Varchar;
parameters[1].Value = Stream.AggregateTypeName.IsEmpty() ? DBNull.Value : Stream.AggregateTypeName;
parameters[2].NpgsqlDbType = NpgsqlDbType.Varchar;
parameters[2].Value = Stream.TenantId;
parameters[3].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Uuid;
parameters[3].Value = Stream.Events.Select(x => x.Id).ToArray();
parameters[4].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
parameters[4].Value = Stream.Events.Select(x => x.EventTypeName).ToArray();
parameters[5].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
parameters[5].Value = Stream.Events.Select(x => x.DotNetTypeName).ToArray();
parameters[6].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb;
parameters[6].Value = Stream.Events.Select(e => session.Serializer.ToJson(e.Data)).ToArray();
var param1 = Stream.AggregateTypeName.IsEmpty() ? builder.AppendParameter<object>(DBNull.Value) : builder.AppendParameter(Stream.AggregateTypeName);
param1.NpgsqlDbType = NpgsqlDbType.Varchar;

var param2 = builder.AppendParameter(Stream.TenantId);
param2.NpgsqlDbType = NpgsqlDbType.Varchar;

var param3 = builder.AppendParameter(Stream.Events.Select(x => x.Id).ToArray());
param3.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Uuid;

var param4 = builder.AppendParameter(Stream.Events.Select(x => x.EventTypeName).ToArray());
param4.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;

var param5 = builder.AppendParameter(Stream.Events.Select(x => x.DotNetTypeName).ToArray());
param5.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;

var param6 = builder.AppendParameter(Stream.Events.Select(e => session.Serializer.ToJson(e.Data)).ToArray());
param6.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb;
}

protected void writeCausationIds(int index, NpgsqlParameter[] parameters)
protected void writeCausationIds(IGroupedParameterBuilder builder)
{
parameters[index].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
parameters[index].Value = Stream.Events.Select(x => x.CausationId).ToArray();
var param = builder.AppendParameter(Stream.Events.Select(x => x.CausationId).ToArray());
param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
}

protected void writeCorrelationIds(int index, NpgsqlParameter[] parameters)
protected void writeCorrelationIds(IGroupedParameterBuilder builder)
{
parameters[index].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
parameters[index].Value = Stream.Events.Select(x => x.CorrelationId).ToArray();
var param = builder.AppendParameter(Stream.Events.Select(x => x.CorrelationId).ToArray());
param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
}

protected void writeHeaders(int index, NpgsqlParameter[] parameters, IMartenSession session)
protected void writeHeaders(IGroupedParameterBuilder builder, IMartenSession session)
{
parameters[index].NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb;
parameters[index].Value = Stream.Events.Select(x => session.Serializer.ToJson(x.Headers)).ToArray();
var param = builder.AppendParameter(Stream.Events.Select(x => session.Serializer.ToJson(x.Headers)).ToArray());
param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb;
}

public async Task PostprocessAsync(DbDataReader reader, IList<Exception> exceptions, CancellationToken token)
Expand Down
8 changes: 5 additions & 3 deletions src/Marten/Events/Schema/EventJsonDataColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JasperFx.CodeGeneration.Frames;
using Marten.Internal;
using NpgsqlTypes;
using Weasel.Postgresql;
using Weasel.Postgresql.Tables;

namespace Marten.Events.Schema;
Expand All @@ -26,9 +27,10 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph,

public void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full)
{
method.Frames.Code($"parameters[{index}].NpgsqlDbType = {{0}};", NpgsqlDbType.Jsonb);
method.Frames.Code($"parameters[{index}].Value = {{0}}.Serializer.ToJson({{1}}.{nameof(IEvent.Data)});",
Use.Type<IMartenSession>(), Use.Type<IEvent>());
method.Frames.Code($"var parameter{index} = parameterBuilder.{nameof(IGroupedParameterBuilder.AppendParameter)}({{0}}.Serializer.ToJson({{1}}.{nameof(IEvent.Data)}));",
Use.Type<IMartenSession>(), Use.Type<IEvent>());

method.Frames.Code($"parameter{index}.NpgsqlDbType = {{0}};", NpgsqlDbType.Jsonb);
}

public string ValueSql(EventGraph graph, AppendMode mode)
Expand Down
7 changes: 4 additions & 3 deletions src/Marten/Events/Schema/EventTableColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ public void GenerateSelectorCodeAsync(GeneratedMethod method, EventGraph graph,

public virtual void GenerateAppendCode(GeneratedMethod method, EventGraph graph, int index, AppendMode full)
{
method.Frames.Code($"parameters[{index}].{nameof(NpgsqlParameter.NpgsqlDbType)} = {{0}};",
NpgsqlDbType);
method.Frames.Code(
$"parameters[{index}].{nameof(NpgsqlParameter.Value)} = {{0}}.{Member.Name};", Use.Type<IEvent>());
$"var parameter{index} = parameterBuilder.{nameof(IGroupedParameterBuilder.AppendParameter)}({{0}}.{Member.Name});", Use.Type<IEvent>());

method.Frames.Code($"parameter{index}.{nameof(NpgsqlParameter.NpgsqlDbType)} = {{0}};", NpgsqlDbType);

}

public virtual string ValueSql(EventGraph graph, AppendMode mode)
Expand Down
Loading

0 comments on commit 8af818b

Please sign in to comment.