-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinterface.go
469 lines (396 loc) · 11.4 KB
/
interface.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
package tentacle
import (
"fmt"
"time"
ma "github.com/multiformats/go-multiaddr"
)
// Tags identifying the kind of a ServiceEvent.
const (
	// SessionClose marks the closing of a session.
	SessionClose uint = iota
	// SessionOpen marks the opening of a session.
	SessionOpen
	// ListenClose marks the closing of a listener.
	ListenClose
	// ListenStarted marks the start of a listener.
	ListenStarted
)

// ServiceEvent is an event generated by the Service.
type ServiceEvent struct {
	// Tag is one of SessionClose, SessionOpen, ListenClose or ListenStarted.
	Tag uint
	// Event is the payload that belongs to Tag.
	Event interface{}
}

// Name returns the name of the event tag, or the empty string when Tag is
// not one of the known event tags.
func (s *ServiceEvent) Name() string {
	// The tags are small consecutive integers, so they can index a table.
	names := [...]string{
		SessionClose:  "SessionClose",
		SessionOpen:   "SessionOpen",
		ListenClose:   "ListenClose",
		ListenStarted: "ListenStarted",
	}
	if s.Tag >= uint(len(names)) {
		return ""
	}
	return names[s.Tag]
}
// String renders the event as "<TagName>: <detail>".
//
// For SessionClose and SessionOpen the payload is expected to be a
// *SessionContext and its session id is printed. For ListenClose and
// ListenStarted the payload is expected to be the listen address and is
// printed as-is. An unknown Tag yields the empty string.
//
// A payload that is nil or of an unexpected type is reported in the returned
// text rather than causing a panic.
func (s *ServiceEvent) String() string {
	// session formats the two session events, which differ only in label.
	session := func(label string) string {
		if ctx, ok := s.Event.(*SessionContext); ok && ctx != nil {
			return fmt.Sprintf("%s: sid %d", label, ctx.Sid)
		}
		return fmt.Sprintf("%s: unexpected payload %v", label, s.Event)
	}

	switch s.Tag {
	case SessionClose:
		return session("SessionClose")
	case SessionOpen:
		return session("SessionOpen")
	case ListenClose:
		// %v uses the address's String method when it has one, so a valid
		// address prints exactly as before while nil is tolerated.
		return fmt.Sprintf("ListenClose: addr %v", s.Event)
	case ListenStarted:
		return fmt.Sprintf("ListenStarted: addr %v", s.Event)
	}
	return ""
}
// Tags identifying the kind of a ServiceError; ServiceError.Name and
// ServiceError.String switch on these values.
const (
	// DialerError is reported when dialing a remote fails.
	DialerError uint = iota
	// ListenError is reported when listening fails.
	ListenError
	// ProtocolSelectError is reported when protocol selection fails.
	ProtocolSelectError
	// ProtocolError is reported for a protocol error during interaction.
	ProtocolError
	// SessionTimeout is reported when, after the connection was initialized,
	// the session did not open any protocol (suspected fd attack).
	SessionTimeout
	// MuxerError is reported for a multiplex protocol error.
	MuxerError
	// ProtocolHandleError is reported for a protocol handle error; it will
	// cause memory leaks or abnormal CPU usage.
	ProtocolHandleError
)
// ProtocolHandleErrorInner is the inner message of a ProtocolHandleError.
type ProtocolHandleErrorInner struct {
	// PID is the id of the protocol whose handle failed.
	PID ProtocolID
	// SID is the id of the session that caused the error; 0 means the
	// session could not be located.
	SID SessionID
}

// String describes the abnormally closed protocol handle, naming the session
// id and the protocol id.
func (p *ProtocolHandleErrorInner) String() string {
	const layout = "protocol handle abnormally closed, session id: %d, pid %d"
	sid, pid := p.SID, p.PID
	return fmt.Sprintf(layout, sid, pid)
}
// ProtocolSelectErrorInner is the inner message of a ProtocolSelectError.
type ProtocolSelectErrorInner struct {
	// Name is the protocol name. When empty, the failure was a timeout or
	// another network problem; when set, that protocol is not supported.
	Name string
	// Context is the context of the session on which selection failed.
	Context *SessionContext
}

