Skip to content

Commit

Permalink
Upgrading work, protocol modes, optimisations and frame structs
Browse files Browse the repository at this point in the history
- Redefined the IProtocolUpgrader API, removed type parameter dependency
- Better handling of upgrading connections
- ProtocolPeer.Configure is now protected internal instead of internal
- Added ProtocolClient(IProtocolCoder, ProtocolMode) constructor
- Frames can now be structures instead of reference types
- QueueAsync task now completes when the frame is actually flushed not when it's written to the stream
- IProtocolCoder.WriteAsync is now IProtocolCoder.Write and is much much better optimised when sending bulk small-mid sized frames
- Removed ProtocolServer.Stop since it cannot reliably prevent new connections
- Added ProtocolPeer.GetStream to get the underlying data stream
- Added ProtocolPeer.TryReceive to try and dequeue a received frame waiting in the buffer without blocking
- Added more members to IProtocolPeer that are present in ProtocolPeer<TFrame>
- Minecraft example now spawns you looking at the text
- Minecraft example now uses structures for it's packets
- Frames can now implement IDisposable to indicate reusability/releasing of managed resources after sending
- Renamed ProtocolServer.Endpoint to ProtocolServer.Endpoints for multiple-endpoint listening future proofing
- Added ProtocolServer.FlushAsync, ProtocolServer.SendAsync and ProtocolServer.Queue utility functions with predicate support
- Added ProtocolPeer.GetStatistics for network stats and fixed incorrect frame out count
- Added protocol modes active/passive
- Improved chat example
- Fixed peers locking up when filling up the internal pipe buffer, the pipe buffer will now be 2x the buffer size or 34768, whichever is higher. Coders must always be able to process incoming data so that the queued data never raises above the threshold
- Removed ProtocolPeer.Transfer
- Moved to 0.5.0
  • Loading branch information
alandoherty committed Jan 31, 2019
1 parent 7761376 commit af26ac8
Show file tree
Hide file tree
Showing 33 changed files with 883 additions and 399 deletions.
56 changes: 52 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ In the Minecraft example, you can type `/text Moi moi or /texta Moi moi` for 3D

## Usage

Overall the library makes use of the concept of peers, which can represent a inbound connection or an outgoing client connection. You can use `Peer.Side` to determine if a peer is a client or server connection. The library provides three ways of receiving packets from the opposing peer, it is safe to use all at once if your application requires.
In ProtoSocket, sockets are represented as peers, which can be either an inbound connection or an outgoing client connection. You can use `Peer.Side` to determine if a peer is a client or server connection. The library provides three ways of receiving packets from the opposing peer, it is safe to use all at once if your application requires.

- `IObserver<TFrame>` subscriptions
- `Peer.Received` event
Expand All @@ -60,7 +60,7 @@ Your implementation simply needs to call `PipeReader.TryRead`, processing as muc

### Queueing

In many scenarios creating an asyncronous operation and waiting for every packet to be sent is not ideal, for these use cases you can use the `ProtocolPeer.Queue` and `ProtocolPeer.QueueAsync` methods.
In many scenarios creating an asynchronous operation and waiting for every packet to be sent is not ideal, for these use cases you can use the `ProtocolPeer.Queue` and `ProtocolPeer.QueueAsync` methods.

Queueing a packet does not provide any guarentee it will be sent in a timely fashion, it is up to you to call `ProtocolPeer.SendAsync`/`ProtocolPeer.FlushAsync` for any queued packets to be sent. If you want to queue packets but need to confirm or wait until they have been sent, you can use the `ProtocolPeer.QueueAsync` method.

Expand All @@ -71,11 +71,39 @@ Task message1 = peer.QueueAsync(new ChatMessage() { Text = "I like books" });
Task message2 = peer.QueueAsync(new ChatMessage() { Text = "I like books alot" });
Task message3 = peer.QueueAsync(new ChatMessage() { Text = "I like eBooks too" });

