Skip to content

Commit

Permalink
Merge branch 'v1-feature-main'
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed May 29, 2023
2 parents 5c445f4 + 1199380 commit 9cb7ae5
Show file tree
Hide file tree
Showing 19 changed files with 228 additions and 386 deletions.
34 changes: 11 additions & 23 deletions cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,26 @@ package gate
import (
"context"
"github.com/dobyte/due/cluster"
"github.com/dobyte/due/session"
"github.com/dobyte/due/transport"
"sync"
"time"

"github.com/dobyte/due/packet"
"github.com/dobyte/due/registry"
"github.com/dobyte/due/session"

"github.com/dobyte/due/component"
"github.com/dobyte/due/log"
"github.com/dobyte/due/network"
"github.com/dobyte/due/packet"
"github.com/dobyte/due/registry"
)

type Gate struct {
component.Base
opts *options
ctx context.Context
cancel context.CancelFunc
group *session.Group
sessions sync.Pool
proxy *proxy
instance *registry.ServiceInstance
rpc transport.Server
session *session.Session
}

func NewGate(opts ...Option) *Gate {
Expand All @@ -43,9 +40,8 @@ func NewGate(opts ...Option) *Gate {

g := &Gate{}
g.opts = o
g.group = session.NewGroup()
g.proxy = newProxy(g)
g.sessions.New = func() interface{} { return session.NewSession() }
g.session = session.NewSession()
g.ctx, g.cancel = context.WithCancel(o.ctx)

return g
Expand Down Expand Up @@ -123,30 +119,21 @@ func (g *Gate) stopNetworkServer() {

// 处理连接打开
func (g *Gate) handleConnect(conn network.Conn) {
s := g.sessions.Get().(*session.Session)
s.Init(conn)
g.group.AddSession(s)
g.session.AddConn(conn)
}

// 处理断开连接
func (g *Gate) handleDisconnect(conn network.Conn) {
s, err := g.group.RemSession(session.Conn, conn.ID())
if err != nil {
log.Errorf("session remove failed, gid: %d, cid: %d, uid: %d, err: %v", g.opts.id, s.CID(), s.UID(), err)
return
}
g.session.RemConn(conn)

if uid := conn.UID(); uid > 0 {
if cid, uid := conn.ID(), conn.UID(); uid > 0 {
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
err = g.proxy.unbindGate(ctx, conn.ID(), uid)
err := g.proxy.unbindGate(ctx, cid, uid)
cancel()
if err != nil {
log.Errorf("user unbind failed, gid: %d, uid: %d, err: %v", g.opts.id, uid, err)
}
}

s.Reset()
g.sessions.Put(s)
}

// 处理接收到的消息
Expand All @@ -157,8 +144,9 @@ func (g *Gate) handleReceive(conn network.Conn, data []byte, _ int) {
return
}

cid, uid := conn.ID(), conn.UID()
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
err = g.proxy.deliver(ctx, conn.ID(), conn.UID(), message)
err = g.proxy.deliver(ctx, cid, uid, message)
cancel()
if err != nil {
log.Warnf("deliver message failed: %v", err)
Expand Down
42 changes: 14 additions & 28 deletions cluster/gate/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@ func (p *provider) Bind(ctx context.Context, cid, uid int64) error {
return ErrInvalidArgument
}

s, err := p.gate.group.GetSession(session.Conn, cid)
err := p.gate.session.Bind(cid, uid)
if err != nil {
return err
}

err = p.gate.proxy.bindGate(ctx, cid, uid)
if err != nil {
return err
_, _ = p.gate.session.Unbind(uid)
}

s.Bind(uid)

return nil
return err
}

// Unbind 解绑用户与网关间的关系
Expand All @@ -37,29 +35,22 @@ func (p *provider) Unbind(ctx context.Context, uid int64) error {
return ErrInvalidArgument
}

s, err := p.gate.group.GetSession(session.User, uid)
cid, err := p.gate.session.Unbind(uid)
if err != nil {
return err
}

err = p.gate.proxy.unbindGate(ctx, s.CID(), uid)
err = p.gate.proxy.unbindGate(ctx, cid, uid)
if err != nil {
return err
}

s.Unbind(uid)

return nil
}

// GetIP 获取客户端IP地址
func (p *provider) GetIP(kind session.Kind, target int64) (string, error) {
s, err := p.gate.group.GetSession(kind, target)
if err != nil {
return "", err
}

return s.RemoteIP()
return p.gate.session.RemoteIP(kind, target)
}

// Push 发送消息
Expand All @@ -69,19 +60,21 @@ func (p *provider) Push(kind session.Kind, target int64, message *packet.Message
return err
}

return p.gate.group.Push(kind, target, msg)
return p.gate.session.Push(kind, target, msg)
}

// Multicast 推送组播消息
func (p *provider) Multicast(kind session.Kind, targets []int64, message *packet.Message) (int64, error) {
if len(targets) == 0 {
return 0, nil
}

msg, err := packet.Pack(message)
if err != nil {
return 0, err
}

total, err := p.gate.group.Multicast(kind, targets, msg)

return int64(total), err
return p.gate.session.Multicast(kind, targets, msg)
}

// Broadcast 推送广播消息
Expand All @@ -91,17 +84,10 @@ func (p *provider) Broadcast(kind session.Kind, message *packet.Message) (int64,
return 0, err
}

total, err := p.gate.group.Broadcast(kind, msg)

return int64(total), err
return p.gate.session.Broadcast(kind, msg)
}

// Disconnect 断开连接
func (p *provider) Disconnect(kind session.Kind, target int64, isForce bool) error {
s, err := p.gate.group.GetSession(kind, target)
if err != nil {
return err
}

return s.Close(isForce)
return p.gate.session.Close(kind, target, isForce)
}
2 changes: 1 addition & 1 deletion eventbus/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/Shopify/sarama v1.38.1
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
)

