diff --git a/src/AxonIQ.AxonServer.Connector/OngoingQueryCollection.cs b/src/AxonIQ.AxonServer.Connector/OngoingQueryCollection.cs deleted file mode 100644 index 1f5c0bc..0000000 --- a/src/AxonIQ.AxonServer.Connector/OngoingQueryCollection.cs +++ /dev/null @@ -1,32 +0,0 @@ -namespace AxonIQ.AxonServer.Connector; - -public class OngoingQueryCollection -{ - private readonly Dictionary _queries; - - public OngoingQueryCollection() - { - _queries = new Dictionary(); - } - - public void AddQuery(InstructionId queryId, IAsyncDisposable forwarder) - { - _queries.Add(queryId, forwarder); - } - - public bool TryFlowControlRequestForQuery(InstructionId queryId, long requested) - { - if (_queries.TryGetValue(queryId, out var forwarder) && forwarder is IFlowControl flowControlled) - { - flowControlled.Request(requested); - return true; - } - - return false; - } - - public IAsyncDisposable? RemoveQuery(InstructionId queryId) - { - return _queries.Remove(queryId, out var forwarder) ? forwarder : null; - } -} \ No newline at end of file diff --git a/src/AxonIQ.AxonServer.Connector/QueryChannel.cs b/src/AxonIQ.AxonServer.Connector/QueryChannel.cs index b5775f8..afa44ec 100644 --- a/src/AxonIQ.AxonServer.Connector/QueryChannel.cs +++ b/src/AxonIQ.AxonServer.Connector/QueryChannel.cs @@ -135,7 +135,6 @@ await _actor.TellAsync( ok.Value, consumer, consumerCancellationTokenSource, - new OngoingQueryCollection(), connecting.QueryHandlers, connecting.Flow, connecting.QueryExecutions, @@ -686,7 +685,6 @@ public record Connected( AsyncDuplexStreamingCall Call, Task ConsumeQueryProviderInboundInstructions, CancellationTokenSource ConsumeQueryProviderInboundInstructionsCancellationTokenSource, - OngoingQueryCollection OngoingQueries, QueryHandlerCollection QueryHandlers, FlowController Flow, QueryExecutions QueryExecutions, SubscriptionQueryExecutions SubscriptionQueryExecutions) : State(QueryHandlers, Flow, QueryExecutions, SubscriptionQueryExecutions); public record Faulted(QueryHandlerCollection QueryHandlers, FlowController Flow, QueryExecutions QueryExecutions, SubscriptionQueryExecutions SubscriptionQueryExecutions) : State(QueryHandlers, Flow, QueryExecutions, SubscriptionQueryExecutions); } diff --git a/test/AxonIQ.AxonServer.Connector.Tests/OngoingQueryCollectionTests.cs b/test/AxonIQ.AxonServer.Connector.Tests/OngoingQueryCollectionTests.cs deleted file mode 100644 index 92a66d6..0000000 --- a/test/AxonIQ.AxonServer.Connector.Tests/OngoingQueryCollectionTests.cs +++ /dev/null @@ -1,72 +0,0 @@ -namespace AxonIQ.AxonServer.Connector.Tests; - -public class OngoingQueryCollectionTests -{ - private readonly OngoingQueryCollection _sut; - - public OngoingQueryCollectionTests() - { - _sut = new OngoingQueryCollection(); - } - - [Fact] - public void AddQueryHasExpectedResult() - { - var queryId = InstructionId.New(); - var forwarder = new FakeForwarder(); - _sut.AddQuery(queryId, forwarder); - - Assert.False(_sut.TryFlowControlRequestForQuery(queryId, 1)); - - Assert.Same(forwarder, _sut.RemoveQuery(queryId)); - Assert.Null(_sut.RemoveQuery(queryId)); - } - - [Fact] - public void TryFlowControlRequestForNonExistingQueryHasExpectedResult() - { - Assert.False(_sut.TryFlowControlRequestForQuery(InstructionId.New(), 1)); - } - - [Fact] - public void TryFlowControlRequestForQueryHasExpectedResult() - { - var queryId = InstructionId.New(); - var forwarder = new FakeFlowControlledForwarder(); - _sut.AddQuery(queryId, forwarder); - - Assert.True(_sut.TryFlowControlRequestForQuery(queryId, 1)); - Assert.True(_sut.TryFlowControlRequestForQuery(queryId, 1)); - Assert.Equal(2, forwarder.Requested); - } - - private class FakeForwarder : IAsyncDisposable - { - public ValueTask DisposeAsync() - { - return ValueTask.CompletedTask; - } - } - - private class FakeFlowControlledForwarder : IFlowControl, IAsyncDisposable - { - public long Requested { get; private set; } - - public bool Cancelled { get; private set; } - - public void Request(long count) - { - Requested += count; - } - - public void Cancel() - { - Cancelled = true; - } - - public ValueTask DisposeAsync() - { - return ValueTask.CompletedTask; - } - } -} \ No newline at end of file diff --git a/test/AxonIQ.AxonServer.Connector.Tests/PublicApiAssertions.VerifyApiChanges.verified.txt b/test/AxonIQ.AxonServer.Connector.Tests/PublicApiAssertions.VerifyApiChanges.verified.txt index 68edfc0..12fba07 100644 --- a/test/AxonIQ.AxonServer.Connector.Tests/PublicApiAssertions.VerifyApiChanges.verified.txt +++ b/test/AxonIQ.AxonServer.Connector.Tests/PublicApiAssertions.VerifyApiChanges.verified.txt @@ -5,13 +5,15 @@ [assembly: System.Runtime.Versioning.TargetFramework(".NETCoreApp,Version=v6.0", FrameworkDisplayName="")] namespace AxonIQ.AxonServer.Connector { - public readonly struct AggregateId + public readonly struct AggregateId : System.IEquatable { public AggregateId(string value) { } public bool Equals(AxonIQ.AxonServer.Connector.AggregateId other) { } public override bool Equals(object? obj) { } public override int GetHashCode() { } public override string ToString() { } + public static bool operator !=(AxonIQ.AxonServer.Connector.AggregateId left, AxonIQ.AxonServer.Connector.AggregateId right) { } + public static bool operator ==(AxonIQ.AxonServer.Connector.AggregateId left, AxonIQ.AxonServer.Connector.AggregateId right) { } } public static class AxonServerAuthentication { @@ -382,8 +384,8 @@ namespace AxonIQ.AxonServer.Connector } public interface IQueryHandler { - System.Threading.Tasks.Task HandleAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryRequest request, AxonIQ.AxonServer.Connector.IQueryResponseChannel responseChannel); - AxonIQ.AxonServer.Connector.ISubscriptionQueryRegistration? RegisterSubscriptionQuery(Io.Axoniq.Axonserver.Grpc.Query.SubscriptionQuery query, AxonIQ.AxonServer.Connector.ISubscriptionQueryUpdateResponseChannel responseChannel); + System.Threading.Tasks.Task HandleAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryRequest request, AxonIQ.AxonServer.Connector.IQueryResponseChannel responseChannel, System.Threading.CancellationToken ct); + System.Threading.Tasks.Task? TryHandleAsync(Io.Axoniq.Axonserver.Grpc.Query.SubscriptionQuery query, AxonIQ.AxonServer.Connector.ISubscriptionQueryUpdateResponseChannel responseChannel, System.Threading.CancellationToken ct); } public interface IQueryHandlerRegistration : System.IAsyncDisposable { @@ -391,11 +393,11 @@ namespace AxonIQ.AxonServer.Connector } public interface IQueryResponseChannel { - System.Threading.Tasks.ValueTask CompleteAsync(); - System.Threading.Tasks.ValueTask CompleteWithErrorAsync(Io.Axoniq.Axonserver.Grpc.ErrorMessage errorMessage); - System.Threading.Tasks.ValueTask CompleteWithErrorAsync(AxonIQ.AxonServer.Connector.ErrorCategory errorCategory, Io.Axoniq.Axonserver.Grpc.ErrorMessage errorMessage); - System.Threading.Tasks.ValueTask SendAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryResponse response); - System.Threading.Tasks.ValueTask SendLastAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryResponse response); + System.Threading.Tasks.ValueTask CompleteAsync(System.Threading.CancellationToken cancellationToken); + System.Threading.Tasks.ValueTask CompleteWithErrorAsync(Io.Axoniq.Axonserver.Grpc.ErrorMessage error, System.Threading.CancellationToken cancellationToken); + System.Threading.Tasks.ValueTask CompleteWithErrorAsync(AxonIQ.AxonServer.Connector.ErrorCategory category, string message, System.Threading.CancellationToken cancellationToken); + System.Threading.Tasks.ValueTask SendAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryResponse response, System.Threading.CancellationToken cancellationToken); + System.Threading.Tasks.ValueTask SendLastAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryResponse response, System.Threading.CancellationToken cancellationToken); } public interface IQuerySubscriptionResult : System.IAsyncDisposable { @@ -407,14 +409,10 @@ namespace AxonIQ.AxonServer.Connector System.Func Clock { get; } System.Threading.Tasks.ValueTask ScheduleTaskAsync(System.Func task, System.TimeSpan due); } - public interface ISubscriptionQueryRegistration : System.IAsyncDisposable - { - System.Threading.Tasks.Task WaitUntilCompletedAsync(); - } public interface ISubscriptionQueryUpdateResponseChannel { - System.Threading.Tasks.ValueTask CompleteAsync(); - System.Threading.Tasks.ValueTask SendUpdateAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryUpdate update); + System.Threading.Tasks.ValueTask CompleteAsync(System.Threading.CancellationToken ct); + System.Threading.Tasks.ValueTask SendUpdateAsync(Io.Axoniq.Axonserver.Grpc.Query.QueryUpdate update, System.Threading.CancellationToken ct); } public readonly struct InstructionId { @@ -484,9 +482,9 @@ namespace AxonIQ.AxonServer.Connector } public class QueryDefinition : System.IEquatable { - public QueryDefinition(AxonIQ.AxonServer.Connector.QueryName QueryName, string ResultType) { } + public QueryDefinition(AxonIQ.AxonServer.Connector.QueryName QueryName, string ResultName) { } public AxonIQ.AxonServer.Connector.QueryName QueryName { get; init; } - public string ResultType { get; init; } + public string ResultName { get; init; } } public readonly struct QueryHandlerId { @@ -497,13 +495,15 @@ namespace AxonIQ.AxonServer.Connector public override string ToString() { } public static AxonIQ.AxonServer.Connector.QueryHandlerId New() { } } - public readonly struct QueryName + public readonly struct QueryName : System.IEquatable { public QueryName(string value) { } public bool Equals(AxonIQ.AxonServer.Connector.QueryName other) { } public override bool Equals(object? obj) { } public override int GetHashCode() { } public override string ToString() { } + public static bool operator !=(AxonIQ.AxonServer.Connector.QueryName left, AxonIQ.AxonServer.Connector.QueryName right) { } + public static bool operator ==(AxonIQ.AxonServer.Connector.QueryName left, AxonIQ.AxonServer.Connector.QueryName right) { } } public delegate System.Threading.Tasks.ValueTask ReceiveHeartbeatAcknowledgement(Io.Axoniq.Axonserver.Grpc.InstructionAck message); public class ReconnectOptions : System.IEquatable @@ -528,18 +528,16 @@ namespace AxonIQ.AxonServer.Connector public static bool operator !=(AxonIQ.AxonServer.Connector.RegisteredCommandId left, AxonIQ.AxonServer.Connector.RegisteredCommandId right) { } public static bool operator ==(AxonIQ.AxonServer.Connector.RegisteredCommandId left, AxonIQ.AxonServer.Connector.RegisteredCommandId right) { } } - public readonly struct RegistrationId + public readonly struct RegisteredQueryId : System.IEquatable { - public RegistrationId(string value) { } - public bool Equals(AxonIQ.AxonServer.Connector.RegistrationId other) { } + public bool Equals(AxonIQ.AxonServer.Connector.RegisteredQueryId other) { } public override bool Equals(object? obj) { } public override int GetHashCode() { } public override string ToString() { } - public static AxonIQ.AxonServer.Connector.RegistrationId New() { } - public static bool operator !=(AxonIQ.AxonServer.Connector.RegistrationId left, AxonIQ.AxonServer.Connector.RegistrationId right) { } - public static bool operator ==(AxonIQ.AxonServer.Connector.RegistrationId left, AxonIQ.AxonServer.Connector.RegistrationId right) { } + public static AxonIQ.AxonServer.Connector.RegisteredQueryId New() { } + public static bool operator !=(AxonIQ.AxonServer.Connector.RegisteredQueryId left, AxonIQ.AxonServer.Connector.RegisteredQueryId right) { } + public static bool operator ==(AxonIQ.AxonServer.Connector.RegisteredQueryId left, AxonIQ.AxonServer.Connector.RegisteredQueryId right) { } } - public delegate System.Threading.Tasks.ValueTask RequestReconnect(); public readonly struct ScheduledEventCancellationToken : System.IEquatable { public ScheduledEventCancellationToken(string value) { } @@ -547,8 +545,10 @@ namespace AxonIQ.AxonServer.Connector public override bool Equals(object? obj) { } public override int GetHashCode() { } public override string ToString() { } + public static bool operator !=(AxonIQ.AxonServer.Connector.ScheduledEventCancellationToken left, AxonIQ.AxonServer.Connector.ScheduledEventCancellationToken right) { } + public static bool operator ==(AxonIQ.AxonServer.Connector.ScheduledEventCancellationToken left, AxonIQ.AxonServer.Connector.ScheduledEventCancellationToken right) { } } - public readonly struct SegmentId + public readonly struct SegmentId : System.IEquatable { public SegmentId(int value) { } public bool Equals(AxonIQ.AxonServer.Connector.SegmentId other) { } @@ -556,6 +556,8 @@ namespace AxonIQ.AxonServer.Connector public override int GetHashCode() { } public int ToInt32() { } public override string ToString() { } + public static bool operator !=(AxonIQ.AxonServer.Connector.SegmentId left, AxonIQ.AxonServer.Connector.SegmentId right) { } + public static bool operator ==(AxonIQ.AxonServer.Connector.SegmentId left, AxonIQ.AxonServer.Connector.SegmentId right) { } } public delegate System.Threading.Tasks.ValueTask SendHeartbeat(AxonIQ.AxonServer.Connector.ReceiveHeartbeatAcknowledgement responder, System.TimeSpan timeout); public static class ServiceCollectionExtensions @@ -569,7 +571,7 @@ namespace AxonIQ.AxonServer.Connector public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddAxonServerConnectionFactory(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, Microsoft.Extensions.Configuration.IConfiguration configuration) { } public static Microsoft.Extensions.DependencyInjection.IServiceCollection AddAxonServerConnectionFactory(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, System.Action configure) { } } - public readonly struct SubscriptionId + public readonly struct SubscriptionId : System.IEquatable { public SubscriptionId(string value) { } public bool Equals(AxonIQ.AxonServer.Connector.SubscriptionId other) { } @@ -580,17 +582,6 @@ namespace AxonIQ.AxonServer.Connector public static bool operator !=(AxonIQ.AxonServer.Connector.SubscriptionId left, AxonIQ.AxonServer.Connector.SubscriptionId right) { } public static bool operator ==(AxonIQ.AxonServer.Connector.SubscriptionId left, AxonIQ.AxonServer.Connector.SubscriptionId right) { } } - public readonly struct SubscriptionIdentifier - { - public SubscriptionIdentifier(string value) { } - public bool Equals(AxonIQ.AxonServer.Connector.SubscriptionIdentifier other) { } - public override bool Equals(object? obj) { } - public override int GetHashCode() { } - public override string ToString() { } - public static AxonIQ.AxonServer.Connector.SubscriptionIdentifier New() { } - public static bool operator !=(AxonIQ.AxonServer.Connector.SubscriptionIdentifier left, AxonIQ.AxonServer.Connector.SubscriptionIdentifier right) { } - public static bool operator ==(AxonIQ.AxonServer.Connector.SubscriptionIdentifier left, AxonIQ.AxonServer.Connector.SubscriptionIdentifier right) { } - } public static class TimeSpanMath { public static System.TimeSpan Max(System.TimeSpan left, System.TimeSpan right) { } @@ -611,7 +602,6 @@ namespace AxonIQ.AxonServer.Connector { public static OpenTelemetry.Trace.TracerProviderBuilder AddAxonServerConnectorInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder) { } } - public delegate System.Threading.Tasks.ValueTask WriteCommandProviderOutbound(Io.Axoniq.Axonserver.Grpc.Command.CommandProviderOutbound instruction); public delegate System.Threading.Tasks.ValueTask WritePlatformInboundInstruction(Io.Axoniq.Axonserver.Grpc.Control.PlatformInboundInstruction instruction); public delegate System.Threading.Tasks.ValueTask WriteQueryProviderOutbound(Io.Axoniq.Axonserver.Grpc.Query.QueryProviderOutbound instruction); }