diff --git a/samples/Example.General/Program.cs b/samples/Example.General/Program.cs index 947c54f..ff33b20 100644 --- a/samples/Example.General/Program.cs +++ b/samples/Example.General/Program.cs @@ -55,29 +55,6 @@ class Program static void Main(string[] args) => AsyncMain(args).Wait(); - public static async void ReadLoop(int[] ctr, Node node, Guid[] uuids) { - Random rand = new Random(); - - while (true) { - try { - int i = rand.Next(0, uuids.Length); - Guid uuid = uuids[i]; - ITest001 proxy = node.Proxy($"auth:{uuid}"); - - string s = await proxy.Login(new LoginRequestMsg() { - Password = "wow", - Username = "alan" - }).ConfigureAwait(false); - - Interlocked.Increment(ref ctr[0]); - - //Console.WriteLine($"String: {s}"); - } catch(Exception ex) { - Console.WriteLine(ex.ToString()); - } - } - } - class EventTest { public string Potato { get; set; } @@ -102,37 +79,18 @@ static async Task AsyncMain(string[] args) { ThrowUnhandledExceptions = true }); - // attach services - Guid[] uuids = new Guid[500]; - List tasks = new List(); - - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); - - for (int i = 0; i < uuids.Length; i++) { - uuids[i] = Guid.NewGuid(); + // subscribe + Guid g = Guid.NewGuid(); + EventSubscription subscription = await TestNode.SubscribeAsync($"device:{g}.*"); - tasks.Add(TestNode.AttachAsync($"auth:{uuids[i]}", RpcBehaviour.Bind(new Test001(uuids[i])))); - } - - await Task.WhenAll(tasks).ConfigureAwait(false); - - Console.WriteLine($"Attached {uuids.Length} services in {stopwatch.ElapsedMilliseconds}ms"); - - int[] ctr = new int[] { 0 }; - int pavg = 0; - for (int i = 0; i < 32; i++) - ReadLoop(ctr, TestNode, uuids); + subscription.AsObservable().Subscribe(new EventObserver()); - while(true) { - Console.WriteLine($"Logging in at {ctr[0]}/s avg ({pavg}/s), ({Process.GetCurrentProcess().Threads.Count} threads)"); - - pavg += ctr[0]; - pavg = pavg / 2; - ctr[0] = 0; - - await Task.Delay(1000).ConfigureAwait(false); + while (true) { + await TestNode.EmitAsync($"device:{g}.test", new EventTest() { + Potato = "wow" + }); + await Task.Delay(500); } await Task.Delay(50000); diff --git a/src/Holon/Holon.csproj b/src/Holon/Holon.csproj index 5206afe..63e5bf9 100644 --- a/src/Holon/Holon.csproj +++ b/src/Holon/Holon.csproj @@ -2,7 +2,7 @@ netstandard1.6 - 0.1.4.1 + 0.1.5 Alan Doherty Alan Doherty A minimal service and event bus with additional support for RPC @@ -10,11 +10,11 @@ https://github.com/alandoherty/holon-net https://github.com/alandoherty/holon-net git - 0.1.4.1 + 0.1.5.0 https://github.com/alandoherty/holon-net/blob/master/LICENSE true https://s3-eu-west-1.amazonaws.com/assets.alandoherty.co.uk/github/holon-net-nuget.png - 0.1.4.1 + 0.1.5.0 diff --git a/src/Holon/Namespace.cs b/src/Holon/Namespace.cs index 29a6017..07b320c 100644 --- a/src/Holon/Namespace.cs +++ b/src/Holon/Namespace.cs @@ -455,7 +455,7 @@ public async Task EmitAsync(EventAddress addr, object data) { // send event try { - await _broker.SendAsync(string.Format("!{0}", addr.Namespace), addr.Name, body, new Dictionary() { + await _broker.SendAsync(string.Format("!{0}", addr.Namespace), $"{addr.Resource}.{addr.Name}", body, new Dictionary() { { EventHeader.HEADER_NAME, new EventHeader(EventHeader.HEADER_VERSION, serializer.Name).ToString() } }, null, null, false).ConfigureAwait(false); } catch (Exception) { }