Skip to content
This repository has been archived by the owner on Sep 10, 2021. It is now read-only.

Commit

Permalink
Performance improvements
Browse files Browse the repository at this point in the history
- Removed dependency on BufferBlock<> for services, now using the observer pattern generating 5-15x throughput improvements and a reduction in memory usage
- EventSubscription no longer supports ReceiveAsync methods, use EventSubscription.AsObservable or an extension to buffer IObservable into a ReceiveAsync pattern
- Moved to 0.1.4
  • Loading branch information
alandoherty committed Nov 7, 2018
1 parent fa3552f commit f4da57f
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 242 deletions.
60 changes: 43 additions & 17 deletions samples/Example.General/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Text;
Expand Down Expand Up @@ -35,13 +36,16 @@ interface ITest001

class Test001 : ITest001
{
static int si = 0;
int i = Interlocked.Increment(ref si);
private Guid _uuid;

public async Task<string> Login(LoginRequestMsg login) {
Console.WriteLine($"Worker Waiting {i} - Username: {login.Username} Password: {login.Password}");
//Console.WriteLine($"Worker ({_uuid}) - Username: {login.Username} Password: {login.Password}");

return "Wow";
}

return null;
public Test001(Guid uuid) {
_uuid = uuid;
}
}

Expand All @@ -51,25 +55,26 @@ class Program

static void Main(string[] args) => AsyncMain(args).Wait();

public static async void ReadLoop(Node node) {
ITest001 proxy = node.SecureProxy<ITest001>("auth:test", new SecureChannelConfiguration() {
ValidateAuthority = false,
ValidateAddress = false
});
public static async void ReadLoop(int[] ctr, Node node, Guid[] uuids) {
Random rand = new Random();

while (true) {
try {
int i = rand.Next(0, uuids.Length);
Guid uuid = uuids[i];
ITest001 proxy = node.Proxy<ITest001>($"auth:{uuid}");

string s = await proxy.Login(new LoginRequestMsg() {
Password = "wow",
Username = "alan"
});
}).ConfigureAwait(false);

Interlocked.Increment(ref ctr[0]);

Console.WriteLine($"String: {s}");
//Console.WriteLine($"String: {s}");
} catch(Exception ex) {
Console.WriteLine(ex.ToString());
}

await Task.Delay(3000);
}
}

Expand Down Expand Up @@ -97,11 +102,32 @@ static async Task AsyncMain(string[] args) {
ThrowUnhandledExceptions = true
});

Service s = await TestNode.AttachAsync("auth:test", new ServiceConfiguration() {
Filters = new IServiceFilter[] { new SecureFilter(new X509Certificate2("public_privatekey.pfx"), "bacon")}
}, RpcBehaviour.Bind<ITest001>(new Test001()));
// attach services
Guid[] uuids = new Guid[500];

for (int i = 0; i < uuids.Length; i++) {
uuids[i] = Guid.NewGuid();

ReadLoop(TestNode);
await TestNode.AttachAsync($"auth:{uuids[i]}", RpcBehaviour.Bind<ITest001>(new Test001(uuids[i]))).ConfigureAwait(false);
}

Console.WriteLine($"Attached {uuids.Length} services");

int[] ctr = new int[] { 0 };
int pavg = 0;

for (int i = 0; i < 32; i++)
ReadLoop(ctr, TestNode, uuids);

while(true) {
Console.WriteLine($"Logging in at {ctr[0]}/s avg ({pavg}/s), ({Process.GetCurrentProcess().Threads.Count} threads)");

pavg += ctr[0];
pavg = pavg / 2;
ctr[0] = 0;

await Task.Delay(1000).ConfigureAwait(false);
}

