Skip to content

Commit

Permalink
fix: update method signatures, and minor fixes (BrighterCommand#3448)
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper authored Dec 31, 2024
1 parent a846bfd commit f6022f5
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Close()
/// Commit a transaction, performing all associated write actions
/// Will block thread and use second thread for callback
/// </summary>
public void Commit() => BrighterSynchronizationHelper.Run(() => DynamoDb.TransactWriteItemsAsync(_tx));
public void Commit() => BrighterSynchronizationHelper.Run(async () => await DynamoDb.TransactWriteItemsAsync(_tx));

/// <summary>
/// Commit a transaction, performing all associated write actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection,
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(() => AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () => await AcknowledgeAsync(message));

/// <summary>
/// Acknowledges the specified message.
Expand Down Expand Up @@ -112,7 +112,7 @@ public SqsMessageConsumer(AWSMessagingGatewayConnection awsConnection,
/// Sync over async
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(() => RejectAsync(message));
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));

/// <summary>
/// Rejects the specified message.
Expand Down Expand Up @@ -158,7 +158,7 @@ await client.ChangeMessageVisibilityAsync(
/// Purges the specified queue name.
/// Sync over Async
/// </summary>
public void Purge() => BrighterSynchronizationHelper.Run(() => PurgeAsync());
public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());

/// <summary>
/// Purges the specified queue name.
Expand Down Expand Up @@ -187,7 +187,7 @@ await client.ChangeMessageVisibilityAsync(
/// Sync over async
/// </summary>
/// <param name="timeOut">The timeout. AWS uses whole seconds. Anything greater than 0 uses long-polling. </param>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(() => ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -255,7 +255,7 @@ await client.ChangeMessageVisibilityAsync(
}


public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, delay));
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay));

/// <summary>
/// Re-queues the specified message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async Task SendAsync(Message message, CancellationToken cancellationToken
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Send(Message message) => BrighterSynchronizationHelper.Run(() => SendAsync(message));
public void Send(Message message) => BrighterSynchronizationHelper.Run(async () => await SendAsync(message));

/// <summary>
/// Sends the specified message, with a delay.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public async ValueTask DisposeAsync()
/// </summary>
/// <param name="timeOut">The timeout for a message being available. Defaults to 300ms.</param>
/// <returns>Message.</returns>
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(() => ReceiveAsync(timeOut));
public Message[] Receive(TimeSpan? timeOut = null) => BrighterSynchronizationHelper.Run(async () => await ReceiveAsync(timeOut));

/// <summary>
/// Receives the specified queue name.
Expand Down Expand Up @@ -230,7 +230,7 @@ public async ValueTask DisposeAsync()
/// Sync over Async
/// </summary>
/// <param name="message">The message.</param>
public void Reject(Message message) => BrighterSynchronizationHelper.Run(() => RejectAsync(message));
public void Reject(Message message) => BrighterSynchronizationHelper.Run(async () => await RejectAsync(message));

/// <summary>
/// Rejects the specified message.
Expand Down Expand Up @@ -268,7 +268,7 @@ public async ValueTask DisposeAsync()
/// <param name="message"></param>
/// <param name="delay">Delay to the delivery of the message. 0 is no delay. Defaults to 0.</param>
/// <returns>True if the message should be acked, false otherwise</returns>
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, delay));
public bool Requeue(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, delay));

/// <summary>
/// Requeues the specified message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private async Task ConnectAsync()
/// Sync over async
/// </summary>
/// <param name="message">The message.</param>
public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(() => PublishMessageAsync(message));
public void PublishMessage(Message message) => BrighterSynchronizationHelper.Run(async () => await PublishMessageAsync(message));

/// <summary>
/// Sends the specified message asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RmqMessageConsumer(
/// Acknowledges the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(() =>AcknowledgeAsync(message));
public void Acknowledge(Message message) => BrighterSynchronizationHelper.Run(async () =>await AcknowledgeAsync(message));