// you can either call peer.FlushAsync or wait until the next call to peer.SendAsync(TFrame/TFrame[]/etc)
await peer.FlushAsync();
// you can either call peer.FlushAsync elsewhere or wait until the next call to peer.SendAsync(TFrame/TFrame[]/etc)
await Task.WhenAll(message1, message2, message3);
```

### Modes

In the newer versions of ProtoSocket you can now create peers in either `Active` or `Passive` mode. In Active mode the peers act

### Upgrading/SSL

In many scenarios you will want to perform an upgrade of the underlying transport connection, for example to support TLS/SSL connections. ProtoSocket provides an easy means of doing this via the `IProtocolUpgrader<TFrame>` interface. Upgrading a connection for versioned protocols, or changing the frame type is not supported and not the target use case of the upgrade API.

To upgrade the connection to SSL for example, use the pre-built `SslUpgrader` class. Note that flushing or sending frames on the peer will trigger an `InvalidOperationException`. You can queue frames however.

```
SslUpgrader upgrader = new SslUpgrader("www.google.com");
upgrader.Protocols = SslProtocols.Tls | SslProtocols.Tls11;
await peer.UpgradeAsync(upgrader);
await peer.SendAsync(new ChatMessage() { Text = "Encrypted chat message!" });
```

You can also upgrade explicitly after connecting, preventing the underlying read loop from accidently interpreting upgraded traffic.

```
ProtocolClient client = new ProtocolClient(new MyCoder(), ProtocolMode.Passive);
SslUpgrader upgrader = new SslUpgrader("www.google.com");
upgrader.Protocols = SslProtocols.Tls | SslProtocols.Tls11;
await client.UpgradeAsync(upgrader);
client.Mode = ProtocolMode.Active;
```

### Filters

You can selectively decline incoming connections by adding a filter to the `ProtocolServer` object. If filtered, the connection will not be added to the server and the socket will be closed instantly.
Expand All @@ -93,7 +121,27 @@ server.Filter = new AsyncConnectionFilter(async (ctx, ct) => {
await Task.Delay(3000);
return false;
});
```

### Reusing Frames

In some cases you may want to pool/optimise your frames in such a way that managed buffers are reused between frames. You can achieve this by implementing `IDisposable`, which will result in `Dispose` being called automatically when the frame is flushed to the opposing peer. Allowing you to take back any resources for use by other frames. The peer will not dispose a frame after receiving, it is up to you to call `Dispose` when you have processed the frame.

```csharp
public struct PooledFrame : IDisposable
{
public byte[] Buffer { get; set; }
public int Size { get; set; }

public void Dispose() {
ArrayPool<byte>.Shared.Return(Buffer);
}

public PooledFrame(int bufferSize) {
Buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
Size = bufferSize;
}
}
```