await Task.Delay(50000);
}
Expand Down
44 changes: 0 additions & 44 deletions src/Holon/Events/EventSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,50 +74,6 @@ private Event ProcessMessage(InboundMessage message) {
public IObservable<Event> AsObservable() {
return new EventObservable(this);
}

/// <summary>
/// Receives an event asyncronously.
/// </summary>
/// <returns>The event.</returns>
public async Task<Event> ReceiveAsync() {
while (true) {
InboundMessage message = await _queue.ReceiveAsync();

try {
return ProcessMessage(message);
} catch (Exception) { }
}
}

/// <summary>
/// Receives an event asyncronously.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The event.</returns>
public async Task<Event> ReceiveAsync(CancellationToken cancellationToken) {
while (true) {
InboundMessage message = await _queue.ReceiveAsync(cancellationToken);

try {
return ProcessMessage(message);
} catch (Exception) { }
}
}

/// <summary>
/// Receives an event asyncronously.
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <returns>The event.</returns>
public async Task<Event> ReceiveAsync(TimeSpan timeout) {
while (true) {
InboundMessage message = await _queue.ReceiveAsync(timeout);

try {
return ProcessMessage(message);
} catch (Exception) { }
}
}

/// <summary>
/// Disposes the underlying queue.
Expand Down
6 changes: 3 additions & 3 deletions src/Holon/Holon.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<Version>0.1.3.1</Version>
<Version>0.1.4</Version>
<Authors>Alan Doherty</Authors>
<Company>Alan Doherty</Company>
<Description>A minimal service and event bus with additional support for RPC</Description>
<Copyright>BattleCrate Ltd 2018</Copyright>
<PackageProjectUrl>https://github.com/alandoherty/holon-net</PackageProjectUrl>
<RepositoryUrl>https://github.com/alandoherty/holon-net</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<AssemblyVersion>0.1.3.1</AssemblyVersion>
<AssemblyVersion>0.1.4.0</AssemblyVersion>
<PackageLicenseUrl>https://github.com/alandoherty/holon-net/blob/master/LICENSE</PackageLicenseUrl>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageIconUrl>https://s3-eu-west-1.amazonaws.com/assets.alandoherty.co.uk/github/holon-net-nuget.png</PackageIconUrl>
<FileVersion>0.1.3.1</FileVersion>
<FileVersion>0.1.4.0</FileVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand Down
99 changes: 57 additions & 42 deletions src/Holon/Namespace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal class Namespace : IDisposable
private Dictionary<Guid, ReplyWait> _replyWaits = new Dictionary<Guid, ReplyWait>();

private Node _node;
private Task _replyProcessor;

private List<string> _declaredEventNamespaces = new List<string>();
#endregion
Expand Down Expand Up @@ -165,16 +166,17 @@ public async Task SetupAsync() {
rng.GetBytes(uniqueId);
string uniqueIdStr = BitConverter.ToString(uniqueId).Replace("-", "").ToLower();

// add the reply queue
_replyQueue = await _broker.CreateQueueAsync(string.Format("~reply:{1}%{0}", _node.UUID, uniqueIdStr), false, true, "", "", true, true, new Dictionary<string, object>() {
{ "x-expires", (int)TimeSpan.FromMinutes(15).TotalMilliseconds }
}).ConfigureAwait(false);

// subscribe to reply queue
_replyQueue.AsObservable().Subscribe(new ReplyObserver(this));
}
} catch (Exception ex) {
throw new InvalidOperationException("Failed to create node reply queue", ex);
}

// start reply processor
ReplyLoop();
}

/// <summary>
Expand Down Expand Up @@ -257,53 +259,43 @@ public async Task SetupServiceAsync(Service service) {
/// <summary>
/// Node worker to process reply messages.
/// </summary>
private async void ReplyLoop() {
while (_disposed == 0) {
// receieve broker message
InboundMessage msg = null;

try {
msg = await _replyQueue.ReceiveAsync().ConfigureAwait(false);
} catch (Exception) {
break;
}
/// <param name="msg">The inbound message.</param>
private void ReplyProcess(InboundMessage msg) {
//TODO: cancel on dispose
Envelope envelope = new Envelope(msg, this);

//TODO: cancel on dispose
Envelope envelope = new Envelope(msg, this);
// check if we have an correlation
if (envelope.ID == Guid.Empty) {
// trigger event
_node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope));

// check if we have an correlation
if (envelope.ID == Guid.Empty) {
// trigger event
_node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope));
return;
}

continue;
}
// get completion source for this envelope
ReplyWait replyWait = default(ReplyWait);
bool foundReplyWait = false;

// get completion source for this envelope
ReplyWait replyWait = default(ReplyWait);
bool foundReplyWait = false;
lock (_replyWaits) {
foundReplyWait = _replyWaits.TryGetValue(envelope.ID, out replyWait);
}

