Skip to content

Commit

Permalink
Filtering improvements
Browse files Browse the repository at this point in the history
- Renamed ProtocolServer.ConnectionFilter to ProtocolServer.Filter
- Removed unused extension file
- Cleaned up connection filters
- Moved to 0.4.1
  • Loading branch information
alandoherty committed Jan 25, 2019
1 parent a25a281 commit ac1d9c2
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 83 deletions.
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ For the `Received` event, the event handler will always be called when no `Recei

Finally if all other handlers have been called without conflict, any `IObserver<TFrame>` subscriptions will be called.

### Coders

In the newer version of ProtoSocket, coders are implemented using `System.IO.Pipelines`. The same high-performance library powering ASP.NET Kestrel.

You can find a great tutorial on the .NET Blog [here](https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/). Examples are available inside the repository, [ChatCoder.cs](samples/Example.Chat/ChatCoder.cs) and [ClassicCoder.cs](samples/Example.Minecraft/Net/ClassicCoder.cs).

Your implementation simply needs to call `PipeReader.TryRead`, processing as much data as possible and either returning a frame (and true), or false to indicate you haven't got a full frame yet. The underlying peer will continually call your read implementation until you are able to output no more frames.

### Queueing

In many scenarios creating an asyncronous operation and waiting for every packet to be sent is not ideal, for these use cases you can use the `ProtocolPeer.Queue` and `ProtocolPeer.QueueAsync` methods.
Expand All @@ -67,13 +75,25 @@ Task message3 = peer.QueueAsync(new ChatMessage() { Text = "I like eBooks too" }
await Task.WhenAll(message1, message2, message3);
```

### Coders
### Filters

In the newer version of ProtoSocket, coders are implemented using `System.IO.Pipelines`. The same high-performance library powering ASP.NET Kestrel.
You can selectively decline incoming connections by adding a filter to the `ProtocolServer` object. If filtered, the connection will not be added to the server and the socket will be closed instantly.

You can find a great tutorial on the .NET Blog [here](https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/). Examples are available inside the repository, [ChatCoder.cs](samples/Example.Chat/ChatCoder.cs) and [ClassicCoder.cs](samples/Example.Minecraft/Net/ClassicCoder.cs).
A filter can be created manually by implementing `IConnectionFilter`, or you can use the premade classes `AsyncConnectionFilter`/`ConnectionFilter` which accept a delegate as their constructor.

Your implementation simply needs to call `PipeReader.TryRead`, processing as much data as possible and either returning a frame (and true), or false to indicate you haven't got a full frame yet. The underlying peer will continually call your read implementation until you are able to output no more frames.
```csharp
server.Filter = new ConnectionFilter(ctx => ((IPEndPoint)ctx.RemoteEndPoint).Address != IPAddress.Parse("192.168.0.2"));
```

You can optionally use the asyncronous filter, which will allow you to accept other connections in the background while processing your filter.

```csharp
server.Filter = new AsyncConnectionFilter(async (ctx, ct) => {
await Task.Delay(3000);
return false;
});

```

## Contributing

Expand Down
1 change: 0 additions & 1 deletion samples/Example.Chat/ChatCoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Threading;
using System.Threading.Tasks;
using ProtoSocket;
using ProtoSocket.Extensions;

