Skip to content

Commit

Permalink
Merge branches 'v1-feature-network-ws' and 'v1-feature-network-tcp' i…
Browse files Browse the repository at this point in the history
…nto main
  • Loading branch information
dobyte committed Aug 26, 2022
3 parents ce04c57 + 47a6614 + 056e2ae commit 9dd1e0f
Show file tree
Hide file tree
Showing 24 changed files with 2,100 additions and 0 deletions.
57 changes: 57 additions & 0 deletions network/tcp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package tcp

import (
"github.com/dobyte/due/network"
"net"
)

type client struct {
opts *clientOptions // 配置
connectHandler network.ConnectHandler // 连接打开hook函数
disconnectHandler network.DisconnectHandler // 连接关闭hook函数
receiveHandler network.ReceiveHandler // 接收消息hook函数
}

var _ network.Client = &client{}

func NewClient(opts ...ClientOption) network.Client {
o := &clientOptions{
addr: "127.0.0.1:3553",
maxMsgLength: 1024 * 1024,
}
for _, opt := range opts {
opt(o)
}

return &client{opts: o}
}

// Dial 拨号连接
func (c *client) Dial() (network.Conn, error) {
addr, err := net.ResolveTCPAddr("tcp", c.opts.addr)
if err != nil {
return nil, err
}

conn, err := net.Dial(addr.Network(), addr.String())
if err != nil {
return nil, err
}

return newClientConn(c, conn), nil
}

// OnConnect 监听连接打开
func (c *client) OnConnect(handler network.ConnectHandler) {
c.connectHandler = handler
}

// OnDisconnect 监听连接关闭
func (c *client) OnDisconnect(handler network.DisconnectHandler) {
c.disconnectHandler = handler
}

// OnReceive 监听接收到消息
func (c *client) OnReceive(handler network.ReceiveHandler) {
c.receiveHandler = handler
}
239 changes: 239 additions & 0 deletions network/tcp/client_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package tcp

import (
"github.com/dobyte/due/internal/xnet"
"github.com/dobyte/due/log"
"github.com/dobyte/due/network"
"net"
"sync"
"sync/atomic"
)

type clientConn struct {
rw sync.RWMutex
id int64 // 连接ID
uid int64 // 用户ID
conn net.Conn // TCP源连接
state int32 // 连接状态
client *client // 客户端
chWrite chan chWrite // 写入队列
done chan struct{} // 写入完成信号
}

var _ network.Conn = &clientConn{}

func newClientConn(client *client, conn net.Conn) network.Conn {
c := &clientConn{
id: 1,
conn: conn,
state: int32(network.ConnOpened),
client: client,
chWrite: make(chan chWrite),
done: make(chan struct{}),
}

if c.client.connectHandler != nil {
c.client.connectHandler(c)
}

go c.read()

go c.write()

return c
}

// ID 获取连接ID
func (c *clientConn) ID() int64 {
return c.id
}

// UID 获取用户ID
func (c *clientConn) UID() int64 {
c.rw.RLock()
defer c.rw.RUnlock()

return c.uid
}

// Bind 绑定用户ID
func (c *clientConn) Bind(uid int64) {
c.rw.Lock()
defer c.rw.Unlock()

c.uid = uid
}

// Send 发送消息(同步)
func (c *clientConn) Send(msg []byte, msgType ...int) error {
c.rw.RLock()
defer c.rw.RUnlock()

if err := c.checkState(); err != nil {
return err
}

_, err := c.conn.Write(msg)
return err
}

// Push 发送消息(异步)
func (c *clientConn) Push(msg []byte, msgType ...int) error {
c.rw.RLock()
defer c.rw.RUnlock()

if err := c.checkState(); err != nil {
return err
}

c.chWrite <- chWrite{typ: dataPacket, msg: msg}

return nil
}

// State 获取连接状态
func (c *clientConn) State() network.ConnState {
return network.ConnState(atomic.LoadInt32(&c.state))
}

// Close 关闭连接
func (c *clientConn) Close(isForce ...bool) error {
c.rw.Lock()
defer c.rw.Unlock()

if err := c.checkState(); err != nil {
return err
}

if len(isForce) > 0 && isForce[0] {
atomic.StoreInt32(&c.state, int32(network.ConnClosed))
} else {
atomic.StoreInt32(&c.state, int32(network.ConnHanged))
c.chWrite <- chWrite{typ: closeSig}
<-c.done
}

close(c.chWrite)

return c.conn.Close()
}

