Skip to content
This repository has been archived by the owner on Sep 10, 2021. It is now read-only.

Commit

Permalink
Event improvements and a few fixes
Browse files Browse the repository at this point in the history
- Added EventSubscription.AsObservable
- Prevented possible multiple disposes on BrokerQueue
- Moved event system to 1.1
- Fixed StackOverflowException on Node.EmitAsync(string, object)
- Broker will now only use batch publish when more than one message is sent
- Moved Holon to 0.1.2
  • Loading branch information
alandoherty committed Sep 13, 2018
1 parent 360bf2e commit 9369131
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 33 deletions.
42 changes: 39 additions & 3 deletions samples/Example.General/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,36 @@ interface ITest001
Task<string> Login(LoginRequestMsg login);
}

[ProtoContract]
class NameChangedEventData
{
[ProtoMember(1)]
public string OldName { get; set; }

[ProtoMember(2)]
public string NewName { get; set; }
}

class Test001 : ITest001
{
static int si = 0;
int i = Interlocked.Increment(ref si);

public async Task<string> Login(LoginRequestMsg login) {
Console.WriteLine($"Worker Waiting {i} - Username: {login.Username} Password: {login.Password}");

await Program.TestNode.EmitAsync("user:bacon.name_change", new NameChangedEventData() {
OldName = "Bacon",
NewName = "Ham"
});
return "landlocked";
}
}