// String describes the failed selection, naming the protocol and the session
// id taken from Context.
func (p *ProtocolSelectErrorInner) String() string {
	const layout = "protocol select error, protocol name: %s, session id: %d"
	return fmt.Sprintf(layout, p.Name, p.Context.Sid)
}
// ProtocolErrorInner is the inner message of a ProtocolError, a protocol
// error that occurred during interaction.
type ProtocolErrorInner struct {
	// SID is the id of the affected session.
	SID SessionID
	// PID is the id of the affected protocol.
	PID ProtocolID
	// Err is the codec error.
	Err error
}

// String describes the error, naming the protocol id, the session id and the
// underlying error.
func (p *ProtocolErrorInner) String() string {
	const layout = "protocol error during interaction, pid: %d, sid: %d, error: %s"
	pid, sid, cause := p.PID, p.SID, p.Err
	return fmt.Sprintf(layout, pid, sid, cause)
}
// SessionTimeoutInner is the inner message of a SessionTimeout: after the
// connection was initialized the session did not open any protocol
// (suspected fd attack).
type SessionTimeoutInner struct {
	// Context is the context of the session that timed out.
	Context *SessionContext
}

// String reports the session id and remote address taken from Context.
func (s *SessionTimeoutInner) String() string {
	ctx := s.Context
	return fmt.Sprintf("sid: %d, remote addr: %s", ctx.Sid, ctx.RemoteAddr)
}
// MuxerErrorInner is the inner message of a MuxerError, a multiplex protocol
// error.
type MuxerErrorInner struct {
	// Context is the context of the affected session.
	Context *SessionContext
	// Err is the underlying error.
	Err error
}