replace github.com/dobyte/due => ./../../
2 changes: 1 addition & 1 deletion eventbus/nats/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/eventbus/nats
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/nats-io/nats-server/v2 v2.9.14 // indirect
github.com/nats-io/nats.go v1.23.0
)
Expand Down
2 changes: 1 addition & 1 deletion eventbus/redis/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/eventbus/redis
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/go-redis/redis/v8 v8.11.5
)

Expand Down
2 changes: 1 addition & 1 deletion locate/redis/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/locate/redis
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/go-redis/redis/v8 v8.11.5
github.com/jonboulle/clockwork v0.3.0 // indirect
golang.org/x/sync v0.1.0
Expand Down
2 changes: 1 addition & 1 deletion log/aliyun/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/aliyun/aliyun-log-go-sdk v0.1.37
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
Expand Down
2 changes: 1 addition & 1 deletion log/logrus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/log/logrus
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/sirupsen/logrus v1.9.0
)

Expand Down
2 changes: 1 addition & 1 deletion log/tencent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/log/tencent
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/tencentcloud/tencentcloud-cls-sdk-go v1.0.2
)

Expand Down
2 changes: 1 addition & 1 deletion log/zap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/log/zap
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
go.uber.org/zap v1.22.0
)

Expand Down
2 changes: 1 addition & 1 deletion network/tcp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ module github.com/dobyte/due/network/tcp

go 1.16

require github.com/dobyte/due v0.0.21
require github.com/dobyte/due v0.0.22

replace github.com/dobyte/due => ../../
2 changes: 1 addition & 1 deletion network/ws/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/network/ws
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/gorilla/websocket v1.5.0
)

Expand Down
2 changes: 1 addition & 1 deletion registry/consul/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/registry/consul
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
github.com/hashicorp/consul/api v1.13.0
)

Expand Down
2 changes: 1 addition & 1 deletion registry/etcd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dobyte/due/registry/etcd
go 1.16

require (
github.com/dobyte/due v0.0.21
github.com/dobyte/due v0.0.22
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
)
Expand Down
Loading

0 comments on commit 9cb7ae5

Please sign in to comment.