diff --git a/samples/Example.General/Program.cs b/samples/Example.General/Program.cs index 652ae7e..27ebd38 100644 --- a/samples/Example.General/Program.cs +++ b/samples/Example.General/Program.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Security.Cryptography.X509Certificates; using System.Text; @@ -35,13 +36,16 @@ interface ITest001 class Test001 : ITest001 { - static int si = 0; - int i = Interlocked.Increment(ref si); + private Guid _uuid; public async Task Login(LoginRequestMsg login) { - Console.WriteLine($"Worker Waiting {i} - Username: {login.Username} Password: {login.Password}"); + //Console.WriteLine($"Worker ({_uuid}) - Username: {login.Username} Password: {login.Password}"); + + return "Wow"; + } - return null; + public Test001(Guid uuid) { + _uuid = uuid; } } @@ -51,25 +55,26 @@ class Program static void Main(string[] args) => AsyncMain(args).Wait(); - public static async void ReadLoop(Node node) { - ITest001 proxy = node.SecureProxy("auth:test", new SecureChannelConfiguration() { - ValidateAuthority = false, - ValidateAddress = false - }); + public static async void ReadLoop(int[] ctr, Node node, Guid[] uuids) { + Random rand = new Random(); while (true) { try { + int i = rand.Next(0, uuids.Length); + Guid uuid = uuids[i]; + ITest001 proxy = node.Proxy($"auth:{uuid}"); + string s = await proxy.Login(new LoginRequestMsg() { Password = "wow", Username = "alan" - }); + }).ConfigureAwait(false); + + Interlocked.Increment(ref ctr[0]); - Console.WriteLine($"String: {s}"); + //Console.WriteLine($"String: {s}"); } catch(Exception ex) { Console.WriteLine(ex.ToString()); } - - await Task.Delay(3000); } } @@ -97,11 +102,32 @@ static async Task AsyncMain(string[] args) { ThrowUnhandledExceptions = true }); - Service s = await TestNode.AttachAsync("auth:test", new ServiceConfiguration() { - Filters = new IServiceFilter[] { new SecureFilter(new X509Certificate2("public_privatekey.pfx"), "bacon")} - }, RpcBehaviour.Bind(new Test001())); + // attach services + Guid[] uuids = new Guid[500]; + + for (int i = 0; i < uuids.Length; i++) { + uuids[i] = Guid.NewGuid(); - ReadLoop(TestNode); + await TestNode.AttachAsync($"auth:{uuids[i]}", RpcBehaviour.Bind(new Test001(uuids[i]))).ConfigureAwait(false); + } + + Console.WriteLine($"Attached {uuids.Length} services"); + + int[] ctr = new int[] { 0 }; + int pavg = 0; + + for (int i = 0; i < 32; i++) + ReadLoop(ctr, TestNode, uuids); + + while(true) { + Console.WriteLine($"Logging in at {ctr[0]}/s avg ({pavg}/s), ({Process.GetCurrentProcess().Threads.Count} threads)"); + + pavg += ctr[0]; + pavg = pavg / 2; + ctr[0] = 0; + + await Task.Delay(1000).ConfigureAwait(false); + } await Task.Delay(50000); } diff --git a/src/Holon/Events/EventSubscription.cs b/src/Holon/Events/EventSubscription.cs index 0c6b869..f883199 100644 --- a/src/Holon/Events/EventSubscription.cs +++ b/src/Holon/Events/EventSubscription.cs @@ -74,50 +74,6 @@ private Event ProcessMessage(InboundMessage message) { public IObservable AsObservable() { return new EventObservable(this); } - - /// - /// Receives an event asyncronously. - /// - /// The event. - public async Task ReceiveAsync() { - while (true) { - InboundMessage message = await _queue.ReceiveAsync(); - - try { - return ProcessMessage(message); - } catch (Exception) { } - } - } - - /// - /// Receives an event asyncronously. - /// - /// The cancellation token. - /// The event. - public async Task ReceiveAsync(CancellationToken cancellationToken) { - while (true) { - InboundMessage message = await _queue.ReceiveAsync(cancellationToken); - - try { - return ProcessMessage(message); - } catch (Exception) { } - } - } - - /// - /// Receives an event asyncronously. - /// - /// The timeout. - /// The event. - public async Task ReceiveAsync(TimeSpan timeout) { - while (true) { - InboundMessage message = await _queue.ReceiveAsync(timeout); - - try { - return ProcessMessage(message); - } catch (Exception) { } - } - } /// /// Disposes the underlying queue. diff --git a/src/Holon/Holon.csproj b/src/Holon/Holon.csproj index 1c5c4cb..ace0a0e 100644 --- a/src/Holon/Holon.csproj +++ b/src/Holon/Holon.csproj @@ -2,7 +2,7 @@ netstandard1.6 - 0.1.3.1 + 0.1.4 Alan Doherty Alan Doherty A minimal service and event bus with additional support for RPC @@ -10,11 +10,11 @@ https://github.com/alandoherty/holon-net https://github.com/alandoherty/holon-net git - 0.1.3.1 + 0.1.4.0 https://github.com/alandoherty/holon-net/blob/master/LICENSE true https://s3-eu-west-1.amazonaws.com/assets.alandoherty.co.uk/github/holon-net-nuget.png - 0.1.3.1 + 0.1.4.0 diff --git a/src/Holon/Namespace.cs b/src/Holon/Namespace.cs index b3819fa..6de4b2b 100644 --- a/src/Holon/Namespace.cs +++ b/src/Holon/Namespace.cs @@ -33,6 +33,7 @@ internal class Namespace : IDisposable private Dictionary _replyWaits = new Dictionary(); private Node _node; + private Task _replyProcessor; private List _declaredEventNamespaces = new List(); #endregion @@ -165,16 +166,17 @@ public async Task SetupAsync() { rng.GetBytes(uniqueId); string uniqueIdStr = BitConverter.ToString(uniqueId).Replace("-", "").ToLower(); + // add the reply queue _replyQueue = await _broker.CreateQueueAsync(string.Format("~reply:{1}%{0}", _node.UUID, uniqueIdStr), false, true, "", "", true, true, new Dictionary() { { "x-expires", (int)TimeSpan.FromMinutes(15).TotalMilliseconds } }).ConfigureAwait(false); + + // subscribe to reply queue + _replyQueue.AsObservable().Subscribe(new ReplyObserver(this)); } } catch (Exception ex) { throw new InvalidOperationException("Failed to create node reply queue", ex); } - - // start reply processor - ReplyLoop(); } /// @@ -257,53 +259,43 @@ public async Task SetupServiceAsync(Service service) { /// /// Node worker to process reply messages. /// - private async void ReplyLoop() { - while (_disposed == 0) { - // receieve broker message - InboundMessage msg = null; - - try { - msg = await _replyQueue.ReceiveAsync().ConfigureAwait(false); - } catch (Exception) { - break; - } + /// The inbound message. + private void ReplyProcess(InboundMessage msg) { + //TODO: cancel on dispose + Envelope envelope = new Envelope(msg, this); - //TODO: cancel on dispose - Envelope envelope = new Envelope(msg, this); + // check if we have an correlation + if (envelope.ID == Guid.Empty) { + // trigger event + _node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope)); - // check if we have an correlation - if (envelope.ID == Guid.Empty) { - // trigger event - _node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope)); + return; + } - continue; - } + // get completion source for this envelope + ReplyWait replyWait = default(ReplyWait); + bool foundReplyWait = false; - // get completion source for this envelope - ReplyWait replyWait = default(ReplyWait); - bool foundReplyWait = false; + lock (_replyWaits) { + foundReplyWait = _replyWaits.TryGetValue(envelope.ID, out replyWait); + } - lock (_replyWaits) { - foundReplyWait = _replyWaits.TryGetValue(envelope.ID, out replyWait); - } + if (!foundReplyWait) { + // log + Console.WriteLine("unroutable reply: {0}", envelope.ID); - if (!foundReplyWait) { - // log - Console.WriteLine("unroutable reply: {0}", envelope.ID); + // trigger event + _node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope)); + } else { + // if it's a multi-reply add to results, if not set completion source + if (replyWait.Results == null) { + lock (_replyWaits) { + _replyWaits.Remove(envelope.ID); + } - // trigger event - _node.OnUnroutableReply(new UnroutableReplyEventArgs(envelope)); + replyWait.CompletionSource.TrySetResult(envelope); } else { - // if it's a multi-reply add to results, if not set completion source - if (replyWait.Results == null) { - lock (_replyWaits) { - _replyWaits.Remove(envelope.ID); - } - - replyWait.CompletionSource.TrySetResult(envelope); - } else { - replyWait.Results.Add(envelope); - } + replyWait.Results.Add(envelope); } } } @@ -494,6 +486,29 @@ public async Task SubscribeAsync(EventAddress addr) { } #endregion + class ReplyObserver : IObserver + { + public Namespace Namespace { get; set; } + + public void OnCompleted() { + } + + public void OnError(Exception error) { + } + + public void OnNext(InboundMessage msg) { + Namespace.ReplyProcess(msg); + } + + /// + /// Creates a new reply observer for the provided namespace. + /// + /// The namespace. + public ReplyObserver(Namespace @namespace) { + Namespace = @namespace; + } + } + #region Constructors /// /// Creates a new namespace configuration. diff --git a/src/Holon/Protocol/Broker.cs b/src/Holon/Protocol/Broker.cs index 016a0df..bc47636 100644 --- a/src/Holon/Protocol/Broker.cs +++ b/src/Holon/Protocol/Broker.cs @@ -174,7 +174,7 @@ public async Task CreateQueueAsync(string name = "", bool durable = }).ConfigureAwait(false); // create consumer - AsyncConsumer consumer = new AsyncConsumer(_channel); + ObservableConsumer consumer = new ObservableConsumer(_channel); // consume queue string consumerTag = null; diff --git a/src/Holon/Protocol/BrokerQueue.cs b/src/Holon/Protocol/BrokerQueue.cs index 5cc2cd6..f5e5a7d 100644 --- a/src/Holon/Protocol/BrokerQueue.cs +++ b/src/Holon/Protocol/BrokerQueue.cs @@ -15,7 +15,7 @@ internal sealed class BrokerQueue : IDisposable #region Fields private string _queue; private Broker _broker; - private AsyncConsumer _consumer; + private ObservableConsumer _consumer; private string _consumerTag; private List _exchanges = new List(); @@ -49,7 +49,7 @@ public string[] Exchanges { /// /// public IObservable AsObservable() { - return _consumer.AsObservable(); + return _consumer; } /// @@ -91,32 +91,6 @@ public void Unbind(string exchange, string routingKey) { }); } - /// - /// Receives a message asyncronously. - /// - /// The message. - public Task ReceiveAsync() { - return ReceiveAsync(CancellationToken.None); - } - - /// - /// Receives a message asyncronously. - /// - /// The cancellation token. - /// - public async Task ReceiveAsync(CancellationToken cancellationToken) { - return await _consumer.ReceiveAsync(cancellationToken).ConfigureAwait(false); - } - - /// - /// Receives a message asyncronously. - /// - /// The timeout. - /// - public async Task ReceiveAsync(TimeSpan timeout) { - return await _consumer.ReceiveAsync(timeout).ConfigureAwait(false); - } - /// /// Disposes /// @@ -143,7 +117,7 @@ public void Dispose() { /// The queue name. /// The consumer tag. /// The consumer. - internal BrokerQueue(Broker broker, string queue, string consumerTag, AsyncConsumer consumer) { + internal BrokerQueue(Broker broker, string queue, string consumerTag, ObservableConsumer consumer) { _broker = broker; _queue = queue; _consumer = consumer; diff --git a/src/Holon/Protocol/AsyncConsumer.cs b/src/Holon/Protocol/ObservableConsumer.cs similarity index 54% rename from src/Holon/Protocol/AsyncConsumer.cs rename to src/Holon/Protocol/ObservableConsumer.cs index a202de6..18cb00c 100644 --- a/src/Holon/Protocol/AsyncConsumer.cs +++ b/src/Holon/Protocol/ObservableConsumer.cs @@ -7,18 +7,19 @@ using System.Threading.Tasks.Dataflow; using System.Threading; using Holon.Services; +using System.Linq; namespace Holon.Protocol { /// /// Represents a basic consumer designed for use with asyncronous TPL code. /// - internal class AsyncConsumer : IBasicConsumer + internal class ObservableConsumer : IBasicConsumer, IObservable { #region Fields private IModel _channel; private int _cancelled; - private BufferBlock _mailbox = new BufferBlock(); + private List _subscriptions = new List(1); #endregion /// @@ -40,9 +41,11 @@ private void HandleCancelled(object sender, ConsumerEventArgs e) { if (Interlocked.CompareExchange(ref _cancelled, 1, 0) == 1) return; - // empty mailbox and complete - _mailbox.TryReceiveAll(out IList messages); - _mailbox.Complete(); + // call completed on all observers + lock (_subscriptions) { + foreach (SubscribedObserver observer in _subscriptions) + observer.Observer.OnCompleted(); + } // call event ConsumerCancelled?.Invoke(sender, e); @@ -84,7 +87,10 @@ public void HandleBasicConsumeOk(string consumerTag) { /// The properties. /// The body. public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { - _mailbox.Post(new InboundMessage(_channel, deliveryTag, redelivered, exchange, routingKey, properties, body)); + lock(_subscriptions) { + foreach (SubscribedObserver observer in _subscriptions) + observer.Observer.OnNext(new InboundMessage(_channel, deliveryTag, redelivered, exchange, routingKey, properties, body)); + } } /// @@ -96,56 +102,78 @@ public void HandleModelShutdown(object model, ShutdownEventArgs reason) { } /// - /// Receives a message asyncronously from the consumer. - /// - /// The broker message. - public Task ReceiveAsync() { - return _mailbox.ReceiveAsync(); - } - - /// - /// Receives a message asyncronously from the consumer within the timeout. + /// Subscribes to this consumer. /// - /// The timeout. - /// The broker message. - public Task ReceiveAsync(TimeSpan timeout) { - return _mailbox.ReceiveAsync(timeout); + /// The observer. + /// + public IDisposable Subscribe(IObserver observer) { + lock (_subscriptions) { + // find the next identification + int nextId = -1; + + for (int i = 0; i < int.MaxValue; i++) { + if (_subscriptions.Any(s => s.ID == i)) + continue; + + nextId = i; + break; + } + + // check we got an ID, essentially impossible not to though + if (nextId == -1) + throw new OutOfMemoryException("No more slots available for subscribers"); + + // add subscription + SubscribedObserver subscription = new SubscribedObserver() { + ID = nextId, + Consumer = this, + Observer = observer + }; + + _subscriptions.Add(subscription); + + // return our disposer + return new SubscribedObserverDisposer(this, subscription.ID); + } } + #endregion - /// - /// Receives a message asyncronously from the consumer. - /// - /// The cancellation token. - /// The broker message. - public Task ReceiveAsync(CancellationToken cancellationToken) { - return _mailbox.ReceiveAsync(cancellationToken); - } + #region Structures + struct SubscribedObserverDisposer : IDisposable + { + public int ID { get; set; } + public ObservableConsumer Consumer { get; set; } + + /// + /// Disposes of the subscribed observer. + /// + public void Dispose() { + lock (Consumer._subscriptions) { + int thisId = ID; + Consumer._subscriptions.RemoveAll(o => o.ID == thisId); + } + } - /// - /// Receives a message asyncronously from the consumer. - /// - /// The timeout. - /// The cancellation token. - /// The broker message. - public Task ReceiveAsync(TimeSpan timeout, CancellationToken cancellationToken) { - return _mailbox.ReceiveAsync(timeout, cancellationToken); + public SubscribedObserverDisposer(ObservableConsumer consumer, int id) { + ID = id; + Consumer = consumer; + } } - /// - /// Creates an observable for this consumer. - /// - /// - public IObservable AsObservable() { - return _mailbox.AsObservable(); + struct SubscribedObserver + { + public int ID { get; set; } + public IObserver Observer { get; set; } + public ObservableConsumer Consumer { get; set; } } #endregion #region Constructors /// - /// Creates a new async consumer. + /// Creates a new observable consumer. /// /// The channel. - public AsyncConsumer(IModel channel) { + public ObservableConsumer(IModel channel) { _channel = channel; } #endregion diff --git a/src/Holon/Services/Service.cs b/src/Holon/Services/Service.cs index e526699..fb7497b 100644 --- a/src/Holon/Services/Service.cs +++ b/src/Holon/Services/Service.cs @@ -13,7 +13,7 @@ namespace Holon.Services /// /// Represents an service. /// - public sealed class Service : IDisposable + public sealed class Service : IDisposable, IObserver { #region Fields private Namespace _namespace; @@ -21,7 +21,6 @@ public sealed class Service : IDisposable private Broker _broker; private BrokerQueue _queue; private ServiceBehaviour _behaviour; - private CancellationTokenSource _loopCancel; private ServiceConfiguration _configuration; private DateTimeOffset _timeSetup; @@ -154,10 +153,6 @@ public void Dispose() { if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) return; - // cancel loop - if (_loopCancel != null) - _loopCancel.Cancel(); - // dispose of queue _queue.Unbind(_addr.Namespace, _addr.RoutingKey); _queue.Dispose(); @@ -208,8 +203,8 @@ internal async Task SetupAsync(Broker broker) { // setup semaphore _concurrencySlim = new SemaphoreSlim(_configuration.MaxConcurrency, _configuration.MaxConcurrency); - // begin loop - ServiceLoop(); + // begin observing + _queue.AsObservable().Subscribe(this); // set uptime _timeSetup = DateTimeOffset.UtcNow; @@ -283,73 +278,58 @@ private async Task ServiceHandleAsync(Envelope envelope) { await _behaviour.HandleAsync(envelope).ConfigureAwait(false); } - /// - /// Service worker to receive messages from queue and hand to behaviour. - /// - private async void ServiceLoop() { - // assert loop not running - Debug.Assert(_loopCancel == null, "ServiceLoop already running"); + void IObserver.OnCompleted() { + throw new NotImplementedException(); + } - // create cancellation token - _loopCancel = new CancellationTokenSource(); + void IObserver.OnError(Exception error) { + throw new NotImplementedException(); + } - while (true) { - Envelope envelope = null; - InboundMessage message = null; + async void IObserver.OnNext(InboundMessage message) { + Envelope envelope = new Envelope(message, _namespace); - try { - // wait for a free request slot, this ensures ServiceConfiguration.MaxConcurrency is kept to - await _concurrencySlim.WaitAsync(_loopCancel.Token); + // wait for a free request slot, this ensures ServiceConfiguration.MaxConcurrency is kept to + await _concurrencySlim.WaitAsync().ConfigureAwait(false); - // receive a single message - message = await _queue.ReceiveAsync(_loopCancel.Token).ConfigureAwait(false); + // handle + try { + if (Execution == ServiceExecution.Serial) { + // increment pending metric + Interlocked.Increment(ref _requestsPending); - // create envelope - envelope = new Envelope(message, _namespace); - } catch(OperationCanceledException) { - return; - } + // handle the operaration + await ServiceHandleAsync(envelope).ConfigureAwait(false); - // handle - try { - if (Execution == ServiceExecution.Serial) { - // increment pending metric - Interlocked.Increment(ref _requestsPending); - - // handle the operaration - await ServiceHandleAsync(envelope).ConfigureAwait(false); - - // release semaphore - try { - _concurrencySlim.Release(); - } catch (Exception) { } - - // decrement pending metric and increment completed - Interlocked.Decrement(ref _requestsPending); - Interlocked.Increment(ref _requestsCompleted); - } else { - ServiceHandle(envelope); - continue; - } - } catch(Exception ex) { // release semaphore - _concurrencySlim.Release(); + try { + _concurrencySlim.Release(); + } catch (Exception) { } - // increment faulted metric - Interlocked.Increment(ref _requestsFaulted); - // decrement pending metric + // decrement pending metric and increment completed Interlocked.Decrement(ref _requestsPending); - - // call exception handler - OnUnhandledException(new ServiceExceptionEventArgs(_behaviour, ex)); + Interlocked.Increment(ref _requestsCompleted); + } else { + ServiceHandle(envelope); } + } catch (Exception ex) { + // release semaphore + _concurrencySlim.Release(); - // acknowledge - _broker.Context.QueueWork(() => { - _broker.Channel.BasicAck(envelope.Message.DeliveryTag, false); - return null; - }); + // increment faulted metric + Interlocked.Increment(ref _requestsFaulted); + // decrement pending metric + Interlocked.Decrement(ref _requestsPending); + + // call exception handler + OnUnhandledException(new ServiceExceptionEventArgs(_behaviour, ex)); } + + // acknowledge + _broker.Context.QueueWork(() => { + _broker.Channel.BasicAck(envelope.Message.DeliveryTag, false); + return null; + }); } #endregion