// String reports the session id and remote address taken from Context,
// followed by the underlying error.
func (s *MuxerErrorInner) String() string {
	ctx, cause := s.Context, s.Err
	return fmt.Sprintf("sid: %d, remote addr: %s, error: %s", ctx.Sid, ctx.RemoteAddr, cause)
}
// Tags shared by DialerErrorInner and ListenErrorInner.
const (
	// ListenErrorInner does not use the PeerIDNotMatch and HandshakeError
	// tags; DialerErrorInner uses all of them.

	// RepeatedConnection means a connection to an already connected peer;
	// the inner value is a SessionID.
	RepeatedConnection uint = iota
	// PeerIDNotMatch means that, when dialing a remote, the peer id did not match.
	PeerIDNotMatch
	// HandshakeError is a handshake error.
	HandshakeError
	// TransportError is a transport error.
	TransportError
	// IoError is an IO error.
	IoError
)
// DialerErrorInner describes an error that occurred while dialing a remote.
type DialerErrorInner struct {
	// Tag selects the error kind: RepeatedConnection, PeerIDNotMatch,
	// HandshakeError, TransportError or IoError.
	Tag uint
	// Inner is the payload for Tag. String reads it as a SessionID for
	// RepeatedConnection and as an error for HandshakeError, TransportError
	// and IoError; it is not read for PeerIDNotMatch.
	Inner interface{}
	// Addr is the remote address.
	Addr ma.Multiaddr
}
// Name returns the name of the error tag, or the empty string when Tag is
// not one of the known dial error tags.
func (d *DialerErrorInner) Name() string {
	// The tags are small consecutive integers, so they can index a table.
	names := [...]string{
		RepeatedConnection: "RepeatedConnection",
		PeerIDNotMatch:     "PeerIDNotMatch",
		HandshakeError:     "HandshakeError",
		TransportError:     "TransportError",
		IoError:            "IoError",
	}
	if d.Tag >= uint(len(names)) {
		return ""
	}
	return names[d.Tag]
}
// String renders the dial error together with the remote address.
//
// The text depends on Tag:
//   - RepeatedConnection prints the SessionID held in Inner.
//   - PeerIDNotMatch prints only the remote address.
//   - HandshakeError, TransportError and IoError print the error held in Inner.
//
// An unknown Tag yields the empty string. An Inner that is nil or of an
// unexpected type is printed with %v instead of causing a panic.
func (d *DialerErrorInner) String() string {
	switch d.Tag {
	case RepeatedConnection:
		if id, ok := d.Inner.(SessionID); ok {
			return fmt.Sprintf("RepeatedConnection error: repeated by session %d, remote addr: %v", id, d.Addr)
		}
		return fmt.Sprintf("RepeatedConnection error: repeated by session %v, remote addr: %v", d.Inner, d.Addr)
	case PeerIDNotMatch:
		// The message previously misspelled "dial" as "dail".
		return fmt.Sprintf("Peer id don't match by dial command, remote addr: %v", d.Addr)
	case HandshakeError:
		// %v calls Error() on an error value, so valid payloads print the
		// same text as before while nil and non-error payloads are tolerated.
		return fmt.Sprintf("Handshake error: %v, remote addr: %v", d.Inner, d.Addr)
	case TransportError:
		return fmt.Sprintf("Transport error: %v, remote addr: %v", d.Inner, d.Addr)
	case IoError:
		return fmt.Sprintf("Io error: %v, remote addr: %v", d.Inner, d.Addr)
	}
	return ""
}
// ListenErrorInner describes an error that occurred while listening.
type ListenErrorInner struct {
	// Tag selects the error kind; Name and String handle RepeatedConnection,
	// TransportError and IoError.
	Tag uint
	// Inner is the payload for Tag. String reads it as a SessionID for
	// RepeatedConnection and as an error for TransportError and IoError.
	Inner interface{}
	// Addr is the listen address.
	Addr ma.Multiaddr
}
// Name returns the name of the error tag. Only RepeatedConnection,
// TransportError and IoError are named; every other tag yields the empty
// string.
func (d *ListenErrorInner) Name() string {
	switch d.Tag {
	case RepeatedConnection:
		return "RepeatedConnection"
	case TransportError:
		return "TransportError"
	case IoError:
		return "IoError"
	default:
		return ""
	}
}
// String renders the listen error together with the address held in Addr.
//
// The text depends on Tag:
//   - RepeatedConnection prints the SessionID held in Inner.
//   - TransportError and IoError print the error held in Inner.
//
// Every other Tag yields the empty string. An Inner that is nil or of an
// unexpected type is printed with %v instead of causing a panic.
//
// NOTE(review): the messages label Addr as "remote addr" although the field
// holds the listen address; the wording is kept for output compatibility.
func (d *ListenErrorInner) String() string {
	switch d.Tag {
	case RepeatedConnection:
		if id, ok := d.Inner.(SessionID); ok {
			return fmt.Sprintf("RepeatedConnection error: repeated by session %d, remote addr: %v", id, d.Addr)
		}
		return fmt.Sprintf("RepeatedConnection error: repeated by session %v, remote addr: %v", d.Inner, d.Addr)
	case TransportError:
		// %v calls Error() on an error value, so valid payloads print the
		// same text as before while nil and non-error payloads are tolerated.
		return fmt.Sprintf("Transport error: %v, remote addr: %v", d.Inner, d.Addr)
	case IoError:
		return fmt.Sprintf("Io error: %v, remote addr: %v", d.Inner, d.Addr)
	}
	return ""
}
// ServiceError is an error generated by the Service.
type ServiceError struct {
	// Tag is one of DialerError, ListenError, ProtocolSelectError,
	// ProtocolError, SessionTimeout, MuxerError or ProtocolHandleError.
	Tag uint
	// Event is the inner error value that belongs to Tag, for example a
	// DialerErrorInner for DialerError.
	Event interface{}
}

// Name returns the name of the error tag, or the empty string when Tag is
// not one of the known service error tags.
func (s *ServiceError) Name() string {
	switch s.Tag {
	case DialerError:
		return "DialerError"
	case ListenError:
		return "ListenError"
	case ProtocolSelectError:
		return "ProtocolSelectError"
	case ProtocolError:
		return "ProtocolError"
	case SessionTimeout:
		return "SessionTimeout"
	case MuxerError:
		return "MuxerError"
	case ProtocolHandleError:
		return "ProtocolHandleError"
	}
	return ""
}