lock (_replyWaits) {
foundReplyWait = _replyWaits.TryGetValue(envelope.ID, out replyWait);
}
if (!foundReplyWait) {
// log
Console.WriteLine("unroutable reply: {0}", envelope.ID);

if (!foundReplyWait) {
// log
Console.WriteLine("unroutable reply: {0}", envelope.ID);
// trigger event
_node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope));
} else {
// if it's a multi-reply add to results, if not set completion source
if (replyWait.Results == null) {
lock (_replyWaits) {
_replyWaits.Remove(envelope.ID);
}

// trigger event
_node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope));
replyWait.CompletionSource.TrySetResult(envelope);
} else {
// if it's a multi-reply add to results, if not set completion source
if (replyWait.Results == null) {
lock (_replyWaits) {
_replyWaits.Remove(envelope.ID);
}

replyWait.CompletionSource.TrySetResult(envelope);
} else {
replyWait.Results.Add(envelope);
}
replyWait.Results.Add(envelope);
}
}
}
Expand Down Expand Up @@ -494,6 +486,29 @@ public async Task<EventSubscription> SubscribeAsync(EventAddress addr) {
}
#endregion

class ReplyObserver : IObserver<InboundMessage>
{
public Namespace Namespace { get; set; }

public void OnCompleted() {
}

public void OnError(Exception error) {
}

public void OnNext(InboundMessage msg) {
Namespace.ReplyProcess(msg);
}

/// <summary>
/// Creates a new reply observer for the provided namespace.
/// </summary>
/// <param name="namespace">The namespace.</param>
public ReplyObserver(Namespace @namespace) {
Namespace = @namespace;
}
}

#region Constructors
/// <summary>
/// Creates a new namespace configuration.
Expand Down
2 changes: 1 addition & 1 deletion src/Holon/Protocol/Broker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public async Task<BrokerQueue> CreateQueueAsync(string name = "", bool durable =
}).ConfigureAwait(false);

// create consumer
AsyncConsumer consumer = new AsyncConsumer(_channel);
ObservableConsumer consumer = new ObservableConsumer(_channel);

// consume queue
string consumerTag = null;
Expand Down
32 changes: 3 additions & 29 deletions src/Holon/Protocol/BrokerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed class BrokerQueue : IDisposable
#region Fields
private string _queue;
private Broker _broker;
private AsyncConsumer _consumer;
private ObservableConsumer _consumer;
private string _consumerTag;
private List<string> _exchanges = new List<string>();

Expand Down Expand Up @@ -49,7 +49,7 @@ public string[] Exchanges {
/// </summary>
/// <returns></returns>
public IObservable<InboundMessage> AsObservable() {
return _consumer.AsObservable();
return _consumer;
}

/// <summary>
Expand Down Expand Up @@ -91,32 +91,6 @@ public void Unbind(string exchange, string routingKey) {
});
}

/// <summary>
/// Receives a message asyncronously.
/// </summary>
/// <returns>The message.</returns>
public Task<InboundMessage> ReceiveAsync() {
return ReceiveAsync(CancellationToken.None);
}

/// <summary>
/// Receives a message asyncronously.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
public async Task<InboundMessage> ReceiveAsync(CancellationToken cancellationToken) {
return await _consumer.ReceiveAsync(cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Receives a message asyncronously.
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <returns></returns>
public async Task<InboundMessage> ReceiveAsync(TimeSpan timeout) {
return await _consumer.ReceiveAsync(timeout).ConfigureAwait(false);
}

/// <summary>
/// Disposes
/// </summary>
Expand All @@ -143,7 +117,7 @@ public void Dispose() {
/// <param name="queue">The queue name.</param>
/// <param name="consumerTag">The consumer tag.</param>
/// <param name="consumer">The consumer.</param>
internal BrokerQueue(Broker broker, string queue, string consumerTag, AsyncConsumer consumer) {
internal BrokerQueue(Broker broker, string queue, string consumerTag, ObservableConsumer consumer) {
_broker = broker;
_queue = queue;
_consumer = consumer;
Expand Down
Loading

0 comments on commit f4da57f

Please sign in to comment.