public async Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -170,7 +170,7 @@ public async Task AcknowledgeAsync(Message message, CancellationToken cancellati
/// <summary>
/// Purges the specified queue name.
/// </summary>
public void Purge() => BrighterSynchronizationHelper.Run(() => PurgeAsync());
public void Purge() => BrighterSynchronizationHelper.Run(async () => await PurgeAsync());

public async Task PurgeAsync(CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -295,7 +295,7 @@ exception is NotSupportedException ||
/// <param name="message"></param>
/// <param name="timeout">Time to delay delivery of the message.</param>
/// <returns>True if message deleted, false otherwise</returns>
public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(() => RequeueAsync(message, timeout));
public bool Requeue(Message message, TimeSpan? timeout = null) => BrighterSynchronizationHelper.Run(async () => await RequeueAsync(message, timeout));

public async Task<bool> RequeueAsync(Message message, TimeSpan? timeout = null,
CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public RmqMessageProducer(RmqMessagingGatewayConnection connection, RmqPublicati
/// <param name="message">The message.</param>
/// <param name="delay">Delay to delivery of the message.</param>
/// <returns>Task.</returns>
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(() => SendWithDelayAsync(message, delay));
public void SendWithDelay(Message message, TimeSpan? delay = null) => BrighterSynchronizationHelper.Run(async () => await SendWithDelayAsync(message, delay));

/// <summary>
/// Sends the specified message
Expand Down
51 changes: 32 additions & 19 deletions src/Paramore.Brighter/Tasks/BrighterSynchronizationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,11 @@ public override void Post(SendOrPostCallback callback, object? state)
//NOTE: if we got here, something went wrong, we should have been able to queue the message
//mostly this seems to be a problem with the task we are running completing, but work is still being queued to the
//synchronization context.
// If the execution context can help, we might be able to redirect; if not just run immediately on this thread

var contextCallback = new ContextCallback(callback);
if (ctxt != null && ctxt != _executionContext)
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {callback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
SynchronizationHelper.ExecuteOnContext(ctxt, contextCallback, state);
}
ExecuteOnCallersContext(contextCallback, state, ctxt);
else
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {callback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
//just execute inline
SynchronizationHelper.ExecuteImmediately(contextCallback, state);
}
ExecuteImmediately(contextCallback, state);
Debug.WriteLine(string.Empty);

}
Expand Down Expand Up @@ -192,5 +175,35 @@ public override void Send(SendOrPostCallback callback, object? state)
throw new TimeoutException("BrighterSynchronizationContext: Send operation timed out.");
}
}

private void ExecuteImmediately(ContextCallback contextCallback, object? state)
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.Fail("BrighterSynchronizationContext: ExecuteImmediately. We should never get here");
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {contextCallback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
//just execute inline
SynchronizationHelper.ExecuteImmediately(contextCallback, state);
}

/// <summary>
/// We should never get here as we should not be called from the wrong context
/// </summary>
/// <param name="contextCallback"></param>
/// <param name="state">Any state to pass to the callback</param>
/// <param name="ctxt"></param>
/// <param name="callback">The callback to execute</param>
private void ExecuteOnCallersContext(ContextCallback contextCallback, object? state, ExecutionContext ctxt)
{
Debug.WriteLine(string.Empty);
Debug.IndentLevel = 1;
Debug.Fail("BrighterSynchronizationContext: ExecuteOnCallersContext. We should never get here");
Debug.WriteLine($"BrighterSynchronizationContext: Post Failed to queue {contextCallback.Method.Name} on thread {Thread.CurrentThread.ManagedThreadId}");
Debug.WriteLine($"BrighterSynchronizationContext: Parent Task {ParentTaskId}");
Debug.IndentLevel = 0;
SynchronizationHelper.ExecuteOnContext(ctxt, contextCallback, state);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,11 @@ public void Execute(Task parentTask)
foreach (var (task, propagateExceptions) in _taskQueue.GetConsumingEnumerable())
{
_taskScheduler.DoTryExecuteTask(task);
_activeTasks.TryRemove(task, out _);

if (!propagateExceptions) continue;

task.GetAwaiter().GetResult();
_activeTasks.TryRemove(task, out _);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,25 +306,27 @@ public void Task_AfterExecute_NeverRuns()

context.Execute(task);

try
{
var taskTwo = context.Factory.StartNew(
() => { value = 2; },
context.Factory.CancellationToken,
context.Factory.CreationOptions | TaskCreationOptions.DenyChildAttach,
context.TaskScheduler);
var taskTwo = context.Factory.StartNew(
() => { value = 2; },
context.Factory.CancellationToken,
context.Factory.CreationOptions | TaskCreationOptions.DenyChildAttach,
context.TaskScheduler);

taskTwo.ContinueWith(_ => { throw new Exception("Should not run"); }, TaskScheduler.Default);
taskTwo.ContinueWith(_ => { throw new Exception("Should not run"); }, TaskScheduler.Default);

bool exceptionRan = false;
try
{
context.Execute(taskTwo);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
exceptionRan = true;
}
//there should be no pending work

value.Should().Be(1);
exceptionRan.Should().BeFalse();
}

[Fact]
Expand Down

0 comments on commit f6022f5

Please sign in to comment.