namespace Example.Chat
{
Expand Down
5 changes: 3 additions & 2 deletions samples/Example.Line/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using ProtoSocket.Filters;
using System;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -23,7 +24,7 @@ static async Task Main(string[] args) {
LineServer server = new LineServer();
server.Configure("tcp://0.0.0.0:6060");
server.Start();

server.Connected += async (o, e) => {
e.Peer.Subscribe(new ConsoleObserver());
};
Expand Down
1 change: 0 additions & 1 deletion samples/Example.Minecraft/Net/ClassicCoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading.Tasks;
using Example.Minecraft.Net.Packets;
using ProtoSocket;
using ProtoSocket.Extensions;

namespace Example.Minecraft.Net
{
Expand Down
41 changes: 0 additions & 41 deletions src/ProtoSocket/Extensions/StreamExtensions.cs

This file was deleted.

39 changes: 39 additions & 0 deletions src/ProtoSocket/Filters/AsyncConnectionFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ProtoSocket.Filters
{
/// <summary>
/// Provides an asyncronous connection filter implementation
/// </summary>
public class AsyncConnectionFilter : IConnectionFilter
{
private Func<IncomingContext, CancellationToken, Task<bool>> _delegate;

bool IConnectionFilter.IsAsynchronous {
get {
return true;
}
}

bool IConnectionFilter.Filter(IncomingContext incomingCtx) {
throw new InvalidOperationException();
}

Task<bool> IConnectionFilter.FilterAsync(IncomingContext incomingCtx, CancellationToken cancellationToken) {
return _delegate(incomingCtx, cancellationToken);
}

/// <summary>
/// Creates a new connection filter with the specific delegate.
/// </summary>
/// <param name="func">The function to filter connections.</param>
public AsyncConnectionFilter(Func<IncomingContext, CancellationToken, Task<bool>> func) {
_delegate = func;
}
}
}
38 changes: 38 additions & 0 deletions src/ProtoSocket/Filters/ConnectionFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ProtoSocket.Filters
{
/// <summary>
/// Provides an synchronous connection filter implementation
/// </summary>
public class ConnectionFilter : IConnectionFilter
{
private Func<IncomingContext, bool> _delegate;

bool IConnectionFilter.IsAsynchronous {
get {
return false;
}
}

bool IConnectionFilter.Filter(IncomingContext incomingCtx) {
return _delegate(incomingCtx);
}

Task<bool> IConnectionFilter.FilterAsync(IncomingContext incomingCtx, CancellationToken cancellationToken) {
throw new InvalidOperationException();
}

/// <summary>
/// Creates a new connection filter with the specific delegate.
/// </summary>
/// <param name="func">The function to filter connections.</param>
public ConnectionFilter(Func<IncomingContext, bool> func) {
_delegate = func;
}
}
}
23 changes: 0 additions & 23 deletions src/ProtoSocket/IAsyncConnectionFilter.cs

This file was deleted.

24 changes: 21 additions & 3 deletions src/ProtoSocket/IConnectionFilter.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ProtoSocket
{
Expand All @@ -11,10 +14,25 @@ namespace ProtoSocket
public interface IConnectionFilter
{
/// <summary>
/// Filter the incoming connection, this operation must not block.
/// Gets if this filter is asynchronous.
/// </summary>
/// <param name="client">The client.</param>
bool IsAsynchronous { get; }

/// <summary>
/// Filter the incoming connection, blocking this operation will prevent additional clients from being accepted.
/// </summary>
/// <param name="incomingCtx">The incoming connect context.</param>
/// <remarks>This will be called if <see cref="IsAsynchronous"/> returns false.</remarks>
/// <returns>If to accept the connection.</returns>
bool Filter(TcpClient client);
bool Filter(IncomingContext incomingCtx);

/// <summary>
/// Filter the incoming connection.
/// </summary>
/// <param name="incomingCtx">The incoming connect context.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>This will be called if <see cref="IsAsynchronous"/> returns true.</remarks>
/// <returns>The task to determine if to accept the connection.</returns>
Task<bool> FilterAsync(IncomingContext incomingCtx, CancellationToken cancellationToken = default);
}
}
2 changes: 1 addition & 1 deletion src/ProtoSocket/IProtocolServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public interface IProtocolServer : IDisposable
/// <summary>
/// Gets or sets the connection filter, if any.
/// </summary>
IConnectionFilter ConnectionFilter { get; set; }
IConnectionFilter Filter { get; set; }

/// <summary>
/// Gets the number of connections.
Expand Down
31 changes: 31 additions & 0 deletions src/ProtoSocket/IncomingContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace ProtoSocket
{
/// <summary>
/// Represents an incoming connection context.
/// </summary>
public struct IncomingContext
{
/// <summary>
/// Gets the network endpoint.
/// </summary>
public EndPoint RemoteEndPoint { get; internal set; }

/// <summary>
/// Gets the server.
/// </summary>
public IProtocolServer Server { get; internal set; }

/// <summary>
/// Gets the string representation
/// </summary>
/// <returns></returns>
public override string ToString() {
return RemoteEndPoint.ToString();
}
}
}
6 changes: 5 additions & 1 deletion src/ProtoSocket/ProtoSocket.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netstandard1.3</TargetFramework>
<Authors>Alan Doherty &amp; WIFIPLUG Ltd</Authors>
<Company>Alan Doherty</Company>
<Version>0.4.0</Version>
<Version>0.4.1</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Description>A networking library for frame-based, performant asynchronous sockets on .NET Core</Description>
<Copyright>Alan Doherty 2018</Copyright>
Expand All @@ -31,4 +31,8 @@
<PackageReference Include="System.Net.Security" Version="4.3.2" />
</ItemGroup>

<ItemGroup>
<Folder Include="Extensions\" />
</ItemGroup>

</Project>
14 changes: 8 additions & 6 deletions src/ProtoSocket/ProtocolServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ProtocolServer<TConnection, TFrame> : IDisposable, IProtocolServer
/// <summary>
/// Gets or sets the connection filter, if any.
/// </summary>
public IConnectionFilter ConnectionFilter {
public IConnectionFilter Filter {
get {
return _filter;
} set {
Expand Down Expand Up @@ -150,14 +150,16 @@ private async void AcceptNext(TcpClient client) {
// filter
if (_filter != null) {
bool allow = false;
IncomingContext incomingCtx = new IncomingContext() {
Server = this,
RemoteEndPoint = client.Client.RemoteEndPoint
};

try {
if (_filter is IAsyncConnectionFilter)
allow = await (_filter as IAsyncConnectionFilter).FilterAsync(client, _stopSource.Token).ConfigureAwait(false);
else if (_filter is IConnectionFilter)
allow = _filter.Filter(client);
if (_filter.IsAsynchronous)
allow = await _filter.FilterAsync(incomingCtx, _stopSource.Token).ConfigureAwait(false);
else
throw new NotSupportedException("The connection filter is not supported");
allow = _filter.Filter(incomingCtx);
} catch (OperationCanceledException) {
return;
}
Expand Down

0 comments on commit ac1d9c2

Please sign in to comment.