## Contributing
Expand Down
4 changes: 2 additions & 2 deletions samples/Example.Chat/ChatClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace Example.Chat
{
public class ChatClient : ProtocolClient<ChatMessage>
public class ChatClient : ProtocolClient<ChatFrame>
{
public ChatClient() : base(new ChatCoder()) {
public ChatClient() : base(new ChatCoder(), ProtocolMode.Active) {
}
}
}
63 changes: 34 additions & 29 deletions samples/Example.Chat/ChatCoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ namespace Example.Chat
/// <summary>
/// Encodes/decodes chat messages.
/// </summary>
public class ChatCoder : IProtocolCoder<ChatMessage>
public class ChatCoder : IProtocolCoder<ChatFrame>
{
private ReadState _state;
private int _messageLength;
private byte[] _messagePayload;
private int _messageOffset;

public bool Read(PipeReader reader, CoderContext<ChatMessage> ctx, out ChatMessage frame) {
public bool Read(PipeReader reader, CoderContext<ChatFrame> ctx, out ChatFrame frame) {
if (reader.TryRead(out ReadResult result) && !result.IsCompleted) {
// get the sequence buffer
ReadOnlySequence<byte> buffer = result.Buffer;
Expand All @@ -30,7 +31,8 @@ public bool Read(PipeReader reader, CoderContext<ChatMessage> ctx, out ChatMessa
if (buffer.Length >= 2) {
// to array
byte[] messageLengthBytes = buffer.Slice(0, 2).ToArray();
_messageLength = BitConverter.ToInt16(messageLengthBytes, 0);
_messagePayload = new byte[BitConverter.ToUInt16(messageLengthBytes, 0)];
_messageOffset = 0;

// increment the amount we were able to copy in
buffer = buffer.Slice(2);
Expand All @@ -39,24 +41,28 @@ public bool Read(PipeReader reader, CoderContext<ChatMessage> ctx, out ChatMessa
break;
}
} else if (_state == ReadState.Message) {
if (buffer.Length >= _messageLength) {
// to array
byte[] messagePayload = buffer.Slice(0, _messageLength).ToArray();

// increment the amount we were able to copy in
buffer = buffer.Slice(_messageLength);
// copy as much as possible
int numSliceBytes = Math.Min((int)buffer.Length, _messagePayload.Length - _messageOffset);

// copy in array, increment offset and set new buffer position
buffer.Slice(0, numSliceBytes).CopyTo(new Span<byte>(_messagePayload, _messageOffset, numSliceBytes));
_messageOffset += numSliceBytes;
buffer = buffer.Slice(numSliceBytes);

if (_messageOffset == _messagePayload.Length) {
// output the frames
using (MemoryStream ms = new MemoryStream(messagePayload)) {
using (MemoryStream ms = new MemoryStream(_messagePayload)) {
BinaryReader msReader = new BinaryReader(ms);

string text = msReader.ReadString();
string name = msReader.ReadString();
ushort textLength = msReader.ReadUInt16();
byte[] textBytes = msReader.ReadBytes(textLength);
byte nameLength = msReader.ReadByte();
byte[] nameBytes = msReader.ReadBytes(nameLength);

// output the frames
frame = new ChatMessage() {
Text = text,
Name = name
frame = new ChatFrame() {
Message = Encoding.UTF8.GetString(textBytes),
Name = Encoding.UTF8.GetString(nameBytes)
};
}

Expand All @@ -74,13 +80,14 @@ public bool Read(PipeReader reader, CoderContext<ChatMessage> ctx, out ChatMessa
}

// we didn't find a frame
frame = null;
frame = default(ChatFrame);
return false;
}

public void Reset() {
_state = ReadState.Length;
_messageLength = 0;
_messagePayload = null;
_messageOffset = 0;
}

/// <summary>
Expand All @@ -92,18 +99,16 @@ enum ReadState
Message
}

public async Task WriteAsync(Stream stream, ChatMessage frame, CoderContext<ChatMessage> ctx, CancellationToken cancellationToken) {
using (MemoryStream ms = new MemoryStream()) {
BinaryWriter writer = new BinaryWriter(ms);
public void Write(Stream stream, ChatFrame frame, CoderContext<ChatFrame> ctx) {
byte[] nameBytes = string.IsNullOrEmpty(frame.Name) ? new byte[0] : Encoding.UTF8.GetBytes(frame.Name);
byte[] messageBytes = Encoding.UTF8.GetBytes(frame.Message);

writer.Write((short)0);
writer.Write(frame.Text);
writer.Write(frame.Name);
writer.Seek(0, SeekOrigin.Begin);
writer.Write((short)(ms.Length - 2));

await stream.WriteAsync(ms.ToArray(), 0, (int)ms.Length, cancellationToken).ConfigureAwait(false);
}
BinaryWriter writer = new BinaryWriter(stream, Encoding.UTF8, true);
writer.Write((ushort)(3 + nameBytes.Length + messageBytes.Length));
writer.Write((ushort)messageBytes.Length);
writer.Write(messageBytes);
writer.Write((byte)nameBytes.Length);
writer.Write(nameBytes);
}
}
}
13 changes: 11 additions & 2 deletions samples/Example.Chat/ChatConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@

namespace Example.Chat
{
public class ChatConnection : ProtocolConnection<ChatConnection, ChatMessage>
public class ChatConnection : ProtocolConnection<ChatConnection, ChatFrame>
{
public ChatConnection(ProtocolServer<ChatConnection, ChatMessage> server, ProtocolCoderFactory<ChatMessage> coderFactory) : base(server, coderFactory) {
public string Name { get; private set; } = "Unnamed";

protected override bool OnReceived(PeerReceivedEventArgs<ChatFrame> e) {
if (!string.IsNullOrEmpty(e.Frame.Name))
Name = e.Frame.Name;

return base.OnReceived(e);
}

public ChatConnection(ProtocolServer<ChatConnection, ChatFrame> server, ProtocolCoderFactory<ChatFrame> coderFactory, ProtocolMode mode, int bufferSize) : base(server, coderFactory, mode, bufferSize) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Example.Chat
{
/// <summary>
/// Represents a chat message.
/// Represents a chat frame.
/// </summary>
public class ChatMessage
public struct ChatFrame
{
public string Text { get; set; }
public string Name { get; set; }
public string Message { get; set; }
}
}
4 changes: 2 additions & 2 deletions samples/Example.Chat/ChatServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace Example.Chat
{
public class ChatServer : ProtocolServer<ChatConnection, ChatMessage>
public class ChatServer : ProtocolServer<ChatConnection, ChatFrame>
{
public ChatServer() : base((p) => new ChatCoder()) {
public ChatServer() : base((p) => new ChatCoder(), ProtocolMode.Active) {
}
}
}
Loading

0 comments on commit af26ac8

Please sign in to comment.