// String renders the error as "<TagName>: <inner description>", or the empty
// string when Tag is unknown.
//
// The inner value may be stored in Event either by value (for example
// DialerErrorInner) or by pointer (*DialerErrorInner). A nil or unexpected
// Event is printed with fmt's default formatting instead of causing a panic.
func (s *ServiceError) String() string {
	name := s.Name()
	if name == "" {
		return ""
	}

	var detail string
	switch inner := s.Event.(type) {
	// The String methods have pointer receivers, so a value stored in the
	// interface is not a fmt.Stringer; call String on the addressable copy.
	case DialerErrorInner:
		detail = inner.String()
	case ListenErrorInner:
		detail = inner.String()
	case ProtocolSelectErrorInner:
		detail = inner.String()
	case ProtocolErrorInner:
		detail = inner.String()
	case SessionTimeoutInner:
		detail = inner.String()
	case MuxerErrorInner:
		detail = inner.String()
	case ProtocolHandleErrorInner:
		detail = inner.String()
	default:
		// Pointer payloads implement fmt.Stringer and are formatted through
		// it; anything else falls back to fmt's default rendering.
		detail = fmt.Sprint(inner)
	}
	return name + ": " + detail
}
// ServiceHandle is the handle through which the Service reports errors and
// events.
//
// #### Behavior
//
// The handle exists from the moment the Service is created.
//
// It mainly handles Service-level errors thrown at runtime, such as listening
// errors.
//
// Session establishment and disconnection messages are also delivered here.
type ServiceHandle interface {
	// HandleError handles a runtime error.
	HandleError(*ServiceContext, ServiceError)
	// HandleEvent handles a session establishment or disconnection event.
	HandleEvent(*ServiceContext, ServiceEvent)
}
// ServiceProtocol is the Service-level protocol handle.
//
// #### Behavior
//
// It defines the behavior of a custom protocol in each state.
//
// Depending on whether the user defines a service handle or a session
// exclusive handle, the runtime behaves differently.
//
// The **important difference** is that a service handle may keep state,
// while a handle exclusive to a session is "stateless" relative to the
// service handle: it can only retain information between the opening and
// closing of one protocol stream.
//
// Opening and closing a protocol creates and cleans up the session exclusive
// handle, but the service handle keeps its state until the service is closed.
type ServiceProtocol interface {
	// Init is called when the service starts.
	//
	// The service handle will only be called once.
	Init(*ProtocolContext)
	// Connected is called when the protocol is opened.
	Connected(ctx *ProtocolContextRef, version string)
	// Disconnected is called when the protocol is closed.
	Disconnected(*ProtocolContextRef)
	// Received is called when a message of this protocol is received.
	Received(ctx *ProtocolContextRef, data []byte)
	// Notify is called when the Service receives the notify task.
	Notify(ctx *ProtocolContext, token uint64)
}
// SessionProtocol is the Session-level protocol handle.
type SessionProtocol interface {
	// Connected is called when the protocol is opened.
	Connected(ctx *ProtocolContextRef, version string)
	// Disconnected is called when the protocol is closed.
	Disconnected(*ProtocolContextRef)
	// Received is called when a message of this protocol is received.
	Received(ctx *ProtocolContextRef, data []byte)
	// Notify is called when the session receives the notify task.
	Notify(ctx *ProtocolContextRef, token uint64)
}
// SubstreamReadPart is the read part of a substream, handed to the protocol side.
type SubstreamReadPart interface {
	// NextMsg returns the next message, or an error.
	NextMsg() (msg []byte, err error)
	// ProtocolID returns the protocol id.
	ProtocolID() ProtocolID
	// Version returns the protocol version.
	Version() string
}
// ProtocolSpawn is called once negotiation has completed and the protocol
// has been opened; it lets users implement the read processing of the
// protocol themselves.
//
// Implementing this interface makes it possible to stream-read directly from
// the underlying substream.
//
// This interface and the callback implementation are mutually exclusive.
// This is checked during construction; if both exist, it will panic.
type ProtocolSpawn interface {
	// Spawn is called when the protocol is opened.
	// It is assumed that the implementation uses a go statement to run the
	// reader in a separate goroutine.
	Spawn(ctx *SessionContext, control *Service, read SubstreamReadPart)
}
// Task tags, numbered from 0 via iota.
// NOTE(review): presumably the values stored in serviceTask.tag; the code
// that dispatches on them is not in this file — confirm.
const (
	taskProtocolMessage uint = iota
	taskProtocolOpen
	taskProtocolClose
	taskSetProtocolNotify
	taskRemoveProtocolNotify
	taskSetProtocolSessionNotify
	taskRemoveProtocolSessionNotify
	taskDisconnect
	taskDial
	taskListen
	taskListenStart
	taskShutdown
)
// serviceTask is a task received by the Service.
//
// It is an instruction that the outside world can send to the service.
type serviceTask struct {
	// tag identifies the kind of task (presumably one of the task* constants — confirm).
	tag uint
	// event is the payload that belongs to tag.
	event interface{}
}
// taskProtocolMessageInner bundles a session target, a protocol id and a
// message body.
// NOTE(review): presumably the serviceTask.event payload for
// taskProtocolMessage — confirm against the task consumer.
type taskProtocolMessageInner struct {
	// target selects the session(s) concerned.
	target TargetSession
	// pid is the protocol id.
	pid ProtocolID
	// data is the raw message bytes.
	data []byte
}
// taskProtocolOpenInner bundles a session id and a protocol target.
// NOTE(review): presumably the serviceTask.event payload for
// taskProtocolOpen — confirm against the task consumer.
type taskProtocolOpenInner struct {
	// sid is the session id.
	sid SessionID
	// target selects the protocol(s) concerned.
	target TargetProtocol
}
// taskProtocolCloseInner bundles a session id and a protocol id.
// NOTE(review): presumably the serviceTask.event payload for
// taskProtocolClose — confirm against the task consumer.
type taskProtocolCloseInner struct {
	// sid is the session id.
	sid SessionID
	// pid is the protocol id.
	pid ProtocolID
}
// taskDialInner bundles an address and a protocol target.
// NOTE(review): presumably the serviceTask.event payload for taskDial —
// confirm against the task consumer.
type taskDialInner struct {
	// addr is the multiaddr concerned (presumably the address to dial).
	addr ma.Multiaddr
	// target selects the protocol(s) concerned.
	target TargetProtocol
}
// taskSetProtocolNotifyInner bundles a protocol id, an interval and a token.
// NOTE(review): presumably the serviceTask.event payload for
// taskSetProtocolNotify — confirm against the task consumer.
type taskSetProtocolNotifyInner struct {
	// pid is the protocol id.
	pid ProtocolID
	// interval is a duration (presumably the notify period — confirm).
	interval time.Duration
	// token is a uint64 token; the Notify callbacks in this file also take a
	// uint64 token.
	token uint64
}
// taskRemoveProtocolNotifyInner bundles a protocol id and a token.
// NOTE(review): presumably the serviceTask.event payload for
// taskRemoveProtocolNotify — confirm against the task consumer.
type taskRemoveProtocolNotifyInner struct {
	// pid is the protocol id.
	pid ProtocolID
	// token is a uint64 token (presumably the one of the notify to remove).
	token uint64
}
// taskSetProtocolSessionNotifyInner bundles a session id, a protocol id, an
// interval and a token.
// NOTE(review): presumably the serviceTask.event payload for
// taskSetProtocolSessionNotify — confirm against the task consumer.
type taskSetProtocolSessionNotifyInner struct {
	// sid is the session id.
	sid SessionID
	// pid is the protocol id.
	pid ProtocolID
	// interval is a duration (presumably the notify period — confirm).
	interval time.Duration
	// token is a uint64 token; the Notify callbacks in this file also take a
	// uint64 token.
	token uint64
}
// taskRemoveProtocolSessionNotifyInner bundles a session id, a protocol id
// and a token.
// NOTE(review): presumably the serviceTask.event payload for
// taskRemoveProtocolSessionNotify — confirm against the task consumer.
type taskRemoveProtocolSessionNotifyInner struct {
	// sid is the session id.
	sid SessionID
	// pid is the protocol id.
	pid ProtocolID
	// token is a uint64 token (presumably the one of the notify to remove).
	token uint64
}