Skip to content

Commit

Permalink
Various fixes and minor changes
Browse files Browse the repository at this point in the history
- Moved to 0.4.0
- Added Example.Line
- Renamed ProtocolPeer.SendAsync(CancellationToken) to ProtocolPeer.FlushAsync(CancellationToken)
- Cleared up end of stream logic
- Fixed bug which would prevent peers fully disconnecting if a observer was subscribed
  • Loading branch information
alandoherty committed Jan 23, 2019
1 parent 5324dc7 commit a25a281
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 24 deletions.
11 changes: 9 additions & 2 deletions ProtoSocket.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27703.2035
# Visual Studio Version 16
VisualStudioVersion = 16.0.28407.52
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProtoSocket", "src\ProtoSocket\ProtoSocket.csproj", "{BBCD03F1-F9A0-4C4C-93C1-8B156C4F8231}"
EndProject
Expand All @@ -16,6 +16,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Example.Chat", "samples\Exa
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Example.Minecraft", "samples\Example.Minecraft\Example.Minecraft.csproj", "{D9E913D0-B075-489A-96AE-7389C86C790E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Line", "samples\Example.Line\Example.Line.csproj", "{5C1CEDDA-43A7-4501-BF28-E4A8BC142574}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -34,13 +36,18 @@ Global
{D9E913D0-B075-489A-96AE-7389C86C790E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D9E913D0-B075-489A-96AE-7389C86C790E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D9E913D0-B075-489A-96AE-7389C86C790E}.Release|Any CPU.Build.0 = Release|Any CPU
{5C1CEDDA-43A7-4501-BF28-E4A8BC142574}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5C1CEDDA-43A7-4501-BF28-E4A8BC142574}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5C1CEDDA-43A7-4501-BF28-E4A8BC142574}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5C1CEDDA-43A7-4501-BF28-E4A8BC142574}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{3244FC3C-5703-4B56-A754-CC6D7DB87039} = {232DEEE3-098B-46F0-803A-591781CEB430}
{D9E913D0-B075-489A-96AE-7389C86C790E} = {232DEEE3-098B-46F0-803A-591781CEB430}
{5C1CEDDA-43A7-4501-BF28-E4A8BC142574} = {232DEEE3-098B-46F0-803A-591781CEB430}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {056A9443-6614-4870-A4BA-EE86AF7BF8C8}
Expand Down
6 changes: 2 additions & 4 deletions samples/Example.Chat/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ class Program
{
static void Main(string[] args)
{
Task.WhenAny(RunServerAsync(args),
RunClientAsync(new string[] { "", "Kevin Bacon" })).Wait();

if (args.Length == 0)
Console.Error.WriteLine("Example.Chat.exe <server|client> [name]");
else if (args[0].Equals("server", StringComparison.CurrentCultureIgnoreCase))
Expand All @@ -25,7 +22,7 @@ static void Main(string[] args)

static Task RunServerAsync(string[] args) {
ChatServer server = new ChatServer();
server.Configure(new Uri("tcp://127.0.0.1:6060"));
server.Configure(new Uri("tcp://0.0.0.0:6060"));

// setup events
server.Connected += async (s, e) => {
Expand All @@ -43,6 +40,7 @@ static Task RunServerAsync(string[] args) {
} catch (Exception) { }
}


e.Peer.Received += async (ss, ee) => {
if (ee.Frame.Text == "/list") {
string[] ips = server.Connections.Select(c => "\t" + c.RemoteEndPoint.ToString()).ToArray();
Expand Down
13 changes: 13 additions & 0 deletions samples/Example.Line/Example.Line.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<LangVersion>7.2</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\ProtoSocket\ProtoSocket.csproj" />
</ItemGroup>

</Project>
44 changes: 44 additions & 0 deletions samples/Example.Line/LineCoder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using ProtoSocket;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Example.Line
{
/// <summary>
/// Encodes/decodes strings.
/// </summary>
public class LineCoder : IProtocolCoder<string>
{
public bool Read(PipeReader reader, CoderContext<string> ctx, out string frame) {
if (reader.TryRead(out ReadResult result) && !result.IsCompleted) {
// create array (not the most efficient way we could do this)
byte[] arr = result.Buffer.ToArray();

// set frame
frame = Encoding.UTF8.GetString(arr);

// advance
reader.AdvanceTo(result.Buffer.End, result.Buffer.End);
return true;
}

// we didn't find a frame
frame = null;
return false;
}

public void Reset() {
}

public Task WriteAsync(Stream stream, string frame, CoderContext<string> ctx, CancellationToken cancellationToken) {
byte[] arr = Encoding.UTF8.GetBytes(frame);
return stream.WriteAsync(arr, 0, arr.Length);
}
}
}
13 changes: 13 additions & 0 deletions samples/Example.Line/LineConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using ProtoSocket;
using System;
using System.Collections.Generic;
using System.Text;

namespace Example.Line
{
class LineConnection : ProtocolConnection<LineConnection, string>
{
public LineConnection(ProtocolServer<LineConnection, string> server, ProtocolCoderFactory<string> coderFactory) : base(server, coderFactory) {
}
}
}
13 changes: 13 additions & 0 deletions samples/Example.Line/LineServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using ProtoSocket;
using System;
using System.Collections.Generic;
using System.Text;

namespace Example.Line
{
class LineServer : ProtocolServer<LineConnection, string>
{
public LineServer() : base((p) => new LineCoder()) {
}
}
}
44 changes: 44 additions & 0 deletions samples/Example.Line/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Example.Line
{
class ConsoleObserver : IObserver<string>
{
public void OnCompleted() {
}

public void OnError(Exception error) {
}

public void OnNext(string value) {
Console.WriteLine(value);
}
}

class Program
{
static async Task Main(string[] args) {
LineServer server = new LineServer();
server.Configure("tcp://0.0.0.0:6060");
server.Start();

server.Connected += async (o, e) => {
e.Peer.Subscribe(new ConsoleObserver());
};

while(true) {
var conns = server.Connections;

foreach(var c in conns) {
Console.WriteLine($"IsConnected: {c.IsConnected} State: {c.State} CR: {c.CloseReason} CE: {(c.CloseException == null ? "null" : c.CloseException.Message)}");
}

await Task.Delay(2000);
}

await Task.Delay(Timeout.InfiniteTimeSpan);
}
}
}
2 changes: 1 addition & 1 deletion samples/Example.Minecraft/World.cs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public async Task RunAsync() {
connection.Queue(new ClassicPacket() { Id = PacketId.Ping, Payload = new byte[0] });

// send all queued frames
sendTasks.Add(connection.SendAsync());
sendTasks.Add(connection.FlushAsync());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/ProtoSocket/ProtoSocket.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netstandard1.3</TargetFramework>
<Authors>Alan Doherty &amp; WIFIPLUG Ltd</Authors>
<Company>Alan Doherty</Company>
<Version>0.3.3</Version>
<Version>0.4.0</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Description>A networking library for frame-based, performant asynchronous sockets on .NET Core</Description>
<Copyright>Alan Doherty 2018</Copyright>
Expand Down
41 changes: 25 additions & 16 deletions src/ProtoSocket/ProtocolPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ public string CloseReason {
}
}

/// <summary>
/// Gets the close exception, if any.
/// </summary>
public Exception CloseException {
get {
return _closeException;
}
}

/// <summary>
/// Gets or sets if to enable TCP keep alive.
/// </summary>
Expand Down Expand Up @@ -231,7 +240,7 @@ protected virtual bool OnReceived(PeerReceivedEventArgs<TFrame> e) {

#region Queued Sending
/// <summary>
/// Queues the frames to be sent, this will happen on the next call to <see cref="SendAsync(CancellationToken)"/>.
/// Queues the frames to be sent, this will happen on the next call to <see cref="FlushAsync(CancellationToken)"/>.
/// </summary>
/// <param name="frames">The frames.</param>
/// <exception cref="ObjectDisposedException">If the peer closes during the operation.</exception>
Expand All @@ -254,7 +263,7 @@ public virtual int Queue(IEnumerable<TFrame> frames) {
}

/// <summary>
/// Queues the frames to be sent, this will happen on the next call to <see cref="SendAsync(CancellationToken)"/>.
/// Queues the frames to be sent, this will happen on the next call to <see cref="FlushAsync(CancellationToken)"/>.
/// </summary>
/// <param name="frames">The frames.</param>
/// <exception cref="ObjectDisposedException">If the peer closes during the operation.</exception>
Expand All @@ -265,7 +274,7 @@ public int Queue(params TFrame[] frames) {
}

/// <summary>
/// Queues the frame to be sent, this will happen on the next call to <see cref="SendAsync(CancellationToken)"/>.
/// Queues the frame to be sent, this will happen on the next call to <see cref="FlushAsync(CancellationToken)"/>.
/// </summary>
/// <param name="frame">The frame.</param>
/// <exception cref="ObjectDisposedException">If the peer closes during the operation.</exception>
Expand All @@ -287,7 +296,7 @@ public virtual int Queue(TFrame frame) {

/// <summary>
/// Queues the frame to be sent and returns a task which will complete after sending.
/// The frame will be sent on the next call to <see cref="SendAsync(CancellationToken)"/>.
/// The frame will be sent on the next call to <see cref="FlushAsync(CancellationToken)"/>.
/// </summary>
/// <param name="frame">The frame.</param>
/// <exception cref="ObjectDisposedException">If the peer closes during the operation.</exception>
Expand Down Expand Up @@ -357,15 +366,15 @@ private async Task<int> SendAllAsync() {
}

/// <summary>
/// Sends all queued frames asyncronously.
/// Flushes all queued frames asyncronously.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <exception cref="ObjectDisposedException">If the peer closes during the operation.</exception>
/// <exception cref="InvalidOperationException">If the peer is disconnecting or disconnected.</exception>
/// <exception cref="InvalidOperationException">If the peer has not been configured yet.</exception>
/// <exception cref="OperationCanceledException">If the operation is cancelled.</exception>
/// <returns>The number of sent frames.</returns>
public virtual async Task<int> SendAsync(CancellationToken cancellationToken = default(CancellationToken)) {
public virtual async Task<int> FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) {
// validate the peer isn't disposed or not ready
if (_disposed == 1)
throw new ObjectDisposedException(nameof(ProtocolPeer<TFrame>), "The peer has been disposed");
Expand Down Expand Up @@ -752,17 +761,17 @@ private async void ReadLoop() {
readBufferCount = await _dataStream.ReadAsync(readBuffer, 0, readBuffer.Length, _disposeSource.Token).ConfigureAwait(false);
} catch (OperationCanceledException ex) {
if (ex.CancellationToken == _disposeSource.Token) {
_closeReason = "Peer disposed before incoming frame decoded";
throw;
Abort("Peer disposed before incoming frame decoded", ex);
} else if (ex.CancellationToken == _readCancelSource.Token) {
return;
}
} catch (ObjectDisposedException) {
Abort("End of stream");

return;
} catch (ObjectDisposedException ex) {
Abort("End of stream", ex);
return;
} catch (Exception ex) {
Abort($"Read exception: {ex.Message}");
return;
} catch (Exception) {
_closeReason = "Failed to decode incoming frame";
throw;
}

// check if end of stream has been reached
Expand Down Expand Up @@ -971,8 +980,8 @@ public void Dispose() {

// remove all subscriptions mark as completeds
lock (_subscriptions) {
foreach (IObserver<TFrame> observer in _subscriptions) {
observer.OnCompleted();
foreach (Subscription sub in _subscriptions) {
sub.Observer.OnCompleted();
}

_subscriptions.Clear();
Expand Down

0 comments on commit a25a281

Please sign in to comment.