Skip to content

Commit

Permalink
Added subscription flush option
Browse files Browse the repository at this point in the history
- Moved to 0.5.2
- In some cases you may call Subscribe on an active peer after frames have been received, for simplicitly you can now use the SubscriptionOptions.Flush option to cause any frames in the queue to be dequeued and sent to the observer
  • Loading branch information
alandoherty committed May 29, 2019
1 parent 8f20d5b commit a7f3099
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 16 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

# protosocket

A networking library for frame-based, performant asynchronous TCP sockets on .NET Core. Open permissive MIT license and requires a minimum of .NET Standard 1.3.
A networking library for frame-based, performant asynchronous TCP sockets on .NET Core. Open permissive MIT license and requires a minimum of .NET Standard 1.3. Battle-tested and ready for production use.

## Getting Started

Expand Down Expand Up @@ -60,7 +60,7 @@ Your implementation simply needs to call `PipeReader.TryRead`, processing as muc

### Protocol Modes

By default all peers are `ProtocolMode.Active`, this means the library will take care of reading as many frames from the opposing peer as possible. Calling your handlers/subscribers as necessary when a frame arrives.
By default all peers are `ProtocolMode.Active`, this means the library will take care of reading as many frames from the opposing peer as possible. Calling your handlers/subscribers as necessary when a frame arrives. Note that if your peer is configured as `ProtocolMode.Active`, you should use the `SubscriptionOptions.Flush` option when using subscribers so that any frames in the queue are automatically sent to the subscribers.

In some cases this behaviour can cause problems. For example, if you have a complex negotiation process that involves upgrading the underlying protocol or modifying the frame structure, you don't want the library reading constantly and interpreting the data incorrectly.

Expand Down
6 changes: 3 additions & 3 deletions src/ProtoSocket/ProtoSocket.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netstandard1.3</TargetFramework>
<Authors>Alan Doherty &amp; WIFIPLUG Ltd</Authors>
<Company>Alan Doherty</Company>
<Version>0.5.1</Version>
<Version>0.5.2</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Description>A networking library for frame-based, performant asynchronous sockets on .NET Core</Description>
<Copyright>Alan Doherty 2018</Copyright>
Expand All @@ -13,8 +13,8 @@
<PackageIconUrl>https://s3-eu-west-1.amazonaws.com/assets.alandoherty.co.uk/github/protosocket-net-nuget.png</PackageIconUrl>
<RepositoryUrl></RepositoryUrl>
<RepositoryType>git</RepositoryType>
<AssemblyVersion>0.5.1.0</AssemblyVersion>
<FileVersion>0.5.1.0</FileVersion>
<AssemblyVersion>0.5.2.0</AssemblyVersion>
<FileVersion>0.5.2.0</FileVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
Expand Down
36 changes: 25 additions & 11 deletions src/ProtoSocket/ProtocolPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1269,34 +1269,48 @@ private bool Notify(TFrame frame) {
/// Subscribes the observer to this peer.
/// </summary>
/// <param name="observer">The observer.</param>
/// <remarks>Depending on the peer configuration, you may need to pass <see cref="SubscriptionOptions.Flush"/> to get notifications for any frames which were received before adding the subscription.</remarks>
/// <returns>A disposable wrapper for the subscription.</returns>
public IDisposable Subscribe(IObserver<TFrame> observer) {
Subscription subscription = null;

// add subscription
lock (_subscriptions) {
subscription = new Subscription(this, observer, null);
_subscriptions.Add(subscription);
}
return Subscribe(observer, null, SubscriptionOptions.None);
}

return subscription;
/// <summary>
/// Subscribes the observer to this peer.
/// </summary>
/// <param name="observer">The observer.</param>
/// <param name="options">The options.</param>
/// <remarks>Depending on the peer configuration, you may need to pass <see cref="SubscriptionOptions.Flush"/> to get notifications for any frames which were received before adding the subscription.</remarks>
/// <returns>A disposable wrapper for the subscription.</returns>
public IDisposable Subscribe(IObserver<TFrame> observer, SubscriptionOptions options = SubscriptionOptions.None) {
return Subscribe(observer, null, options);
}

/// <summary>
/// Subscribes the observer to this peer.
/// </summary>
/// <param name="observer">The observer.</param>
/// <param name="predicate">The predicate to match for frames.</param>
/// <param name="filter">The predicate filter.</param>
/// <param name="options">The options.</param>
/// <remarks>Depending on the peer configuration, you may need to pass <see cref="SubscriptionOptions.Flush"/> to get notifications for any frames which were received before adding the subscription.</remarks>
/// <returns>A disposable wrapper for the subscription.</returns>
public IDisposable Subscribe(IObserver<TFrame> observer, Predicate<TFrame> predicate) {
public IDisposable Subscribe(IObserver<TFrame> observer, Predicate<TFrame> filter, SubscriptionOptions options = SubscriptionOptions.None) {
Subscription subscription = null;

// add subscription
lock (_subscriptions) {
subscription = new Subscription(this, observer, predicate);
subscription = new Subscription(this, observer, filter);
_subscriptions.Add(subscription);
}

// check if flush requested
if (options.HasFlag(SubscriptionOptions.Flush)) {
while (TryReceive(out TFrame frame)) {
if (subscription.Predicate == null || subscription.Predicate(frame))
subscription.Observer.OnNext(frame);
}
}

return subscription;
}
#endregion
Expand Down
23 changes: 23 additions & 0 deletions src/ProtoSocket/SubscriptionOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace ProtoSocket
{
/// <summary>
/// Defines the subscription options.
/// </summary>
[Flags]
public enum SubscriptionOptions
{
/// <summary>
/// No extra options.
/// </summary>
None = 0,

/// <summary>
/// Flushes any frames in the receive queue and calls <see cref="IObserver{T}.OnNext(T)"/>.
/// </summary>
Flush = 1
}
}

0 comments on commit a7f3099

Please sign in to comment.