Skip to content

Commit

Permalink
Merge pull request #18 from s2quake/fix/callback
Browse files Browse the repository at this point in the history
Fix an issue where the server is closed
  • Loading branch information
s2quake authored Jun 16, 2024
2 parents 8ca2ee0 + e72a795 commit c33c2ed
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 41 deletions.
26 changes: 17 additions & 9 deletions src/JSSoft.Communication/Grpc/AdaptorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,23 +403,31 @@ private void InvokeCallback(IService service, string name, string[] data)
}

var methodDescriptors = _methodsByService[service];
if (methodDescriptors.Contains(name) != true)
if (methodDescriptors.Contains(name) == true)
{
throw new InvalidOperationException("Invalid method name.");
var methodDescriptor = methodDescriptors[name];
var args = _serializer!.DeserializeMany(methodDescriptor.ParameterTypes, data);
var instance = _descriptor!.ClientInstances[service];
Task.Run(() => methodDescriptor.InvokeAsync(_serviceContext, instance, args));
}
else
{
LogUtility.Warn($"Method '{name}' is not found.");
}

var methodDescriptor = methodDescriptors[name];
var args = _serializer!.DeserializeMany(methodDescriptor.ParameterTypes, data);
var instance = _descriptor!.ClientInstances[service];
Task.Run(() => methodDescriptor.InvokeAsync(_serviceContext, instance, args));
}

