forked from miyu/Dargon.Transport
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathNamedPipeFrameTransmitter.cs
174 lines (156 loc) · 5.76 KB
/
NamedPipeFrameTransmitter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
using System;
using System.IO;
using System.IO.Pipes;
using System.ServiceModel.Channels;
namespace Dargon.Transport
{
public class NamedPipeFrameTransmitter : IFrameTransmitter
{
/// <summary>
/// The named pipe client stream associated with this frame transmitter.
/// </summary>
private readonly NamedPipeClientStream m_stream;
/// <summary>
/// The binary writer which is used to write from to network stream
/// </summary>
private readonly BinaryWriter m_writer;
/// <summary>
/// All input is stored into this input buffer.
/// </summary>
private readonly byte[] m_inputBuffer = new byte[DTPConstants.kMaxMessageSize];
/// <summary>
/// The buffer pool, which provides us input buffers for reading in messages.
/// </summary>
private readonly BufferManager m_bufferPool;
/// <summary>
/// Initializes a new instance of a TCP Frame Transmitter for DSPEx
/// </summary>
/// <param name="pipeName">
/// The name of the pipe which we are writing to
/// </param>
public NamedPipeFrameTransmitter(string pipeName)
{
m_stream = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
m_stream.Connect();
m_writer = new BinaryWriter(m_stream);
// Elevate to DSPEx - this blocks until the byte has been written to the underlying stream.
m_writer.Write((byte)DTP.DSPEX_INIT);
m_bufferPool = BufferManager.CreateBufferManager(100000, DTPConstants.kMaxMessageSize);
for (int i = 0; i < 100; i++)
m_bufferPool.ReturnBuffer(new byte[DTPConstants.kMaxMessageSize]);
}
/// <summary>
/// Begins receiving message frames.
/// </summary>
/// <param name="onFrameReceived">
/// When a message frame is received, this callback is invoked.
/// </param>
public void BeginReceivingMessageFrames(Action<byte[]> onFrameReceived)
{
BeginReceiveMessage(onFrameReceived);
}
/// <summary>
/// Begins receiving a DSPEx message.
/// In reality, this starts async read to get the UINT messageLength of an incoming message.
/// After that is received, another async read begins to get the transaction id of the message.
/// </summary>
/// <param name="onFrameReceived">
/// This callback is invoked when a frame is received.
/// </param>
private void BeginReceiveMessage(Action<byte[]> onFrameReceived)
{
StateObject so = new StateObject()
{
buffer = m_bufferPool.TakeBuffer(DTPConstants.kMaxMessageSize),
bytesRead = 0
};
ContinueReceiveLength(so, onFrameReceived);
}
/// <summary>
/// Continues to receive a DSPEx message.
/// The first block of this method runs an async loop which reads the first four bytes of our
/// DSPEx message, which tells us the length of our message.
/// The second block of the method runs an async loop which reads the remainder of the message.
/// </summary>
/// <param name="so"></param>
/// <param name="onFrameReceived">
/// This callback is invoked when a frame is received.
/// </param>
private void ContinueReceiveLength(StateObject so, Action<byte[]> onFrameReceived)
{
m_stream.BeginRead(
so.buffer,
so.bytesRead, 4 - so.bytesRead, //Read bytes of index [0, 3]
(asyncResult) => {
int bytesRead = m_stream.EndRead(asyncResult);
so.bytesRead += bytesRead;
// When we've read four bytes, we're done.
if (so.bytesRead == 4)
{
so.bytesTotal = (int)BitConverter.ToUInt32(so.buffer, 0);
ContinueReceivePostLength(so, onFrameReceived);
}
else
{
ContinueReceiveLength(so, onFrameReceived);
}
},
null
);
}
private void ContinueReceivePostLength(StateObject so, Action<byte[]> onFrameReceived)
{
m_stream.BeginRead(
so.buffer,
so.bytesRead, so.bytesTotal - so.bytesRead,
(eSecond) =>
{
int bytesRead = m_stream.EndRead(eSecond);
so.bytesRead += bytesRead;
if (so.bytesRead == so.bytesTotal)
{
BeginReceiveMessage(onFrameReceived);
onFrameReceived(so.buffer);
m_bufferPool.ReturnBuffer(so.buffer);
}
else
{
ContinueReceivePostLength(so, onFrameReceived);
}
},
null
);
}
public void SendRawFrame(byte[] buffer, int offset, int size, Action onFrameSendComplete)
{
m_stream.BeginWrite(
buffer,
offset,
(int)size,
(s) => {
m_stream.EndWrite(s);
onFrameSendComplete();
},
m_stream
);
}
/// <summary>
/// State object for DSPEx asynchronous receiving.
/// </summary>
public class StateObject
{
/// <summary>
/// The total number of bytes which we have read so far
/// </summary>
public int bytesRead = 0;
/// <summary>
/// The total number of bytes in the DSP Message which we are reading
/// </summary>
public int bytesTotal = 0;
/// <summary>
/// The buffer which we are storing stuff in.
/// </summary>
public byte[] buffer; //Contains the entire message frame, including header
}
}
}