diff --git a/README.md b/README.md index a908524..21b2470 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,14 @@ For the `Received` event, the event handler will always be called when no `Recei Finally if all other handlers have been called without conflict, any `IObserver` subscriptions will be called. +### Coders + +In the newer version of ProtoSocket, coders are implemented using `System.IO.Pipelines`. The same high-performance library powering ASP.NET Kestrel. + +You can find a great tutorial on the .NET Blog [here](https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/). Examples are available inside the repository, [ChatCoder.cs](samples/Example.Chat/ChatCoder.cs) and [ClassicCoder.cs](samples/Example.Minecraft/Net/ClassicCoder.cs). + +Your implementation simply needs to call `PipeReader.TryRead`, processing as much data as possible and either returning a frame (and true), or false to indicate you haven't got a full frame yet. The underlying peer will continually call your read implementation until you are able to output no more frames. + ### Queueing In many scenarios creating an asyncronous operation and waiting for every packet to be sent is not ideal, for these use cases you can use the `ProtocolPeer.Queue` and `ProtocolPeer.QueueAsync` methods. @@ -67,13 +75,25 @@ Task message3 = peer.QueueAsync(new ChatMessage() { Text = "I like eBooks too" } await Task.WhenAll(message1, message2, message3); ``` -### Coders +### Filters -In the newer version of ProtoSocket, coders are implemented using `System.IO.Pipelines`. The same high-performance library powering ASP.NET Kestrel. +You can selectively decline incoming connections by adding a filter to the `ProtocolServer` object. If filtered, the connection will not be added to the server and the socket will be closed instantly. -You can find a great tutorial on the .NET Blog [here](https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/). Examples are available inside the repository, [ChatCoder.cs](samples/Example.Chat/ChatCoder.cs) and [ClassicCoder.cs](samples/Example.Minecraft/Net/ClassicCoder.cs). +A filter can be created manually by implementing `IConnectionFilter`, or you can use the premade classes `AsyncConnectionFilter`/`ConnectionFilter` which accept a delegate as their constructor. -Your implementation simply needs to call `PipeReader.TryRead`, processing as much data as possible and either returning a frame (and true), or false to indicate you haven't got a full frame yet. The underlying peer will continually call your read implementation until you are able to output no more frames. +```csharp +server.Filter = new ConnectionFilter(ctx => ((IPEndPoint)ctx.RemoteEndPoint).Address != IPAddress.Parse("192.168.0.2")); +``` + +You can optionally use the asyncronous filter, which will allow you to accept other connections in the background while processing your filter. + +```csharp +server.Filter = new AsyncConnectionFilter(async (ctx, ct) => { + await Task.Delay(3000); + return false; +}); + +``` ## Contributing diff --git a/samples/Example.Chat/ChatCoder.cs b/samples/Example.Chat/ChatCoder.cs index 7c31ebf..292a940 100644 --- a/samples/Example.Chat/ChatCoder.cs +++ b/samples/Example.Chat/ChatCoder.cs @@ -8,7 +8,6 @@ using System.Threading; using System.Threading.Tasks; using ProtoSocket; -using ProtoSocket.Extensions; namespace Example.Chat { diff --git a/samples/Example.Line/Program.cs b/samples/Example.Line/Program.cs index 5b56c9f..c60c93f 100644 --- a/samples/Example.Line/Program.cs +++ b/samples/Example.Line/Program.cs @@ -1,4 +1,5 @@ -using System; +using ProtoSocket.Filters; +using System; using System.Threading; using System.Threading.Tasks; @@ -23,7 +24,7 @@ static async Task Main(string[] args) { LineServer server = new LineServer(); server.Configure("tcp://0.0.0.0:6060"); server.Start(); - + server.Connected += async (o, e) => { e.Peer.Subscribe(new ConsoleObserver()); }; diff --git a/samples/Example.Minecraft/Net/ClassicCoder.cs b/samples/Example.Minecraft/Net/ClassicCoder.cs index 4b51c2c..ec0e3b1 100644 --- a/samples/Example.Minecraft/Net/ClassicCoder.cs +++ b/samples/Example.Minecraft/Net/ClassicCoder.cs @@ -9,7 +9,6 @@ using System.Threading.Tasks; using Example.Minecraft.Net.Packets; using ProtoSocket; -using ProtoSocket.Extensions; namespace Example.Minecraft.Net { diff --git a/src/ProtoSocket/Extensions/StreamExtensions.cs b/src/ProtoSocket/Extensions/StreamExtensions.cs deleted file mode 100644 index ae95e83..0000000 --- a/src/ProtoSocket/Extensions/StreamExtensions.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace ProtoSocket.Extensions -{ - /// - /// Provides extensions to the stream class. - /// - public static class StreamExtensions - { - #region Extensions - /// - /// Reads a block of data asyncronously, unlike you are guarenteed to get all of the data. If you get less than the requested count the end of stream has been reached. - /// - /// The stream. - /// The buffer. - /// The offset. - /// The count. - /// The cancellation token. - /// The number of bytes read. - public static async Task ReadBlockAsync(this Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - int headerTotal = 0; - - while (headerTotal != buffer.Length) { - int headerRead = await stream.ReadAsync(buffer, headerTotal, buffer.Length - headerTotal, cancellationToken).ConfigureAwait(false); - - if (headerRead == 0) - return headerTotal; - else - headerTotal += headerRead; - } - - return headerTotal; - } - #endregion - } -} diff --git a/src/ProtoSocket/Filters/AsyncConnectionFilter.cs b/src/ProtoSocket/Filters/AsyncConnectionFilter.cs new file mode 100644 index 0000000..8bb92f8 --- /dev/null +++ b/src/ProtoSocket/Filters/AsyncConnectionFilter.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ProtoSocket.Filters +{ + /// + /// Provides an asyncronous connection filter implementation + /// + public class AsyncConnectionFilter : IConnectionFilter + { + private Func> _delegate; + + bool IConnectionFilter.IsAsynchronous { + get { + return true; + } + } + + bool IConnectionFilter.Filter(IncomingContext incomingCtx) { + throw new InvalidOperationException(); + } + + Task IConnectionFilter.FilterAsync(IncomingContext incomingCtx, CancellationToken cancellationToken) { + return _delegate(incomingCtx, cancellationToken); + } + + /// + /// Creates a new connection filter with the specific delegate. + /// + /// The function to filter connections. + public AsyncConnectionFilter(Func> func) { + _delegate = func; + } + } +} diff --git a/src/ProtoSocket/Filters/ConnectionFilter.cs b/src/ProtoSocket/Filters/ConnectionFilter.cs new file mode 100644 index 0000000..7577245 --- /dev/null +++ b/src/ProtoSocket/Filters/ConnectionFilter.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ProtoSocket.Filters +{ + /// + /// Provides an synchronous connection filter implementation + /// + public class ConnectionFilter : IConnectionFilter + { + private Func _delegate; + + bool IConnectionFilter.IsAsynchronous { + get { + return false; + } + } + + bool IConnectionFilter.Filter(IncomingContext incomingCtx) { + return _delegate(incomingCtx); + } + + Task IConnectionFilter.FilterAsync(IncomingContext incomingCtx, CancellationToken cancellationToken) { + throw new InvalidOperationException(); + } + + /// + /// Creates a new connection filter with the specific delegate. + /// + /// The function to filter connections. + public ConnectionFilter(Func func) { + _delegate = func; + } + } +} diff --git a/src/ProtoSocket/IAsyncConnectionFilter.cs b/src/ProtoSocket/IAsyncConnectionFilter.cs deleted file mode 100644 index afe0f88..0000000 --- a/src/ProtoSocket/IAsyncConnectionFilter.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net.Sockets; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace ProtoSocket -{ - /// - /// Represents a connection filter which should be ran asyncronously. - /// - public interface IAsyncConnectionFilter : IConnectionFilter - { - /// - /// Filter the incoming connection. - /// - /// The client. - /// The cancellation token. - /// The task to determine if to accept the connection. - Task FilterAsync(TcpClient client, CancellationToken cancellation); - } -} diff --git a/src/ProtoSocket/IConnectionFilter.cs b/src/ProtoSocket/IConnectionFilter.cs index 0edddd0..ddd1b9b 100644 --- a/src/ProtoSocket/IConnectionFilter.cs +++ b/src/ProtoSocket/IConnectionFilter.cs @@ -1,7 +1,10 @@ using System; using System.Collections.Generic; +using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace ProtoSocket { @@ -11,10 +14,25 @@ namespace ProtoSocket public interface IConnectionFilter { /// - /// Filter the incoming connection, this operation must not block. + /// Gets if this filter is asynchronous. /// - /// The client. + bool IsAsynchronous { get; } + + /// + /// Filter the incoming connection, blocking this operation will prevent additional clients from being accepted. + /// + /// The incoming connect context. + /// This will be called if returns false. /// If to accept the connection. - bool Filter(TcpClient client); + bool Filter(IncomingContext incomingCtx); + + /// + /// Filter the incoming connection. + /// + /// The incoming connect context. + /// The cancellation token. + /// This will be called if returns true. + /// The task to determine if to accept the connection. + Task FilterAsync(IncomingContext incomingCtx, CancellationToken cancellationToken = default); } } diff --git a/src/ProtoSocket/IProtocolServer.cs b/src/ProtoSocket/IProtocolServer.cs index e10a504..43b4cce 100644 --- a/src/ProtoSocket/IProtocolServer.cs +++ b/src/ProtoSocket/IProtocolServer.cs @@ -12,7 +12,7 @@ public interface IProtocolServer : IDisposable /// /// Gets or sets the connection filter, if any. /// - IConnectionFilter ConnectionFilter { get; set; } + IConnectionFilter Filter { get; set; } /// /// Gets the number of connections. diff --git a/src/ProtoSocket/IncomingContext.cs b/src/ProtoSocket/IncomingContext.cs new file mode 100644 index 0000000..e54ddfc --- /dev/null +++ b/src/ProtoSocket/IncomingContext.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace ProtoSocket +{ + /// + /// Represents an incoming connection context. + /// + public struct IncomingContext + { + /// + /// Gets the network endpoint. + /// + public EndPoint RemoteEndPoint { get; internal set; } + + /// + /// Gets the server. + /// + public IProtocolServer Server { get; internal set; } + + /// + /// Gets the string representation + /// + /// + public override string ToString() { + return RemoteEndPoint.ToString(); + } + } +} diff --git a/src/ProtoSocket/ProtoSocket.csproj b/src/ProtoSocket/ProtoSocket.csproj index 7b1cb06..2980ecf 100644 --- a/src/ProtoSocket/ProtoSocket.csproj +++ b/src/ProtoSocket/ProtoSocket.csproj @@ -4,7 +4,7 @@ netstandard1.3 Alan Doherty & WIFIPLUG Ltd Alan Doherty - 0.4.0 + 0.4.1 true A networking library for frame-based, performant asynchronous sockets on .NET Core Alan Doherty 2018 @@ -31,4 +31,8 @@ + + + + diff --git a/src/ProtoSocket/ProtocolServer.cs b/src/ProtoSocket/ProtocolServer.cs index a66baa5..d87e16e 100644 --- a/src/ProtoSocket/ProtocolServer.cs +++ b/src/ProtoSocket/ProtocolServer.cs @@ -36,7 +36,7 @@ public class ProtocolServer : IDisposable, IProtocolServer /// /// Gets or sets the connection filter, if any. /// - public IConnectionFilter ConnectionFilter { + public IConnectionFilter Filter { get { return _filter; } set { @@ -150,14 +150,16 @@ private async void AcceptNext(TcpClient client) { // filter if (_filter != null) { bool allow = false; + IncomingContext incomingCtx = new IncomingContext() { + Server = this, + RemoteEndPoint = client.Client.RemoteEndPoint + }; try { - if (_filter is IAsyncConnectionFilter) - allow = await (_filter as IAsyncConnectionFilter).FilterAsync(client, _stopSource.Token).ConfigureAwait(false); - else if (_filter is IConnectionFilter) - allow = _filter.Filter(client); + if (_filter.IsAsynchronous) + allow = await _filter.FilterAsync(incomingCtx, _stopSource.Token).ConfigureAwait(false); else - throw new NotSupportedException("The connection filter is not supported"); + allow = _filter.Filter(incomingCtx); } catch (OperationCanceledException) { return; }