private void InvokeCallback(PollReply reply)
{
foreach (var item in reply.Items)
{
var service = _serviceByName[item.ServiceName];
InvokeCallback(service, item.Name, [.. item.Data]);
if (_serviceByName.TryGetValue(item.ServiceName, out var service) == true)
{
InvokeCallback(service, item.Name, [.. item.Data]);
}
else
{
LogUtility.Warn($"Service '{item.ServiceName}' is not found.");
}
}

reply.Items.Clear();
Expand Down
83 changes: 83 additions & 0 deletions test/JSSoft.Communication.Tests/CallbackNoneTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// <copyright file="CallbackNoneTest.cs" company="JSSoft">
// Copyright (c) 2024 Jeesu Choi. All Rights Reserved.
// Licensed under the MIT License. See LICENSE.md in the project root for license information.
// </copyright>

using JSSoft.Communication.Tests.Extensions;
using Xunit.Abstractions;

namespace JSSoft.Communication.Tests;

public class CallbackNoneTest : IAsyncLifetime
{
private const int Timeout = 3000;
private readonly ITestOutputHelper _logger;
private readonly TestServer1 _testServer = new();
private readonly ServerContext _serverContext;
private readonly ClientContext _clientContext;
private readonly RandomEndPoint _endPoint = new();
private ITestService1? _server;

private Guid _clientToken;
private Guid _serverToken;

public CallbackNoneTest(ITestOutputHelper logger)
{
_logger = logger;
_serverContext = new(_testServer) { EndPoint = _endPoint };
_clientContext = new(new ClientService<ITestService1>()) { EndPoint = _endPoint };
logger.WriteLine($"{_endPoint}");
}

public interface ITestService1
{
void Invoke();
}

public interface ITestService2
{
void Invoke();
}

public interface ITestCallback2
{
void OnInvoked();
}

[Fact]
public void Callback1_Test()
{
var manualResetEvent = new ManualResetEvent(false);
_clientContext.Disconnected += ClientContext_Disconnected;
_server!.Invoke();
Assert.False(manualResetEvent.WaitOne(Timeout));

void ClientContext_Disconnected(object? sender, EventArgs e)
{
manualResetEvent.Set();
}
}

public async Task InitializeAsync()
{
_serverToken = await _serverContext.OpenAsync(CancellationToken.None);
_logger.WriteLine($"Server is opened: {_serverToken}");
_clientToken = await _clientContext.OpenAsync(CancellationToken.None);
_logger.WriteLine($"Client is opened: {_clientToken}");
_server = _testServer;
}

public async Task DisposeAsync()
{
await _serverContext.ReleaseAsync(_serverToken);
_logger.WriteLine($"Server is released: {_serverToken}");
await _clientContext.ReleaseAsync(_clientToken);
_logger.WriteLine($"Client is released: {_clientToken}");
_endPoint.Dispose();
}

private sealed class TestServer1 : ServerService<ITestService1, ITestCallback2>, ITestService1
{
public void Invoke() => Client.OnInvoked();
}
}
86 changes: 55 additions & 31 deletions test/JSSoft.Communication.Tests/CallbackTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,28 @@ namespace JSSoft.Communication.Tests;

public class CallbackTest : IAsyncLifetime
{
private const int Timeout = 30000;
private const int ClientCount = 2;
private readonly ITestOutputHelper _logger;
private readonly TestServer _testServer = new();
private readonly TestClient _testClient = new();
private readonly ServerContext _serverContext;
private readonly ClientContext _clientContext;
private readonly TestClient[] _testClients = new TestClient[ClientCount];
private readonly ClientContext[] _clientContexts = new ClientContext[ClientCount];
private readonly Guid[] _clientTokens = new Guid[ClientCount];
private readonly RandomEndPoint _endPoint = new();
private ITestService? _server;
private TestServer? _server;

private Guid _clientToken;
private Guid _serverToken;

public CallbackTest(ITestOutputHelper logger)
{
_logger = logger;
_serverContext = new(_testServer) { EndPoint = _endPoint };
_clientContext = new(_testClient) { EndPoint = _endPoint };
for (var i = 0; i < ClientCount; i++)
{
_testClients[i] = new() { Index = i };
_clientContexts[i] = new(_testClients[i]) { EndPoint = _endPoint };
}

logger.WriteLine($"{_endPoint}");
}

Expand All @@ -49,64 +54,76 @@ public interface ITestCallback
}

[Fact]
public void Callback1_Test()
public async Task Callback1_TestAsync()
{
var raised = Assert.Raises<ValueEventArgs>(
handler => _testClient.Invoked += handler,
handler => _testClient.Invoked -= handler,
() =>
var raisedCount = await EventTestUtility.RaisesManyAsync<TestClient, ValueEventArgs>(
items: _testClients,
attach: (item, handler) => item.Invoked += handler,
detach: (item, handler) => item.Invoked -= handler,
testCode: () =>
{
_server!.Invoke();
_testClient.AutoResetEvent.WaitOne(Timeout);
});
Assert.Null(raised.Arguments.Value);

Assert.Equal(_testClients.Length, raisedCount);
Assert.All(_testClients, item => Assert.Null(item.Value));
}

[Fact]
public void Callback2_Test()
public async Task Callback2_TestAsync()
{
var value = 123;
var raised = Assert.Raises<ValueEventArgs>(
handler => _testClient.Invoked += handler,
handler => _testClient.Invoked -= handler,
() =>
var raisedCount = await EventTestUtility.RaisesManyAsync<TestClient, ValueEventArgs>(
items: _testClients,
attach: (item, handler) => item.Invoked += handler,
detach: (item, handler) => item.Invoked -= handler,
testCode: () =>
{
_server!.Invoke(value);
_testClient.AutoResetEvent.WaitOne(Timeout);
});
Assert.Equal(value, raised.Arguments.Value);
Assert.Equal(_testClients.Length, raisedCount);
Assert.All(_testClients, item => Assert.Equal(value, item.Value));
}

[Fact]
public void Callback3_Test()
public async Task Callback3_TestAsync()
{
var value = (123, "123");
var raised = Assert.Raises<ValueEventArgs>(
handler => _testClient.Invoked += handler,
handler => _testClient.Invoked -= handler,
() =>
var raisedCount = await EventTestUtility.RaisesManyAsync<TestClient, ValueEventArgs>(
items: _testClients,
attach: (item, handler) => item.Invoked += handler,
detach: (item, handler) => item.Invoked -= handler,
testCode: () =>
{
_server!.Invoke(value);
_testClient.AutoResetEvent.WaitOne(Timeout);
});
Assert.Equal(value, raised.Arguments.Value);
Assert.Equal(_testClients.Length, raisedCount);
Assert.All(_testClients, item => Assert.Equal(value, item.Value));
}

public async Task InitializeAsync()
{
_serverToken = await _serverContext.OpenAsync(CancellationToken.None);
_logger.WriteLine($"Server is opened: {_serverToken}");
_clientToken = await _clientContext.OpenAsync(CancellationToken.None);
_logger.WriteLine($"Client is opened: {_clientToken}");
for (var i = 0; i < ClientCount; i++)
{
_clientTokens[i] = await _clientContexts[i].OpenAsync(CancellationToken.None);
_logger.WriteLine($"Client #{i} is opened: {_clientTokens[i]}");
}

_server = _testServer;
}

public async Task DisposeAsync()
{
await _serverContext.ReleaseAsync(_serverToken);
_logger.WriteLine($"Server is released: {_serverToken}");
await _clientContext.ReleaseAsync(_clientToken);
_logger.WriteLine($"Client is released: {_clientToken}");
for (var i = 0; i < ClientCount; i++)
{
await _clientContexts[i].ReleaseAsync(_clientTokens[i]);
_logger.WriteLine($"Client #{i} is released: {_clientTokens[i]}");
}

_endPoint.Dispose();
}

Expand All @@ -130,20 +147,27 @@ private sealed class TestClient : ClientService<ITestService, ITestCallback>, IT

public AutoResetEvent AutoResetEvent { get; } = new(initialState: false);

public int Index { get; set; } = -1;

public object? Value { get; private set; } = DBNull.Value;

void ITestCallback.OnInvoked()
{
Value = null;
Invoked?.Invoke(this, new(null));
AutoResetEvent.Set();
}

void ITestCallback.OnInvoked(int value)
{
Value = value;
Invoked?.Invoke(this, new(value));
AutoResetEvent.Set();
}

void ITestCallback.OnInvoked((int Value1, string Value2) value)
{
Value = value;
Invoked?.Invoke(this, new(value));
AutoResetEvent.Set();
}
Expand Down
53 changes: 53 additions & 0 deletions test/JSSoft.Communication.Tests/EventObjectCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// <copyright file="EventObjectCollection.cs" company="JSSoft">
// Copyright (c) 2024 Jeesu Choi. All Rights Reserved.
// Licensed under the MIT License. See LICENSE.md in the project root for license information.
// </copyright>

namespace JSSoft.Communication.Tests;

public sealed class EventObjectCollection<TObject, TEventArgs>
: Dictionary<TObject, ManualResetEvent>, IDisposable
where TObject : notnull
where TEventArgs : EventArgs
{
private readonly Action<TObject, EventHandler<TEventArgs>> _detach;

public EventObjectCollection(
TObject[] items,
Action<TObject, EventHandler<TEventArgs>> attach,
Action<TObject, EventHandler<TEventArgs>> detach)
: base(capacity: items.Length)
{
for (var i = 0; i < items.Length; i++)
{
Add(items[i], new ManualResetEvent(initialState: false));
attach(items[i], Handler);
}

_detach = detach;
}

public void Dispose()
{
foreach (var item in this)
{
_detach(item.Key, Handler);
item.Value.Dispose();
}
}

public async Task<int> WaiyAsync(int timeout)
{
var tasks = Values.Select(item => Task.Run(() => item.WaitOne(timeout)));
var results = await Task.WhenAll(tasks);
return results.Count(item => item == true);
}

private void Handler(object? s, TEventArgs args)
{
if (s is TObject item && TryGetValue(item, out var manualResetEvent) == true)
{
manualResetEvent.Set();
}
}
}
32 changes: 31 additions & 1 deletion test/JSSoft.Communication.Tests/EventTestUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace JSSoft.Communication.Tests;

public static class EventTestUtility
{
public const int Timeout = 10000;

public static async Task<bool> RaisesAsync(
Action<EventHandler> attach, Action<EventHandler> detach, Func<Task> testCode)
{
Expand All @@ -25,11 +27,39 @@ public static async Task<bool> RaisesAsync(
detach(Handler);
}

return manualResetEvent.WaitOne(millisecondsTimeout: 10000);
return manualResetEvent.WaitOne(Timeout);

void Handler(object? s, EventArgs args)
{
manualResetEvent.Set();
}
}

public static async Task<int> RaisesManyAsync<TObject, TEventArgs>(
TObject[] items,
Action<TObject, EventHandler<TEventArgs>> attach,
Action<TObject, EventHandler<TEventArgs>> detach,
Action testCode)
where TObject : notnull
where TEventArgs : EventArgs
{
using var eventByItem
= new EventObjectCollection<TObject, TEventArgs>(items, attach, detach);
testCode();
return await eventByItem.WaiyAsync(Timeout);
}

public static async Task<int> RaisesManyAsync<TObject, TEventArgs>(
TObject[] items,
Action<TObject, EventHandler<TEventArgs>> attach,
Action<TObject, EventHandler<TEventArgs>> detach,
Func<Task> testCode)
where TObject : notnull
where TEventArgs : EventArgs
{
using var eventByItem
= new EventObjectCollection<TObject, TEventArgs>(items, attach, detach);
await testCode();
return await eventByItem.WaiyAsync(Timeout);
}
}

0 comments on commit c33c2ed

Please sign in to comment.