// LocalIP 获取本地IP
func (c *clientConn) LocalIP() (string, error) {
addr, err := c.LocalAddr()
if err != nil {
return "", err
}

return xnet.ExtractIP(addr)
}

// LocalAddr 获取本地地址
func (c *clientConn) LocalAddr() (net.Addr, error) {
c.rw.RLock()
defer c.rw.RUnlock()

if err := c.checkState(); err != nil {
return nil, err
}

return c.conn.LocalAddr(), nil
}

// RemoteIP 获取远端IP
func (c *clientConn) RemoteIP() (string, error) {
addr, err := c.RemoteAddr()
if err != nil {
return "", err
}

return xnet.ExtractIP(addr)
}

// RemoteAddr 获取远端地址
func (c *clientConn) RemoteAddr() (net.Addr, error) {
c.rw.RLock()
defer c.rw.RUnlock()

if err := c.checkState(); err != nil {
return nil, err
}

return c.conn.RemoteAddr(), nil
}

// 关闭连接
func (c *clientConn) close() {
atomic.StoreInt32(&c.state, int32(network.ConnClosed))

if c.client.disconnectHandler != nil {
c.client.disconnectHandler(c)
}
}

// 检测连接状态
func (c *clientConn) checkState() error {
switch network.ConnState(atomic.LoadInt32(&c.state)) {
case network.ConnHanged:
return network.ErrConnectionHanged
case network.ConnClosed:
return network.ErrConnectionClosed
}

return nil
}

// 读取消息
func (c *clientConn) read() {
defer c.close()

for {
msg, err := readMsgFromConn(c.conn, c.client.opts.maxMsgLength)
if err != nil {
if err == errMsgSizeTooLarge {
log.Warnf("the msg size too large, has been ignored")
continue
}
return
}

switch c.State() {
case network.ConnHanged:
continue
case network.ConnClosed:
return
}

if c.client.receiveHandler != nil {
c.client.receiveHandler(c, msg, 0)
}
}
}

// 写入消息
func (c *clientConn) write() {
for {
select {
case write, ok := <-c.chWrite:
if !ok {
return
}

if write.typ == closeSig {
c.done <- struct{}{}
return
}

buf, err := pack(write.msg)
if err != nil {
log.Errorf("packet message error: %v", err)
continue
}

if _, err = c.conn.Write(buf); err != nil {
log.Errorf("write message error: %v", err)
continue
}
}
}
}
18 changes: 18 additions & 0 deletions network/tcp/client_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tcp

type ClientOption func(o *clientOptions)

type clientOptions struct {
addr string // 地址
maxMsgLength int // 最大消息长度
}

// WithClientDialAddr 设置拨号地址
func WithClientDialAddr(addr string) ClientOption {
return func(o *clientOptions) { o.addr = addr }
}

// WithClientMaxMsgLength 设置消息最大长度
func WithClientMaxMsgLength(maxMsgLength int) ClientOption {
return func(o *clientOptions) { o.maxMsgLength = maxMsgLength }
}
42 changes: 42 additions & 0 deletions network/tcp/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tcp_test

import (
"github.com/dobyte/due/network"
"github.com/dobyte/due/network/tcp"
"testing"
"time"
)

func TestNewClient(t *testing.T) {
client := tcp.NewClient(
tcp.WithClientDialAddr("127.0.0.1:3553"),
)

client.OnConnect(func(conn network.Conn) {
t.Log("connection is opened")
})
client.OnDisconnect(func(conn network.Conn) {
t.Log("connection is closed")
})
client.OnReceive(func(conn network.Conn, msg []byte, msgType int) {
t.Logf("receive msg from server, msg: %s", string(msg))
})

conn, err := client.Dial()
if err != nil {
t.Fatal(err)
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
defer conn.Close()
for {
select {
case <-ticker.C:
if err = conn.Push([]byte("hello server~~")); err != nil {
t.Error(err)
return
}
}
}
}
11 changes: 11 additions & 0 deletions network/tcp/def.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package tcp

const (
closeSig int = iota // 关闭信号
dataPacket // 数据包
)

type chWrite struct {
typ int
msg []byte
}
7 changes: 7 additions & 0 deletions network/tcp/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/dobyte/due/network/tcp

go 1.16

require github.com/dobyte/due v0.0.1

replace github.com/dobyte/due => ../../
Loading

0 comments on commit 9dd1e0f

Please sign in to comment.