class Program
{
public static Node TestNode { get; set; }

static void Main(string[] args) => AsyncMain(args).Wait();

public static async void ReadLoop(Node node) {
Expand All @@ -70,17 +87,36 @@ public static async void ReadLoop(Node node) {
}
}

class Observer : IObserver<Event>
{
public void OnCompleted() {
}

public void OnError(Exception error) {
}

public void OnNext(Event value) {
NameChangedEventData changeData = value.Deserialize<NameChangedEventData>();

Console.WriteLine("Name changed: " + changeData.OldName + " -> " + changeData.NewName);
}
}

static async Task AsyncMain(string[] args) {
// attach node
Node node = await Node.CreateFromEnvironmentAsync(new NodeConfiguration() {
TestNode = await Node.CreateFromEnvironmentAsync(new NodeConfiguration() {
ThrowUnhandledExceptions = true
});

await node.AttachAsync("auth:test", new ServiceConfiguration() {
await TestNode.AttachAsync("auth:test", new ServiceConfiguration() {
Filters = new IServiceFilter[] { new SecureFilter(new X509Certificate2("public_privatekey.pfx"), "bacon") }
}, RpcBehaviour.Bind<ITest001>(new Test001()));

ReadLoop(node);
EventSubscription subscription = await TestNode.SubscribeAsync("user:bacon.*");

subscription.AsObservable().Subscribe(new Observer());

ReadLoop(TestNode);

await Task.Delay(50000);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Holon/Events/EventHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal class EventHeader
{
#region Constants
internal const string HEADER_NAME = "X-Event";
internal const string HEADER_VERSION = "1.0";
internal const string HEADER_VERSION = "1.1";
#endregion

#region Fields
Expand Down
57 changes: 55 additions & 2 deletions src/Holon/Events/EventSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class EventSubscription : IDisposable
private EventAddress _address;
#endregion

#region Fields
#region Properties
/// <summary>
/// Gets the event address.
/// </summary>
Expand Down Expand Up @@ -52,7 +52,7 @@ private Event ProcessMessage(InboundMessage message) {
EventHeader header = new EventHeader(Encoding.UTF8.GetString(envelope.Headers[EventHeader.HEADER_NAME] as byte[]));

// validate version
if (header.Version != "1.0")
if (header.Version != "1.1")
throw new NotSupportedException("Event version is not supported");

// find serializer
Expand All @@ -67,6 +67,14 @@ private Event ProcessMessage(InboundMessage message) {
return e;
}

/// <summary>
/// Gets the subscription as an observable target.
/// </summary>
/// <returns>The observerable.</returns>
public IObservable<Event> AsObservable() {
return new EventObservable(this);
}

/// <summary>
/// Receives an event asyncronously.
/// </summary>
Expand Down Expand Up @@ -122,6 +130,51 @@ public void Dispose() {
// dispose underlying queue
_queue.Dispose();
}

/// <summary>
/// A pass through for observing events.
/// </summary>
class EventObservable : IObservable<Event>
{
private EventSubscription _sub;
private IObservable<InboundMessage> _observable;

public IDisposable Subscribe(IObserver<Event> observer) {
return _observable.Subscribe(new EventObserver(_sub, observer));
}

/// <summary>
/// Creates an observable event producer.
/// </summary>
/// <param name="sub">The subscription.</param>
public EventObservable(EventSubscription sub) {
_sub = sub;
_observable = sub._queue.AsObservable();
}
}

class EventObserver : IObserver<InboundMessage>
{
private IObserver<Event> _observer;
private EventSubscription _sub;

public EventObserver(EventSubscription sub, IObserver<Event> observer) {
_sub = sub;
_observer = observer;
}

public void OnCompleted() {
_observer.OnCompleted();
}

public void OnError(Exception error) {
_observer.OnError(error);
}

public void OnNext(InboundMessage value) {
_observer.OnNext(_sub.ProcessMessage(value));
}
}
#endregion

#region Constructors
Expand Down
6 changes: 3 additions & 3 deletions src/Holon/Holon.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<Version>0.1.1</Version>
<Version>0.1.2</Version>
<Authors>Alan Doherty</Authors>
<Company>Alan Doherty</Company>
<Description>A minimal service and event bus with additional support for RPC</Description>
<Copyright>BattleCrate Ltd 2018</Copyright>
<PackageProjectUrl>https://github.com/alandoherty/holon-net</PackageProjectUrl>
<RepositoryUrl>https://github.com/alandoherty/holon-net</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<AssemblyVersion>0.0.1.1</AssemblyVersion>
<AssemblyVersion>0.1.2.0</AssemblyVersion>
<PackageLicenseUrl>https://github.com/alandoherty/holon-net/blob/master/LICENSE</PackageLicenseUrl>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageIconUrl>https://s3-eu-west-1.amazonaws.com/assets.alandoherty.co.uk/github/holon-net-nuget.png</PackageIconUrl>
<FileVersion>0.0.1.1</FileVersion>
<FileVersion>0.1.2.0</FileVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand Down
2 changes: 1 addition & 1 deletion src/Holon/Node.cs
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ public async Task EmitAsync(EventAddress addr, object data) {
/// <exception cref="FormatException">If the event address is invalid.</exception>
/// <returns></returns>
public Task EmitAsync(string addr, object data) {
return EmitAsync(addr, data);
return EmitAsync(new EventAddress(addr), data);
}

/// <summary>
Expand Down
51 changes: 31 additions & 20 deletions src/Holon/Protocol/Broker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Serialization;
Expand Down Expand Up @@ -70,27 +71,37 @@ public Task SendAsync(params OutboundMessage[] messages) {
/// <param name="messages">The messages.</param>
/// <returns></returns>
public Task SendAsync(IEnumerable<OutboundMessage> messages) {
// create batch
IBasicPublishBatch batch = _channel.CreateBasicPublishBatch();

// add all the messages
foreach(OutboundMessage message in messages) {
IBasicProperties properties = _channel.CreateBasicProperties();

if (message.ReplyTo != null)
properties.ReplyTo = message.ReplyTo;
if (message.Headers != null)
properties.Headers = message.Headers;
if (message.ReplyID != null)
properties.CorrelationId = message.ReplyID.ToString();

batch.Add(message.Exchange, message.RoutingKey, message.Mandatory, properties, message.Body);
// get message array
OutboundMessage[] messageArr = messages.ToArray();

if (messageArr.Length == 0)
return Task.FromResult(true);
else if (messageArr.Length == 1) {
return SendAsync(messageArr[0]);
} else {
// create batch
IBasicPublishBatch batch = _channel.CreateBasicPublishBatch();

// add all the messages
foreach (OutboundMessage message in messageArr) {
IBasicProperties properties = _channel.CreateBasicProperties();

if (message.ReplyTo != null)
properties.ReplyTo = message.ReplyTo;
if (message.Headers != null)
properties.Headers = message.Headers;
if (message.ReplyID != null)
properties.CorrelationId = message.ReplyID.ToString();

batch.Add(message.Exchange, message.RoutingKey, message.Mandatory, properties, message.Body);
}

return _ctx.AskWork(delegate () {
batch.Publish();
return null;
});
}

return _ctx.AskWork(delegate () {
batch.Publish();
return null;
});

}

/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions src/Holon/Protocol/BrokerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ internal sealed class BrokerQueue : IDisposable
#region Fields
private string _queue;
private Broker _broker;
private bool _disposed;
private AsyncConsumer _consumer;
private string _consumerTag;
private List<string> _exchanges = new List<string>();

private int _disposed;
#endregion

#region Properties
Expand Down Expand Up @@ -121,9 +122,8 @@ public async Task<InboundMessage> ReceiveAsync(TimeSpan timeout) {
/// </summary>
public void Dispose() {
// prevent disposing multiple times
if (_disposed)
if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1)
return;
_disposed = true;

// delete consumer
if (_consumer != null) {
Expand Down

0 comments on commit 9369131

Please sign in to comment.