diff --git a/src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs b/src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs index 3f2f1ecc48..e3b9f89f96 100644 --- a/src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs +++ b/src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs @@ -55,7 +55,7 @@ public void Close() /// Commit a transaction, performing all associated write actions /// Will block thread and use second thread for callback /// - public void Commit() => BrighterSynchronizationHelper.Run(() => DynamoDb.TransactWriteItemsAsync(_tx)); + public void Commit() => BrighterSynchronizationHelper.Run(async () => await DynamoDb.TransactWriteItemsAsync(_tx)); /// /// Commit a transaction, performing all associated write actions diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs index dcd0d7dcd6..8ac29a1162 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs @@ -77,7 +77,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection, /// Sync over Async /// /// The message. - public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(() => AcknowledgeAsync(message)); + public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () => await AcknowledgeAsync(message)); /// /// Acknowledges the specified message. @@ -112,7 +112,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection, /// Sync over async /// /// The message. - public void Reject(Message message) => BrighterSynchronizationHelper.Run(() => RejectAsync(message)); + public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message)); /// /// Rejects the specified message. @@ -158,7 +158,7 @@ await client.ChangeMessageVisibilityAsync( /// Purges the specified queue name. /// Sync over Async /// - public void Purge() => BrighterSynchronizationHelper.Run(() => PurgeAsync()); + public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync()); /// /// Purges the specified queue name. @@ -187,7 +187,7 @@ await client.ChangeMessageVisibilityAsync( /// Sync over async /// /// The timeout. AWS uses whole seconds. Anything greater than 0 uses long-polling. - public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(() => ReceiveAsync(timeOut)); + public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut)); /// /// Receives the specified queue name. @@ -255,7 +255,7 @@ await client.ChangeMessageVisibilityAsync( } - public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, delay)); + public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay)); /// /// Re-queues the specified message. diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs index d5dbe58fde..4f412e661f 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs @@ -132,7 +132,7 @@ public async Task SendAsync(Message message, CancellationToken cancellationToken /// Sync over Async /// /// The message. - public void Send(Message message) => BrighterSynchronizationHelper.Run(() => SendAsync(message)); + public void Send(Message message) => BrighterSynchronizationHelper.Run(async () => await SendAsync(message)); /// /// Sends the specified message, with a delay. diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs index 86cba4d7c2..fde4875028 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs @@ -157,7 +157,7 @@ public async ValueTask DisposeAsync() /// /// The timeout for a message being available. Defaults to 300ms. /// Message. - public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(() => ReceiveAsync(timeOut)); + public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut)); /// /// Receives the specified queue name. @@ -230,7 +230,7 @@ public async ValueTask DisposeAsync() /// Sync over Async /// /// The message. - public void Reject(Message message) => BrighterSynchronizationHelper.Run(() => RejectAsync(message)); + public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message)); /// /// Rejects the specified message. @@ -268,7 +268,7 @@ public async ValueTask DisposeAsync() /// /// Delay to the delivery of the message. 0 is no delay. Defaults to 0. /// True if the message should be acked, false otherwise - public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, delay)); + public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay)); /// /// Requeues the specified message. diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessagePublisher.cs index 6f6e63a61b..47c52b91ec 100644 --- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessagePublisher.cs @@ -73,7 +73,7 @@ private async Task ConnectAsync() /// Sync over async /// /// The message. - public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(() => PublishMessageAsync(message)); + public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(async () => await PublishMessageAsync(message)); /// /// Sends the specified message asynchronously. diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs index 98524f3202..a4a7ef13c3 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs @@ -142,7 +142,7 @@ public RmqMessageConsumer( /// Acknowledges the specified message. /// /// The message. - public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(() =>AcknowledgeAsync(message)); + public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () =>await AcknowledgeAsync(message)); public async Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default) { @@ -170,7 +170,7 @@ public async Task AcknowledgeAsync(Message message, CancellationToken cancellati /// /// Purges the specified queue name. /// - public void Purge() => BrighterSynchronizationHelper.Run(() => PurgeAsync()); + public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync()); public async Task PurgeAsync(CancellationToken cancellationToken = default) { @@ -295,7 +295,7 @@ exception is NotSupportedException || /// /// Time to delay delivery of the message. /// True if message deleted, false otherwise - public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, timeout)); + public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, timeout)); public async Task RequeueAsync(Message message, TimeSpan? timeout = null, CancellationToken cancellationToken = default) diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs index 27f5b33b10..756917ae71 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageProducer.cs @@ -104,7 +104,7 @@ public RmqMessageProducer(RmqMessagingGatewayConnection connection, RmqPublicati /// The message. /// Delay to delivery of the message. /// Task. - public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => SendWithDelayAsync(message, delay)); + public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await SendWithDelayAsync(message, delay)); /// /// Sends the specified message diff --git a/src/Paramore.Brighter/Tasks/BrighterSynchronizationContext.cs b/src/Paramore.Brighter/Tasks/BrighterSynchronizationContext.cs index 502c1dedfd..9adc5415b6 100644 --- a/src/Paramore.Brighter/Tasks/BrighterSynchronizationContext.cs +++ b/src/Paramore.Brighter/Tasks/BrighterSynchronizationContext.cs @@ -140,28 +140,11 @@ public override void Post(SendOrPostCallback callback, object? state) //NOTE: if we got here, something went wrong, we should have been able to queue the message //mostly this seems to be a problem with the task we are running completing, but work is still being queued to the //synchronization context. - // If the execution context can help, we might be able to redirect; if not just run immediately on this thread - var contextCallback = new ContextCallback(callback); if (ctxt != null && ctxt != _executionContext) - { - Debug.WriteLine(string.Empty); - Debug.IndentLevel = 1; - Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {callback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}"); - Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}"); - Debug.IndentLevel = 0; - SynchronizationHelper.ExecuteOnContext(ctxt, contextCallback, state); - } + ExecuteOnCallersContext(contextCallback, state, ctxt); else - { - Debug.WriteLine(string.Empty); - Debug.IndentLevel = 1; - Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {callback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}"); - Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}"); - Debug.IndentLevel = 0; - //just execute inline - SynchronizationHelper.ExecuteImmediately(contextCallback, state); - } + ExecuteImmediately(contextCallback, state); Debug.WriteLine(string.Empty); } @@ -192,5 +175,35 @@ public override void Send(SendOrPostCallback callback, object? state) throw new TimeoutException("BrighterSynchronizationContext: Send operation timed out."); } } + + private void ExecuteImmediately(ContextCallback contextCallback, object? state) + { + Debug.WriteLine(string.Empty); + Debug.IndentLevel = 1; + Debug.Fail("BrighterSynchronizationContext: ExecuteImmediately. We should never get here"); + Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {contextCallback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}"); + Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}"); + Debug.IndentLevel = 0; + //just execute inline + SynchronizationHelper.ExecuteImmediately(contextCallback, state); + } + + /// + /// We should never get here as we should not be called from the wrong context + /// + /// + /// Any state to pass to the callback + /// + /// The callback to execute + private void ExecuteOnCallersContext(ContextCallback contextCallback, object? state, ExecutionContext ctxt) + { + Debug.WriteLine(string.Empty); + Debug.IndentLevel = 1; + Debug.Fail("BrighterSynchronizationContext: ExecuteOnCallersContext. We should never get here"); + Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {contextCallback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}"); + Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}"); + Debug.IndentLevel = 0; + SynchronizationHelper.ExecuteOnContext(ctxt, contextCallback, state); + } } } diff --git a/src/Paramore.Brighter/Tasks/BrighterSynchronizationHelper.cs b/src/Paramore.Brighter/Tasks/BrighterSynchronizationHelper.cs index a84b9c1ada..62c2296d9d 100644 --- a/src/Paramore.Brighter/Tasks/BrighterSynchronizationHelper.cs +++ b/src/Paramore.Brighter/Tasks/BrighterSynchronizationHelper.cs @@ -389,11 +389,11 @@ public void Execute(Task parentTask) foreach (var (task, propagateExceptions) in _taskQueue.GetConsumingEnumerable()) { _taskScheduler.DoTryExecuteTask(task); + _activeTasks.TryRemove(task, out _); if (!propagateExceptions) continue; task.GetAwaiter().GetResult(); - _activeTasks.TryRemove(task, out _); } }); diff --git a/tests/Paramore.Brighter.Core.Tests/Tasks/BrighterSynchronizationContextsTests.cs b/tests/Paramore.Brighter.Core.Tests/Tasks/BrighterSynchronizationContextsTests.cs index c102564b61..99abe8ee3a 100644 --- a/tests/Paramore.Brighter.Core.Tests/Tasks/BrighterSynchronizationContextsTests.cs +++ b/tests/Paramore.Brighter.Core.Tests/Tasks/BrighterSynchronizationContextsTests.cs @@ -306,25 +306,27 @@ public void Task_AfterExecute_NeverRuns() context.Execute(task); - try - { - var taskTwo = context.Factory.StartNew( - () => { value = 2; }, - context.Factory.CancellationToken, - context.Factory.CreationOptions | TaskCreationOptions.DenyChildAttach, - context.TaskScheduler); + var taskTwo = context.Factory.StartNew( + () => { value = 2; }, + context.Factory.CancellationToken, + context.Factory.CreationOptions | TaskCreationOptions.DenyChildAttach, + context.TaskScheduler); - taskTwo.ContinueWith(_ => { throw new Exception("Should not run"); }, TaskScheduler.Default); + taskTwo.ContinueWith(_ => { throw new Exception("Should not run"); }, TaskScheduler.Default); + bool exceptionRan = false; + try + { context.Execute(taskTwo); } catch (Exception e) { - Console.WriteLine(e); - throw; + exceptionRan = true; } + //there should be no pending work value.Should().Be(1); + exceptionRan.Should().BeFalse(); } [Fact]