Skip to content

Commit

Permalink
Support custom communication protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
andeya committed Nov 10, 2017
1 parent 810ffe8 commit 6ff3feb
Show file tree
Hide file tree
Showing 17 changed files with 871 additions and 619 deletions.
85 changes: 63 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go get -u github.com/henrylee2cn/teleport
- Server and client are peer-to-peer, have the same API method
- Packet contains both Header and Body two parts
- Support for customizing head and body coding types separately, e.g `JSON` `Protobuf` `string`
- Support custom communication protocol
- Body supports gzip compression
- Header contains the status code and its description text
- Support push, pull, reply and other means of communication
Expand Down Expand Up @@ -67,30 +68,16 @@ go get -u github.com/henrylee2cn/teleport
Peer -> Connection -> Socket -> Session -> Context
```


### 4.3 Packet

```
HeaderLength | HeaderCodecId | Header | BodyLength | BodyCodecId | Body
```

**Notes:**

- `HeaderLength`: uint32, 4 bytes, big endian
- `HeaderCodecId`: uint8, 1 byte
- `Header`: header bytes
- `BodyLength`: uint32, 4 bytes, big endian
* may be 0, meaning that the `Body` is empty and does not indicate the `BodyCodecId`
* may be 1, meaning that the `Body` is empty but indicates the `BodyCodecId`
- `BodyCodecId`: uint8, 1 byte
- `Body`: body bytes
The contents of every one packet:

```go
type Packet struct {
// HeaderCodec header codec name
HeaderCodec string `json:"header_codec"`
// BodyCodec body codec name
BodyCodec string `json:"body_codec"`
// HeaderCodec header codec string
HeaderCodec string
// BodyCodec body codec string
BodyCodec string
// header content
Header *Header `json:"header"`
// body content
Expand All @@ -104,7 +91,7 @@ type Packet struct {
}
```

### 4.4 Header
Among the contents of the header:

```go
type Header struct {
Expand All @@ -123,6 +110,60 @@ type Header struct {
}
```

### 4.4 Protocol

The default socket communication protocol:

```
HeaderLength | HeaderCodecId | Header | BodyLength | BodyCodecId | Body
```

**Notes:**

- `HeaderLength`: uint32, 4 bytes, big endian
- `HeaderCodecId`: uint8, 1 byte
- `Header`: header bytes
- `BodyLength`: uint32, 4 bytes, big endian
* may be 0, meaning that the `Body` is empty and does not indicate the `BodyCodecId`
* may be 1, meaning that the `Body` is empty but indicates the `BodyCodecId`
- `BodyCodecId`: uint8, 1 byte
- `Body`: body bytes

You can customize your own communication protocol by implementing the interface:

```go
// Protocol socket communication protocol
type Protocol interface {
// WritePacket writes header and body to the connection.
WritePacket(
packet *Packet,
destWriter *utils.BufioWriter,
tmpCodecWriterGetter func(codecName string) (*TmpCodecWriter, error),
isActiveClosed func() bool,
) error

// ReadPacket reads header and body from the connection.
ReadPacket(
packet *Packet,
bodyAdapter func() interface{},
srcReader *utils.BufioReader,
codecReaderGetter func(codecId byte) (*CodecReader, error),
isActiveClosed func() bool,
) error
}
```

Next, you can specify the communication protocol in the following ways:

```go
func SetDefaultProtocol(socket.Protocol)
func (*Peer) ServeConn(conn net.Conn, protocol ...socket.Protocol) Session
func (*Peer) DialContext(ctx context.Context, addr string, protocol ...socket.Protocol) (Session, error)
func (*Peer) Dial(addr string, protocol ...socket.Protocol) (Session, error)
func (*Peer) Listen(protocol ...socket.Protocol) error
```


## 5. Usage

- Create a server or client peer
Expand Down Expand Up @@ -151,7 +192,7 @@ var peer = tp.NewPeer(cfg)
peer.Listen()

// It can also be used as a client at the same time
var sess, err = peer.Dial("127.0.0.1:8080", "peerid-client")
var sess, err = peer.Dial("127.0.0.1:8080")
if err != nil {
tp.Panicf("%v", err)
}
Expand Down Expand Up @@ -367,7 +408,7 @@ func main() {
peer.PushRouter.Reg(new(Push))

{
var sess, err = peer.Dial("127.0.0.1:9090", "simple_server:9090")
var sess, err = peer.Dial("127.0.0.1:9090")
if err != nil {
tp.Panicf("%v", err)
}
Expand Down
89 changes: 66 additions & 23 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go get -u github.com/henrylee2cn/teleport
- 服务器和客户端之间对等通信,两者API方法基本一致
- 底层通信数据包包含`Header``Body`两部分
- 支持单独定制`Header``Body`编码类型,例如`JSON` `Protobuf` `string`
- 支持定制通信协议
- `Body`支持gzip压缩
- `Header`包含状态码及其描述文本
- 支持推、拉、回复等通信模式
Expand Down Expand Up @@ -68,30 +69,16 @@ go get -u github.com/henrylee2cn/teleport
Peer -> Connection -> Socket -> Session -> Context
```

### 4.3 包内容

### 4.3 数据包

```
HeaderLength | HeaderCodecId | Header | BodyLength | BodyCodecId | Body
```

**注意:**

- `HeaderLength`: uint32, 4 bytes, big endian
- `HeaderCodecId`: uint8, 1 byte
- `Header`: header bytes
- `BodyLength`: uint32, 4 bytes, big endian
* 可能为0,表示`Body`为空且不指明`BodyCodecId`
* 可能为1,表示`Body`为空但是指明`BodyCodecId`
- `BodyCodecId`: uint8, 1 byte
- `Body`: body bytes
每个数据包的内容如下:

```go
type Packet struct {
// HeaderCodec header codec name
HeaderCodec string `json:"header_codec"`
// BodyCodec body codec name
BodyCodec string `json:"body_codec"`
// HeaderCodec header codec string
HeaderCodec string
// BodyCodec body codec string
BodyCodec string
// header content
Header *Header `json:"header"`
// body content
Expand All @@ -105,7 +92,7 @@ type Packet struct {
}
```

### 4.4 头信息
其中头部内容为:

```go
type Header struct {
Expand All @@ -124,6 +111,62 @@ type Header struct {
}
```


### 4.4 通信协议

默认的通信协议:

```
HeaderLength | HeaderCodecId | Header | BodyLength | BodyCodecId | Body
```

**注意:**

- `HeaderLength`: uint32, 4 bytes, big endian
- `HeaderCodecId`: uint8, 1 byte
- `Header`: header bytes
- `BodyLength`: uint32, 4 bytes, big endian
* 可能为0,表示`Body`为空且不指明`BodyCodecId`
* 可能为1,表示`Body`为空但是指明`BodyCodecId`
- `BodyCodecId`: uint8, 1 byte
- `Body`: body bytes


你可以通过实现接口的方法定制自己的通信协议:

```go
// Protocol socket communication protocol
type Protocol interface {
// WritePacket writes header and body to the connection.
WritePacket(
packet *Packet,
destWriter *utils.BufioWriter,
tmpCodecWriterGetter func(codecName string) (*TmpCodecWriter, error),
isActiveClosed func() bool,
) error

// ReadPacket reads header and body from the connection.
ReadPacket(
packet *Packet,
bodyAdapter func() interface{},
srcReader *utils.BufioReader,
codecReaderGetter func(codecId byte) (*CodecReader, error),
isActiveClosed func() bool,
) error
}
```

接着,你可以使用以下任意方式指定自己的通信协议:

```go
func SetDefaultProtocol(socket.Protocol)
func (*Peer) ServeConn(conn net.Conn, protocol ...socket.Protocol) Session
func (*Peer) DialContext(ctx context.Context, addr string, protocol ...socket.Protocol) (Session, error)
func (*Peer) Dial(addr string, protocol ...socket.Protocol) (Session, error)
func (*Peer) Listen(protocol ...socket.Protocol) error
```


## 5. 用法

- 创建一个Peer端点,服务端或客户端
Expand Down Expand Up @@ -152,7 +195,7 @@ var peer = tp.NewPeer(cfg)
peer.Listen()

// It can also be used as a client at the same time
var sess, err = peer.Dial("127.0.0.1:8080", "peerid-client")
var sess, err = peer.Dial("127.0.0.1:8080")
if err != nil {
tp.Panicf("%v", err)
}
Expand Down Expand Up @@ -368,7 +411,7 @@ func main() {
peer.PushRouter.Reg(new(Push))

{
var sess, err = peer.Dial("127.0.0.1:9090", "simple_server:9090")
var sess, err = peer.Dial("127.0.0.1:9090")
if err != nil {
tp.Panicf("%v", err)
}
Expand Down
21 changes: 11 additions & 10 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/henrylee2cn/goutil"
"github.com/henrylee2cn/goutil/errors"
"github.com/henrylee2cn/teleport/socket"
)

// Peer peer which is server or client.
Expand Down Expand Up @@ -89,22 +90,22 @@ func (p *Peer) GetSession(sessionId string) (Session, bool) {
}

// ServeConn serves the connection and returns a session.
func (p *Peer) ServeConn(conn net.Conn, id ...string) Session {
var session = newSession(p, conn, id...)
func (p *Peer) ServeConn(conn net.Conn, protocol ...socket.Protocol) Session {
var session = newSession(p, conn, protocol)
p.sessHub.Set(session)
return session
}

// Dial connects with the peer of the destination address.
func (p *Peer) Dial(addr string, id ...string) (Session, error) {
func (p *Peer) Dial(addr string, protocol ...socket.Protocol) (Session, error) {
var conn, err = net.DialTimeout("tcp", addr, p.defaultDialTimeout)
if err != nil {
return nil, err
}
if p.tlsConfig != nil {
conn = tls.Client(conn, p.tlsConfig)
}
var sess = newSession(p, conn, id...)
var sess = newSession(p, conn, protocol)
if err = p.pluginContainer.PostDial(sess); err != nil {
sess.Close()
return nil, err
Expand All @@ -117,7 +118,7 @@ func (p *Peer) Dial(addr string, id ...string) (Session, error) {

// DialContext connects with the peer of the destination address,
// using the provided context.
func (p *Peer) DialContext(ctx context.Context, addr string, id ...string) (Session, error) {
func (p *Peer) DialContext(ctx context.Context, addr string, protocol ...socket.Protocol) (Session, error) {
var d net.Dialer
var conn, err = d.DialContext(ctx, "tcp", addr)
if err != nil {
Expand All @@ -126,7 +127,7 @@ func (p *Peer) DialContext(ctx context.Context, addr string, id ...string) (Sess
if p.tlsConfig != nil {
conn = tls.Client(conn, p.tlsConfig)
}
var sess = newSession(p, conn, id...)
var sess = newSession(p, conn, protocol)
if err = p.pluginContainer.PostDial(sess); err != nil {
sess.Close()
return nil, err
Expand All @@ -141,7 +142,7 @@ func (p *Peer) DialContext(ctx context.Context, addr string, id ...string) (Sess
var ErrListenClosed = errors.New("listener is closed")

// Listen turns on the listening service.
func (p *Peer) Listen() error {
func (p *Peer) Listen(protocol ...socket.Protocol) error {
var (
wg sync.WaitGroup
count = len(p.listenAddrs)
Expand All @@ -151,7 +152,7 @@ func (p *Peer) Listen() error {
for _, addr := range p.listenAddrs {
go func(addr string) {
defer wg.Done()
errCh <- p.listen(addr)
errCh <- p.listen(addr, protocol)
}(addr)
}
wg.Wait()
Expand All @@ -164,7 +165,7 @@ func (p *Peer) Listen() error {
return errs
}

func (p *Peer) listen(addr string) error {
func (p *Peer) listen(addr string, protocols []socket.Protocol) error {
var lis, err = listen(addr, p.tlsConfig)
if err != nil {
Fatalf("%v", err)
Expand Down Expand Up @@ -219,7 +220,7 @@ func (p *Peer) listen(addr string) error {
time.Sleep(time.Second)
goto TRYGO
}
}(newSession(p, rw))
}(newSession(p, rw, protocols))
}
}

Expand Down
2 changes: 1 addition & 1 deletion samples/ab/frame_client_ab.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {

var peer = tp.NewPeer(cfg)

var sess, err = peer.Dial("127.0.0.1:9090", "simple_server:9090")
var sess, err = peer.Dial("127.0.0.1:9090")
if err != nil {
tp.Panicf("%v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion samples/simple/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
peer.PushRouter.Reg(new(Push))

{
var sess, err = peer.Dial("127.0.0.1:9090", "simple_server:9090")
var sess, err = peer.Dial("127.0.0.1:9090")
if err != nil {
tp.Panicf("%v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ var (
_ ForeSession = new(session)
)

func newSession(peer *Peer, conn net.Conn, id ...string) *session {
func newSession(peer *Peer, conn net.Conn, protocols []socket.Protocol) *session {
var s = &session{
peer: peer,
pullRouter: peer.PullRouter,
pushRouter: peer.PushRouter,
socket: socket.NewSocket(conn, id...),
socket: socket.NewSocket(conn, protocols...),
pullCmdMap: goutil.RwMap(),
readTimeout: peer.defaultReadTimeout,
writeTimeout: peer.defaultWriteTimeout,
Expand Down
Loading

0 comments on commit 6ff3feb

Please sign in to comment.