From ea87fd943ea9ab5ae999837d9f284c880735b8a8 Mon Sep 17 00:00:00 2001 From: notacommonperson <1932049002@qq.com> Date: Fri, 26 Jan 2024 23:18:29 +0800 Subject: [PATCH 01/17] feat: support feeling client disconnetion --- go.mod | 2 + go.sum | 4 +- pkg/app/server/option.go | 18 ++++++++ pkg/app/server/option_test.go | 3 ++ pkg/common/config/option.go | 4 ++ pkg/common/config/option_test.go | 1 + pkg/network/netpoll/transport.go | 59 ++++++++++++++++++--------- pkg/network/netpoll/transport_test.go | 45 ++++++++++++++++++++ 8 files changed, 114 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 3619b833d..36eb3a559 100644 --- a/go.mod +++ b/go.mod @@ -14,3 +14,5 @@ require ( golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/protobuf v1.27.1 ) + +replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e diff --git a/go.sum b/go.sum index 3cf3c2f96..c38186d28 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,6 @@ github.com/bytedance/sonic v1.8.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/cloudwego/netpoll v0.5.0 h1:oRrOp58cPCvK2QbMozZNDESvrxQaEHW2dCimmwH1lcU= -github.com/cloudwego/netpoll v0.5.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -29,6 +27,8 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ= +github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e h1:o5RYe79HKcK3Vh6WM9Aag+KUWVdu+spRyfpfXMq2hQc= +github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index e7970348c..bf2746e7e 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -332,6 +332,24 @@ func WithDisablePrintRoute(b bool) config.Option { }} } +// WithSenseClientDisconnection sets the ability to sense client disconnections. +// If we don't set it, it will default to false. +// There are three issues to note when using this option: +// 1. It only applies to netpoll. +// 2. It needs to be used in conjunction with WithOnConnect,which will return a canceled context when peer closed. +// Examples: +// server.Default( +// server.WithSenseClientDisconnection(true), +// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { +// return ctx +// })) +// 3. The cost is high after opening, please choose carefully. +func WithSenseClientDisconnection(b bool) config.Option { + return config.Option{F: func(o *config.Options) { + o.SenseClientDisconnection = b + }} +} + // WithOnAccept sets the callback function when a new connection is accepted but cannot // receive data in netpoll. In go net, it will be called before converting tls connection func WithOnAccept(fn func(conn net.Conn) context.Context) config.Option { diff --git a/pkg/app/server/option_test.go b/pkg/app/server/option_test.go index f5d7f7b32..bc43e7cd2 100644 --- a/pkg/app/server/option_test.go +++ b/pkg/app/server/option_test.go @@ -61,6 +61,7 @@ func TestOptions(t *testing.T) { WithBasePath("/"), WithMaxRequestBodySize(2), WithDisablePrintRoute(true), + WithSenseClientDisconnection(true), WithNetwork("unix"), WithExitWaitTime(time.Second), WithMaxKeepBodySize(500), @@ -93,6 +94,7 @@ func TestOptions(t *testing.T) { assert.DeepEqual(t, opt.BasePath, "/") assert.DeepEqual(t, opt.MaxRequestBodySize, 2) assert.DeepEqual(t, opt.DisablePrintRoute, true) + assert.DeepEqual(t, opt.SenseClientDisconnection, true) assert.DeepEqual(t, opt.Network, "unix") assert.DeepEqual(t, opt.ExitWaitTimeout, time.Second) assert.DeepEqual(t, opt.MaxKeepBodySize, 500) @@ -130,6 +132,7 @@ func TestDefaultOptions(t *testing.T) { assert.DeepEqual(t, opt.GetOnly, false) assert.DeepEqual(t, opt.DisableKeepalive, false) assert.DeepEqual(t, opt.DisablePrintRoute, false) + assert.DeepEqual(t, opt.SenseClientDisconnection, false) assert.DeepEqual(t, opt.Network, "tcp") assert.DeepEqual(t, opt.ExitWaitTimeout, time.Second*5) assert.DeepEqual(t, opt.MaxKeepBodySize, 4*1024*1024) diff --git a/pkg/common/config/option.go b/pkg/common/config/option.go index 417955fc9..958d9b3a3 100644 --- a/pkg/common/config/option.go +++ b/pkg/common/config/option.go @@ -63,6 +63,7 @@ type Options struct { StreamRequestBody bool NoDefaultServerHeader bool DisablePrintRoute bool + SenseClientDisconnection bool Network string Addr string BasePath string @@ -203,6 +204,9 @@ func NewOptions(opts []Option) *Options { // Disabled when set to True DisablePrintRoute: false, + // The ability to sense client disconnection is disabled by default + SenseClientDisconnection: false, + // "tcp", "udp", "unix"(unix domain socket) Network: defaultNetwork, diff --git a/pkg/common/config/option_test.go b/pkg/common/config/option_test.go index 67fcab796..b836a3a1a 100644 --- a/pkg/common/config/option_test.go +++ b/pkg/common/config/option_test.go @@ -39,6 +39,7 @@ func TestDefaultOptions(t *testing.T) { assert.False(t, options.RemoveExtraSlash) assert.True(t, options.UnescapePathValues) assert.False(t, options.DisablePreParseMultipartForm) + assert.False(t, options.SenseClientDisconnection) assert.DeepEqual(t, defaultNetwork, options.Network) assert.DeepEqual(t, defaultAddr, options.Addr) assert.DeepEqual(t, defaultMaxRequestBodySize, options.MaxRequestBodySize) diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 17829cb83..6212271f7 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -38,31 +38,34 @@ func init() { type transporter struct { sync.RWMutex - network string - addr string - keepAliveTimeout time.Duration - readTimeout time.Duration - writeTimeout time.Duration - listener net.Listener - eventLoop netpoll.EventLoop - listenConfig *net.ListenConfig - OnAccept func(conn net.Conn) context.Context - OnConnect func(ctx context.Context, conn network.Conn) context.Context + senseClientDisconnection bool + network string + addr string + keepAliveTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + listener net.Listener + eventLoop netpoll.EventLoop + listenConfig *net.ListenConfig + OnAccept func(conn net.Conn) context.Context + OnConnect func(ctx context.Context, conn network.Conn) context.Context + OnDisconnect func(ctx context.Context, conn network.Conn) } // For transporter switch func NewTransporter(options *config.Options) network.Transporter { return &transporter{ - network: options.Network, - addr: options.Addr, - keepAliveTimeout: options.KeepAliveTimeout, - readTimeout: options.ReadTimeout, - writeTimeout: options.WriteTimeout, - listener: nil, - eventLoop: nil, - listenConfig: options.ListenConfig, - OnAccept: options.OnAccept, - OnConnect: options.OnConnect, + senseClientDisconnection: options.SenseClientDisconnection, + network: options.Network, + addr: options.Addr, + keepAliveTimeout: options.KeepAliveTimeout, + readTimeout: options.ReadTimeout, + writeTimeout: options.WriteTimeout, + listener: nil, + eventLoop: nil, + listenConfig: options.ListenConfig, + OnAccept: options.OnAccept, + OnConnect: options.OnConnect, } } @@ -95,12 +98,28 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { }), } + type cKey string + const ctxKey cKey = "ctxKey" if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { + if t.senseClientDisconnection { + ctx, cancel := context.WithCancel(ctx) + t.OnConnect(ctx, newConn(conn)) + return context.WithValue(ctx, ctxKey, cancel) + } return t.OnConnect(ctx, newConn(conn)) })) } + if t.senseClientDisconnection { + opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { + cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) + if cancelFunc != nil { + cancelFunc() + } + })) + } + // Create EventLoop t.Lock() t.eventLoop, err = netpoll.NewEventLoop(func(ctx context.Context, connection netpoll.Connection) error { diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index d8a06090c..30df9a891 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -21,6 +21,7 @@ package netpoll import ( "context" "net" + "sync" "sync/atomic" "syscall" "testing" @@ -69,6 +70,50 @@ func TestTransport(t *testing.T) { assert.Assert(t, atomic.LoadInt32(&onDataFlag) == 1) }) + t.Run("TestSenseClientDisconnection", func(t *testing.T) { + var onConnFlag int32 + var mu sync.Mutex + var ctxVal context.Context + transporter := NewTransporter(&config.Options{ + Addr: addr, + Network: nw, + OnConnect: func(ctx context.Context, conn network.Conn) context.Context { + atomic.StoreInt32(&onConnFlag, 1) + mu.Lock() + defer mu.Unlock() + ctxVal = ctx + return ctx + }, + SenseClientDisconnection: true, + }) + + go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { + return nil + }) + defer transporter.Close() + time.Sleep(100 * time.Millisecond) + + dial := NewDialer() + conn, err := dial.DialConnection(nw, addr, time.Second, nil) + assert.Nil(t, err) + _, err = conn.Write([]byte("123")) + assert.Nil(t, err) + time.Sleep(100 * time.Millisecond) + + assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + + mu.Lock() + assert.Nil(t, ctxVal.Err()) + mu.Unlock() + + err = conn.Close() + assert.Nil(t, err) + time.Sleep(100 * time.Millisecond) + mu.Lock() + defer mu.Unlock() + assert.DeepEqual(t, context.Canceled, ctxVal.Err()) + }) + t.Run("TestListenConfig", func(t *testing.T) { listenCfg := &net.ListenConfig{Control: func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { From ccb77fcd3a242ef4918c823aea398137f0aa269d Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Sun, 28 Jan 2024 17:10:16 +0800 Subject: [PATCH 02/17] test: add TestWithSenseClientDisconnection func --- pkg/app/server/hertz_test.go | 41 ++++++++++++++++++++++++++++++++ pkg/app/server/option.go | 7 +++++- pkg/network/netpoll/transport.go | 3 +-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 4a9cee3d0..af8e02f70 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -1092,3 +1092,44 @@ func TestWithDisableDefaultContentType(t *testing.T) { r, _ := hc.Get("http://127.0.0.1:8324") //nolint:errcheck assert.DeepEqual(t, "", r.Header.Get("Content-Type")) } + +type closeConnectionTransporter struct{} + +func (tr *closeConnectionTransporter) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := http.DefaultTransport.RoundTrip(req) + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + return resp, err +} + +func TestWithSenseClientDisconnection(t *testing.T) { + var ctxVal context.Context + var mu sync.Mutex + h := New( + WithHostPorts("localhost:8327"), + WithSenseClientDisconnection(true), + WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { + mu.Lock() + defer mu.Unlock() + ctxVal = ctx + return ctx + })) + go h.Spin() + time.Sleep(100 * time.Millisecond) + + h.GET("/", func(c context.Context, ctx *app.RequestContext) { + ctx.Response.AppendBodyString("test") + }) + + hc := http.Client{ + Timeout: time.Second, + Transport: &closeConnectionTransporter{}, + } + hc.Get("http://127.0.0.1:8327") + time.Sleep(100 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + assert.DeepEqual(t, context.Canceled, ctxVal.Err()) +} diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index bf2746e7e..3e71507e8 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -336,11 +336,16 @@ func WithDisablePrintRoute(b bool) config.Option { // If we don't set it, it will default to false. // There are three issues to note when using this option: // 1. It only applies to netpoll. -// 2. It needs to be used in conjunction with WithOnConnect,which will return a canceled context when peer closed. +// 2. It needs to be used in conjunction with WithOnConnect,whose context will be canceled when peer closed. // Examples: +// var ctxVal context.Context +// var mu sync.Mutex // server.Default( // server.WithSenseClientDisconnection(true), // server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { +// mu.Lock() +// defer mu.Unlock() +// ctxVal = ctx // ctxVal will be canceled when SenseClientDisconnection is true // return ctx // })) // 3. The cost is high after opening, please choose carefully. diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 6212271f7..7647c5cd4 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -98,8 +98,7 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { }), } - type cKey string - const ctxKey cKey = "ctxKey" + const ctxKey = "ctxKey" if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { if t.senseClientDisconnection { From a25bf3034da63f9e7e30e0230307085881af2075 Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Sun, 28 Jan 2024 17:33:52 +0800 Subject: [PATCH 03/17] chore: modify comment --- pkg/app/server/option.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 3e71507e8..343e8cefc 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -348,7 +348,6 @@ func WithDisablePrintRoute(b bool) config.Option { // ctxVal = ctx // ctxVal will be canceled when SenseClientDisconnection is true // return ctx // })) -// 3. The cost is high after opening, please choose carefully. func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b From 6174ad62615ee9000ed30389932d153982b4f7f5 Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Sun, 28 Jan 2024 18:42:57 +0800 Subject: [PATCH 04/17] test: modify TestWithSenseClientDisconnetion func --- pkg/app/server/hertz_test.go | 10 ++++------ pkg/app/server/option.go | 14 +++----------- pkg/network/netpoll/transport.go | 12 +++++------- pkg/network/netpoll/transport_test.go | 22 ++++++++++------------ 4 files changed, 22 insertions(+), 36 deletions(-) diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index af8e02f70..ed468dd4e 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -1109,17 +1109,15 @@ func TestWithSenseClientDisconnection(t *testing.T) { h := New( WithHostPorts("localhost:8327"), WithSenseClientDisconnection(true), - WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { - mu.Lock() - defer mu.Unlock() - ctxVal = ctx - return ctx - })) + ) go h.Spin() time.Sleep(100 * time.Millisecond) h.GET("/", func(c context.Context, ctx *app.RequestContext) { ctx.Response.AppendBodyString("test") + mu.Lock() + defer mu.Unlock() + ctxVal = c }) hc := http.Client{ diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 343e8cefc..3ab4f7a0a 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -334,20 +334,12 @@ func WithDisablePrintRoute(b bool) config.Option { // WithSenseClientDisconnection sets the ability to sense client disconnections. // If we don't set it, it will default to false. -// There are three issues to note when using this option: +// There are two issue to note when using this option: // 1. It only applies to netpoll. -// 2. It needs to be used in conjunction with WithOnConnect,whose context will be canceled when peer closed. -// Examples: -// var ctxVal context.Context -// var mu sync.Mutex +// 2. Example: // server.Default( // server.WithSenseClientDisconnection(true), -// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { -// mu.Lock() -// defer mu.Unlock() -// ctxVal = ctx // ctxVal will be canceled when SenseClientDisconnection is true -// return ctx -// })) +// ) func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 7647c5cd4..a1e0eff16 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -98,20 +98,18 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { }), } - const ctxKey = "ctxKey" if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { - if t.senseClientDisconnection { - ctx, cancel := context.WithCancel(ctx) - t.OnConnect(ctx, newConn(conn)) - return context.WithValue(ctx, ctxKey, cancel) - } return t.OnConnect(ctx, newConn(conn)) })) } + const ctxKey = "ctxKey" if t.senseClientDisconnection { - opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { + opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, connection netpoll.Connection) context.Context { + ctx, cancel := context.WithCancel(ctx) + return context.WithValue(ctx, ctxKey, cancel) + }), netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) if cancelFunc != nil { cancelFunc() diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index 30df9a891..e7754120d 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -71,23 +71,20 @@ func TestTransport(t *testing.T) { }) t.Run("TestSenseClientDisconnection", func(t *testing.T) { - var onConnFlag int32 - var mu sync.Mutex + var onReqFlag int32 var ctxVal context.Context + var mu sync.Mutex transporter := NewTransporter(&config.Options{ - Addr: addr, - Network: nw, - OnConnect: func(ctx context.Context, conn network.Conn) context.Context { - atomic.StoreInt32(&onConnFlag, 1) - mu.Lock() - defer mu.Unlock() - ctxVal = ctx - return ctx - }, + Addr: addr, + Network: nw, SenseClientDisconnection: true, }) go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { + atomic.StoreInt32(&onReqFlag, 1) + mu.Lock() + defer mu.Unlock() + ctxVal = ctx return nil }) defer transporter.Close() @@ -100,7 +97,7 @@ func TestTransport(t *testing.T) { assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + assert.Assert(t, atomic.LoadInt32(&onReqFlag) == 1) mu.Lock() assert.Nil(t, ctxVal.Err()) @@ -109,6 +106,7 @@ func TestTransport(t *testing.T) { err = conn.Close() assert.Nil(t, err) time.Sleep(100 * time.Millisecond) + mu.Lock() defer mu.Unlock() assert.DeepEqual(t, context.Canceled, ctxVal.Err()) From cb66c44d52ab29160202975297da4aa6302d0029 Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Tue, 30 Jan 2024 15:00:47 +0800 Subject: [PATCH 05/17] test: modify TestWithSenseClientDisconnection func --- pkg/app/server/hertz_test.go | 54 ++++++++++++++------------- pkg/app/server/option.go | 14 ++++--- pkg/network/netpoll/transport.go | 1 - pkg/network/netpoll/transport_test.go | 20 ++-------- 4 files changed, 40 insertions(+), 49 deletions(-) diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index ed468dd4e..9690c57e7 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -43,6 +43,7 @@ import ( "github.com/cloudwego/hertz/pkg/common/test/mock" "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/network" + "github.com/cloudwego/hertz/pkg/network/netpoll" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" @@ -1093,41 +1094,44 @@ func TestWithDisableDefaultContentType(t *testing.T) { assert.DeepEqual(t, "", r.Header.Get("Content-Type")) } -type closeConnectionTransporter struct{} - -func (tr *closeConnectionTransporter) RoundTrip(req *http.Request) (*http.Response, error) { - resp, err := http.DefaultTransport.RoundTrip(req) - if resp != nil && resp.Body != nil { - resp.Body.Close() - } - return resp, err -} - func TestWithSenseClientDisconnection(t *testing.T) { - var ctxVal context.Context - var mu sync.Mutex h := New( WithHostPorts("localhost:8327"), WithSenseClientDisconnection(true), ) + var wg sync.WaitGroup + wg.Add(1) + h.GET("/test", func(c context.Context, ctx *app.RequestContext) { + defer wg.Done() + select { + case <-c.Done(): + return + case <-time.After(time.Second): + t.Fatal("cancel context failed") + } + }) go h.Spin() time.Sleep(100 * time.Millisecond) - h.GET("/", func(c context.Context, ctx *app.RequestContext) { - ctx.Response.AppendBodyString("test") - mu.Lock() - defer mu.Unlock() - ctxVal = c - }) - + dail := netpoll.NewDialer() + conn, err := dail.DialConnection("tcp", "127.0.0.1:8327", 0, nil) + assert.Nil(t, err) + tr := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return conn, nil + }, + } hc := http.Client{ Timeout: time.Second, - Transport: &closeConnectionTransporter{}, + Transport: tr, } - hc.Get("http://127.0.0.1:8327") - time.Sleep(100 * time.Millisecond) - mu.Lock() - defer mu.Unlock() - assert.DeepEqual(t, context.Canceled, ctxVal.Err()) + go func() { + _, err := hc.Get("http://127.0.0.1:8327/test") + assert.NotNil(t, err) + }() + time.Sleep(100 * time.Millisecond) + err = conn.Close() + assert.Nil(t, err) + wg.Wait() } diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 3ab4f7a0a..e1d93b9c2 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -334,12 +334,14 @@ func WithDisablePrintRoute(b bool) config.Option { // WithSenseClientDisconnection sets the ability to sense client disconnections. // If we don't set it, it will default to false. -// There are two issue to note when using this option: -// 1. It only applies to netpoll. -// 2. Example: -// server.Default( -// server.WithSenseClientDisconnection(true), -// ) +// There are two issues to note when using this option: +// 1. Warning: It only applies to netpoll. +// 2. After opening, the context.Context in the request will be cancelled. +// +// Example: +// server.Default( +// server.WithSenseClientDisconnection(true), +// ) func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index a1e0eff16..6402bd146 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -49,7 +49,6 @@ type transporter struct { listenConfig *net.ListenConfig OnAccept func(conn net.Conn) context.Context OnConnect func(ctx context.Context, conn network.Conn) context.Context - OnDisconnect func(ctx context.Context, conn network.Conn) } // For transporter switch diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index e7754120d..53239209e 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -21,7 +21,6 @@ package netpoll import ( "context" "net" - "sync" "sync/atomic" "syscall" "testing" @@ -72,8 +71,6 @@ func TestTransport(t *testing.T) { t.Run("TestSenseClientDisconnection", func(t *testing.T) { var onReqFlag int32 - var ctxVal context.Context - var mu sync.Mutex transporter := NewTransporter(&config.Options{ Addr: addr, Network: nw, @@ -82,9 +79,8 @@ func TestTransport(t *testing.T) { go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { atomic.StoreInt32(&onReqFlag, 1) - mu.Lock() - defer mu.Unlock() - ctxVal = ctx + time.Sleep(100 * time.Millisecond) + assert.DeepEqual(t, context.Canceled, ctx.Err()) return nil }) defer transporter.Close() @@ -95,21 +91,11 @@ func TestTransport(t *testing.T) { assert.Nil(t, err) _, err = conn.Write([]byte("123")) assert.Nil(t, err) - time.Sleep(100 * time.Millisecond) - - assert.Assert(t, atomic.LoadInt32(&onReqFlag) == 1) - - mu.Lock() - assert.Nil(t, ctxVal.Err()) - mu.Unlock() - err = conn.Close() assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - mu.Lock() - defer mu.Unlock() - assert.DeepEqual(t, context.Canceled, ctxVal.Err()) + assert.Assert(t, atomic.LoadInt32(&onReqFlag) == 1) }) t.Run("TestListenConfig", func(t *testing.T) { From c63f652efbce5d130a43ad0fc706b9a5106e1449 Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Thu, 8 Feb 2024 20:35:35 +0800 Subject: [PATCH 06/17] fix: ctx race when disconnect callback run with connect callback --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 36eb3a559..ec9640d56 100644 --- a/go.mod +++ b/go.mod @@ -15,4 +15,4 @@ require ( google.golang.org/protobuf v1.27.1 ) -replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e +replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df diff --git a/go.sum b/go.sum index c38186d28..cb62bc0be 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,8 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ= -github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e h1:o5RYe79HKcK3Vh6WM9Aag+KUWVdu+spRyfpfXMq2hQc= -github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= +github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df h1:rYQQQibbNpf1uDSPYpHJSai/0TsLLJ/74IHb5fQ9WWs= +github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= From 7d8a8c01c244b8d231213dce3267ce156f908c3b Mon Sep 17 00:00:00 2001 From: kinggo Date: Mon, 12 Feb 2024 16:48:53 +0800 Subject: [PATCH 07/17] feat: add cancelContext --- go.mod | 4 +-- go.sum | 4 +-- pkg/app/server/hertz_test.go | 43 ------------------------- pkg/app/server/hertz_unix_test.go | 53 +++++++++++++++++++++++++++++++ pkg/app/server/option.go | 32 +++++++++---------- pkg/network/netpoll/transport.go | 26 +++++++++------ pkg/protocol/http1/proxy/proxy.go | 2 -- 7 files changed, 89 insertions(+), 75 deletions(-) diff --git a/go.mod b/go.mod index ec9640d56..8530b25a9 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,10 @@ require ( github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 github.com/bytedance/mockey v1.2.1 github.com/bytedance/sonic v1.8.1 - github.com/cloudwego/netpoll v0.5.0 + github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b github.com/fsnotify/fsnotify v1.5.4 github.com/tidwall/gjson v1.14.4 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/protobuf v1.27.1 ) - -replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df diff --git a/go.sum b/go.sum index cb62bc0be..c043792ae 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/bytedance/sonic v1.8.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b h1:ZHtA1Q20H9WoLPfMHCSkMv8wUrN7YENJfQCVybErGy8= +github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -27,8 +29,6 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ= -github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df h1:rYQQQibbNpf1uDSPYpHJSai/0TsLLJ/74IHb5fQ9WWs= -github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 9690c57e7..4a9cee3d0 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -43,7 +43,6 @@ import ( "github.com/cloudwego/hertz/pkg/common/test/mock" "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/network" - "github.com/cloudwego/hertz/pkg/network/netpoll" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" @@ -1093,45 +1092,3 @@ func TestWithDisableDefaultContentType(t *testing.T) { r, _ := hc.Get("http://127.0.0.1:8324") //nolint:errcheck assert.DeepEqual(t, "", r.Header.Get("Content-Type")) } - -func TestWithSenseClientDisconnection(t *testing.T) { - h := New( - WithHostPorts("localhost:8327"), - WithSenseClientDisconnection(true), - ) - var wg sync.WaitGroup - wg.Add(1) - h.GET("/test", func(c context.Context, ctx *app.RequestContext) { - defer wg.Done() - select { - case <-c.Done(): - return - case <-time.After(time.Second): - t.Fatal("cancel context failed") - } - }) - go h.Spin() - time.Sleep(100 * time.Millisecond) - - dail := netpoll.NewDialer() - conn, err := dail.DialConnection("tcp", "127.0.0.1:8327", 0, nil) - assert.Nil(t, err) - tr := &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return conn, nil - }, - } - hc := http.Client{ - Timeout: time.Second, - Transport: tr, - } - - go func() { - _, err := hc.Get("http://127.0.0.1:8327/test") - assert.NotNil(t, err) - }() - time.Sleep(100 * time.Millisecond) - err = conn.Close() - assert.Nil(t, err) - wg.Wait() -} diff --git a/pkg/app/server/hertz_unix_test.go b/pkg/app/server/hertz_unix_test.go index b37ddfbdf..32fbd2ee5 100644 --- a/pkg/app/server/hertz_unix_test.go +++ b/pkg/app/server/hertz_unix_test.go @@ -34,6 +34,7 @@ import ( c "github.com/cloudwego/hertz/pkg/app/client" "github.com/cloudwego/hertz/pkg/common/test/assert" "github.com/cloudwego/hertz/pkg/common/utils" + "github.com/cloudwego/hertz/pkg/network" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol/consts" "golang.org/x/sys/unix" @@ -134,3 +135,55 @@ func TestHertz_Spin(t *testing.T) { <-ch2 } + +func TestWithSenseClientDisconnection(t *testing.T) { + var closeFlag int32 + h := New(WithHostPorts("127.0.0.1:6631"), WithSenseClientDisconnection(true)) + h.GET("/ping", func(c context.Context, ctx *app.RequestContext) { + assert.DeepEqual(t, "aa", string(ctx.Host())) + ch := make(chan struct{}) + select { + case <-c.Done(): + atomic.StoreInt32(&closeFlag, 1) + assert.DeepEqual(t, context.Canceled, c.Err()) + case <-ch: + } + }) + go h.Spin() + time.Sleep(time.Second) + con, err := net.Dial("tcp", "127.0.0.1:6631") + assert.Nil(t, err) + _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) + assert.Nil(t, err) + time.Sleep(time.Second) + assert.Nil(t, con.Close()) + time.Sleep(time.Second) + assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) +} + +func TestWithSenseClientDisconnectionAndWithOnConnect(t *testing.T) { + var closeFlag int32 + h := New(WithHostPorts("127.0.0.1:6632"), WithSenseClientDisconnection(true), WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { + return ctx + })) + h.GET("/ping", func(c context.Context, ctx *app.RequestContext) { + assert.DeepEqual(t, "aa", string(ctx.Host())) + ch := make(chan struct{}) + select { + case <-c.Done(): + atomic.StoreInt32(&closeFlag, 1) + assert.DeepEqual(t, context.Canceled, c.Err()) + case <-ch: + } + }) + go h.Spin() + time.Sleep(time.Second) + con, err := net.Dial("tcp", "127.0.0.1:6632") + assert.Nil(t, err) + _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) + assert.Nil(t, err) + time.Sleep(time.Second) + assert.Nil(t, con.Close()) + time.Sleep(time.Second) + assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) +} diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index e1d93b9c2..18f184379 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -332,22 +332,6 @@ func WithDisablePrintRoute(b bool) config.Option { }} } -// WithSenseClientDisconnection sets the ability to sense client disconnections. -// If we don't set it, it will default to false. -// There are two issues to note when using this option: -// 1. Warning: It only applies to netpoll. -// 2. After opening, the context.Context in the request will be cancelled. -// -// Example: -// server.Default( -// server.WithSenseClientDisconnection(true), -// ) -func WithSenseClientDisconnection(b bool) config.Option { - return config.Option{F: func(o *config.Options) { - o.SenseClientDisconnection = b - }} -} - // WithOnAccept sets the callback function when a new connection is accepted but cannot // receive data in netpoll. In go net, it will be called before converting tls connection func WithOnAccept(fn func(conn net.Conn) context.Context) config.Option { @@ -410,3 +394,19 @@ func WithDisableDefaultContentType(disable bool) config.Option { o.NoDefaultContentType = disable }} } + +// WithSenseClientDisconnection sets the ability to sense client disconnections. +// If we don't set it, it will default to false. +// There are two issues to note when using this option: +// 1. Warning: It only applies to netpoll. +// 2. After opening, the context.Context in the request will be cancelled. +// +// Example: +// server.Default( +// server.WithSenseClientDisconnection(true), +// ) +func WithSenseClientDisconnection(b bool) config.Option { + return config.Option{F: func(o *config.Options) { + o.SenseClientDisconnection = b + }} +} diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 6402bd146..0b96328c1 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -36,6 +36,14 @@ func init() { netpoll.SetLoggerOutput(io.Discard) } +const ctxCancelKey = "ctxCancelKey" + +func cancelContext(ctx context.Context) context.Context { + ctx, cancel := context.WithCancel(ctx) + ctx = context.WithValue(ctx, ctxCancelKey, cancel) + return ctx +} + type transporter struct { sync.RWMutex senseClientDisconnection bool @@ -90,10 +98,14 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { if t.writeTimeout > 0 { conn.SetWriteTimeout(t.writeTimeout) } + ctx := context.Background() if t.OnAccept != nil { - return t.OnAccept(newConn(conn)) + ctx = t.OnAccept(newConn(conn)) + } + if t.senseClientDisconnection { + ctx = cancelContext(ctx) } - return context.Background() + return ctx }), } @@ -103,14 +115,10 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { })) } - const ctxKey = "ctxKey" if t.senseClientDisconnection { - opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, connection netpoll.Connection) context.Context { - ctx, cancel := context.WithCancel(ctx) - return context.WithValue(ctx, ctxKey, cancel) - }), netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { - cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) - if cancelFunc != nil { + opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { + cancelFunc, ok := ctx.Value(ctxCancelKey).(context.CancelFunc) + if cancelFunc != nil && ok { cancelFunc() } })) diff --git a/pkg/protocol/http1/proxy/proxy.go b/pkg/protocol/http1/proxy/proxy.go index f8bae7608..2b243ff04 100644 --- a/pkg/protocol/http1/proxy/proxy.go +++ b/pkg/protocol/http1/proxy/proxy.go @@ -81,13 +81,11 @@ func SetupProxy(conn network.Conn, addr string, proxyURI *protocol.URI, tlsConfi defer close(didReadResponse) err = reqI.Write(connectReq, conn) - if err != nil { return } err = conn.Flush() - if err != nil { return } From d0b40ea9dcc74e5b4e96ca6d8abb574d069820a6 Mon Sep 17 00:00:00 2001 From: notacommonperson <1932049002@qq.com> Date: Fri, 26 Jan 2024 23:18:29 +0800 Subject: [PATCH 08/17] feat: support feeling client disconnetion --- go.mod | 2 + go.sum | 4 +- pkg/app/server/option.go | 18 ++++++++ pkg/app/server/option_test.go | 3 ++ pkg/common/config/option.go | 4 ++ pkg/common/config/option_test.go | 1 + pkg/network/netpoll/transport.go | 59 ++++++++++++++++++--------- pkg/network/netpoll/transport_test.go | 45 ++++++++++++++++++++ 8 files changed, 114 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 3619b833d..36eb3a559 100644 --- a/go.mod +++ b/go.mod @@ -14,3 +14,5 @@ require ( golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/protobuf v1.27.1 ) + +replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e diff --git a/go.sum b/go.sum index 3cf3c2f96..c38186d28 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,6 @@ github.com/bytedance/sonic v1.8.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/cloudwego/netpoll v0.5.0 h1:oRrOp58cPCvK2QbMozZNDESvrxQaEHW2dCimmwH1lcU= -github.com/cloudwego/netpoll v0.5.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -29,6 +27,8 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ= +github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e h1:o5RYe79HKcK3Vh6WM9Aag+KUWVdu+spRyfpfXMq2hQc= +github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index e7970348c..bf2746e7e 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -332,6 +332,24 @@ func WithDisablePrintRoute(b bool) config.Option { }} } +// WithSenseClientDisconnection sets the ability to sense client disconnections. +// If we don't set it, it will default to false. +// There are three issues to note when using this option: +// 1. It only applies to netpoll. +// 2. It needs to be used in conjunction with WithOnConnect,which will return a canceled context when peer closed. +// Examples: +// server.Default( +// server.WithSenseClientDisconnection(true), +// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { +// return ctx +// })) +// 3. The cost is high after opening, please choose carefully. +func WithSenseClientDisconnection(b bool) config.Option { + return config.Option{F: func(o *config.Options) { + o.SenseClientDisconnection = b + }} +} + // WithOnAccept sets the callback function when a new connection is accepted but cannot // receive data in netpoll. In go net, it will be called before converting tls connection func WithOnAccept(fn func(conn net.Conn) context.Context) config.Option { diff --git a/pkg/app/server/option_test.go b/pkg/app/server/option_test.go index f5d7f7b32..bc43e7cd2 100644 --- a/pkg/app/server/option_test.go +++ b/pkg/app/server/option_test.go @@ -61,6 +61,7 @@ func TestOptions(t *testing.T) { WithBasePath("/"), WithMaxRequestBodySize(2), WithDisablePrintRoute(true), + WithSenseClientDisconnection(true), WithNetwork("unix"), WithExitWaitTime(time.Second), WithMaxKeepBodySize(500), @@ -93,6 +94,7 @@ func TestOptions(t *testing.T) { assert.DeepEqual(t, opt.BasePath, "/") assert.DeepEqual(t, opt.MaxRequestBodySize, 2) assert.DeepEqual(t, opt.DisablePrintRoute, true) + assert.DeepEqual(t, opt.SenseClientDisconnection, true) assert.DeepEqual(t, opt.Network, "unix") assert.DeepEqual(t, opt.ExitWaitTimeout, time.Second) assert.DeepEqual(t, opt.MaxKeepBodySize, 500) @@ -130,6 +132,7 @@ func TestDefaultOptions(t *testing.T) { assert.DeepEqual(t, opt.GetOnly, false) assert.DeepEqual(t, opt.DisableKeepalive, false) assert.DeepEqual(t, opt.DisablePrintRoute, false) + assert.DeepEqual(t, opt.SenseClientDisconnection, false) assert.DeepEqual(t, opt.Network, "tcp") assert.DeepEqual(t, opt.ExitWaitTimeout, time.Second*5) assert.DeepEqual(t, opt.MaxKeepBodySize, 4*1024*1024) diff --git a/pkg/common/config/option.go b/pkg/common/config/option.go index 417955fc9..958d9b3a3 100644 --- a/pkg/common/config/option.go +++ b/pkg/common/config/option.go @@ -63,6 +63,7 @@ type Options struct { StreamRequestBody bool NoDefaultServerHeader bool DisablePrintRoute bool + SenseClientDisconnection bool Network string Addr string BasePath string @@ -203,6 +204,9 @@ func NewOptions(opts []Option) *Options { // Disabled when set to True DisablePrintRoute: false, + // The ability to sense client disconnection is disabled by default + SenseClientDisconnection: false, + // "tcp", "udp", "unix"(unix domain socket) Network: defaultNetwork, diff --git a/pkg/common/config/option_test.go b/pkg/common/config/option_test.go index 67fcab796..b836a3a1a 100644 --- a/pkg/common/config/option_test.go +++ b/pkg/common/config/option_test.go @@ -39,6 +39,7 @@ func TestDefaultOptions(t *testing.T) { assert.False(t, options.RemoveExtraSlash) assert.True(t, options.UnescapePathValues) assert.False(t, options.DisablePreParseMultipartForm) + assert.False(t, options.SenseClientDisconnection) assert.DeepEqual(t, defaultNetwork, options.Network) assert.DeepEqual(t, defaultAddr, options.Addr) assert.DeepEqual(t, defaultMaxRequestBodySize, options.MaxRequestBodySize) diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 17829cb83..6212271f7 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -38,31 +38,34 @@ func init() { type transporter struct { sync.RWMutex - network string - addr string - keepAliveTimeout time.Duration - readTimeout time.Duration - writeTimeout time.Duration - listener net.Listener - eventLoop netpoll.EventLoop - listenConfig *net.ListenConfig - OnAccept func(conn net.Conn) context.Context - OnConnect func(ctx context.Context, conn network.Conn) context.Context + senseClientDisconnection bool + network string + addr string + keepAliveTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + listener net.Listener + eventLoop netpoll.EventLoop + listenConfig *net.ListenConfig + OnAccept func(conn net.Conn) context.Context + OnConnect func(ctx context.Context, conn network.Conn) context.Context + OnDisconnect func(ctx context.Context, conn network.Conn) } // For transporter switch func NewTransporter(options *config.Options) network.Transporter { return &transporter{ - network: options.Network, - addr: options.Addr, - keepAliveTimeout: options.KeepAliveTimeout, - readTimeout: options.ReadTimeout, - writeTimeout: options.WriteTimeout, - listener: nil, - eventLoop: nil, - listenConfig: options.ListenConfig, - OnAccept: options.OnAccept, - OnConnect: options.OnConnect, + senseClientDisconnection: options.SenseClientDisconnection, + network: options.Network, + addr: options.Addr, + keepAliveTimeout: options.KeepAliveTimeout, + readTimeout: options.ReadTimeout, + writeTimeout: options.WriteTimeout, + listener: nil, + eventLoop: nil, + listenConfig: options.ListenConfig, + OnAccept: options.OnAccept, + OnConnect: options.OnConnect, } } @@ -95,12 +98,28 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { }), } + type cKey string + const ctxKey cKey = "ctxKey" if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { + if t.senseClientDisconnection { + ctx, cancel := context.WithCancel(ctx) + t.OnConnect(ctx, newConn(conn)) + return context.WithValue(ctx, ctxKey, cancel) + } return t.OnConnect(ctx, newConn(conn)) })) } + if t.senseClientDisconnection { + opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { + cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) + if cancelFunc != nil { + cancelFunc() + } + })) + } + // Create EventLoop t.Lock() t.eventLoop, err = netpoll.NewEventLoop(func(ctx context.Context, connection netpoll.Connection) error { diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index d8a06090c..30df9a891 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -21,6 +21,7 @@ package netpoll import ( "context" "net" + "sync" "sync/atomic" "syscall" "testing" @@ -69,6 +70,50 @@ func TestTransport(t *testing.T) { assert.Assert(t, atomic.LoadInt32(&onDataFlag) == 1) }) + t.Run("TestSenseClientDisconnection", func(t *testing.T) { + var onConnFlag int32 + var mu sync.Mutex + var ctxVal context.Context + transporter := NewTransporter(&config.Options{ + Addr: addr, + Network: nw, + OnConnect: func(ctx context.Context, conn network.Conn) context.Context { + atomic.StoreInt32(&onConnFlag, 1) + mu.Lock() + defer mu.Unlock() + ctxVal = ctx + return ctx + }, + SenseClientDisconnection: true, + }) + + go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { + return nil + }) + defer transporter.Close() + time.Sleep(100 * time.Millisecond) + + dial := NewDialer() + conn, err := dial.DialConnection(nw, addr, time.Second, nil) + assert.Nil(t, err) + _, err = conn.Write([]byte("123")) + assert.Nil(t, err) + time.Sleep(100 * time.Millisecond) + + assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + + mu.Lock() + assert.Nil(t, ctxVal.Err()) + mu.Unlock() + + err = conn.Close() + assert.Nil(t, err) + time.Sleep(100 * time.Millisecond) + mu.Lock() + defer mu.Unlock() + assert.DeepEqual(t, context.Canceled, ctxVal.Err()) + }) + t.Run("TestListenConfig", func(t *testing.T) { listenCfg := &net.ListenConfig{Control: func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { From 5b1b6f10e7ddfee6ee63f7056bcad3d8ca65bc6f Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Sun, 28 Jan 2024 17:10:16 +0800 Subject: [PATCH 09/17] test: add TestWithSenseClientDisconnection func --- pkg/app/server/hertz_test.go | 41 ++++++++++++++++++++++++++++++++ pkg/app/server/option.go | 7 +++++- pkg/network/netpoll/transport.go | 3 +-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 3352d34c8..d116ed90f 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -1147,3 +1147,44 @@ func TestWithDisableDefaultContentType(t *testing.T) { r, _ := hc.Get("http://127.0.0.1:8324") //nolint:errcheck assert.DeepEqual(t, "", r.Header.Get("Content-Type")) } + +type closeConnectionTransporter struct{} + +func (tr *closeConnectionTransporter) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := http.DefaultTransport.RoundTrip(req) + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + return resp, err +} + +func TestWithSenseClientDisconnection(t *testing.T) { + var ctxVal context.Context + var mu sync.Mutex + h := New( + WithHostPorts("localhost:8327"), + WithSenseClientDisconnection(true), + WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { + mu.Lock() + defer mu.Unlock() + ctxVal = ctx + return ctx + })) + go h.Spin() + time.Sleep(100 * time.Millisecond) + + h.GET("/", func(c context.Context, ctx *app.RequestContext) { + ctx.Response.AppendBodyString("test") + }) + + hc := http.Client{ + Timeout: time.Second, + Transport: &closeConnectionTransporter{}, + } + hc.Get("http://127.0.0.1:8327") + time.Sleep(100 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + assert.DeepEqual(t, context.Canceled, ctxVal.Err()) +} diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index bf2746e7e..3e71507e8 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -336,11 +336,16 @@ func WithDisablePrintRoute(b bool) config.Option { // If we don't set it, it will default to false. // There are three issues to note when using this option: // 1. It only applies to netpoll. -// 2. It needs to be used in conjunction with WithOnConnect,which will return a canceled context when peer closed. +// 2. It needs to be used in conjunction with WithOnConnect,whose context will be canceled when peer closed. // Examples: +// var ctxVal context.Context +// var mu sync.Mutex // server.Default( // server.WithSenseClientDisconnection(true), // server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { +// mu.Lock() +// defer mu.Unlock() +// ctxVal = ctx // ctxVal will be canceled when SenseClientDisconnection is true // return ctx // })) // 3. The cost is high after opening, please choose carefully. diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 6212271f7..7647c5cd4 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -98,8 +98,7 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { }), } - type cKey string - const ctxKey cKey = "ctxKey" + const ctxKey = "ctxKey" if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { if t.senseClientDisconnection { From b183b2b5fb519dc405aae05b39ae71cec92ec5cc Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Sun, 28 Jan 2024 17:33:52 +0800 Subject: [PATCH 10/17] chore: modify comment --- pkg/app/server/option.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 3e71507e8..343e8cefc 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -348,7 +348,6 @@ func WithDisablePrintRoute(b bool) config.Option { // ctxVal = ctx // ctxVal will be canceled when SenseClientDisconnection is true // return ctx // })) -// 3. The cost is high after opening, please choose carefully. func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b From adf759c7eae9018d824fca240fb3433c364f271a Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Sun, 28 Jan 2024 18:42:57 +0800 Subject: [PATCH 11/17] test: modify TestWithSenseClientDisconnetion func --- pkg/app/server/hertz_test.go | 10 ++++------ pkg/app/server/option.go | 14 +++----------- pkg/network/netpoll/transport.go | 12 +++++------- pkg/network/netpoll/transport_test.go | 22 ++++++++++------------ 4 files changed, 22 insertions(+), 36 deletions(-) diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index d116ed90f..292a4f18f 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -1164,17 +1164,15 @@ func TestWithSenseClientDisconnection(t *testing.T) { h := New( WithHostPorts("localhost:8327"), WithSenseClientDisconnection(true), - WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { - mu.Lock() - defer mu.Unlock() - ctxVal = ctx - return ctx - })) + ) go h.Spin() time.Sleep(100 * time.Millisecond) h.GET("/", func(c context.Context, ctx *app.RequestContext) { ctx.Response.AppendBodyString("test") + mu.Lock() + defer mu.Unlock() + ctxVal = c }) hc := http.Client{ diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 343e8cefc..3ab4f7a0a 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -334,20 +334,12 @@ func WithDisablePrintRoute(b bool) config.Option { // WithSenseClientDisconnection sets the ability to sense client disconnections. // If we don't set it, it will default to false. -// There are three issues to note when using this option: +// There are two issue to note when using this option: // 1. It only applies to netpoll. -// 2. It needs to be used in conjunction with WithOnConnect,whose context will be canceled when peer closed. -// Examples: -// var ctxVal context.Context -// var mu sync.Mutex +// 2. Example: // server.Default( // server.WithSenseClientDisconnection(true), -// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { -// mu.Lock() -// defer mu.Unlock() -// ctxVal = ctx // ctxVal will be canceled when SenseClientDisconnection is true -// return ctx -// })) +// ) func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 7647c5cd4..a1e0eff16 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -98,20 +98,18 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { }), } - const ctxKey = "ctxKey" if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { - if t.senseClientDisconnection { - ctx, cancel := context.WithCancel(ctx) - t.OnConnect(ctx, newConn(conn)) - return context.WithValue(ctx, ctxKey, cancel) - } return t.OnConnect(ctx, newConn(conn)) })) } + const ctxKey = "ctxKey" if t.senseClientDisconnection { - opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { + opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, connection netpoll.Connection) context.Context { + ctx, cancel := context.WithCancel(ctx) + return context.WithValue(ctx, ctxKey, cancel) + }), netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) if cancelFunc != nil { cancelFunc() diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index 30df9a891..e7754120d 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -71,23 +71,20 @@ func TestTransport(t *testing.T) { }) t.Run("TestSenseClientDisconnection", func(t *testing.T) { - var onConnFlag int32 - var mu sync.Mutex + var onReqFlag int32 var ctxVal context.Context + var mu sync.Mutex transporter := NewTransporter(&config.Options{ - Addr: addr, - Network: nw, - OnConnect: func(ctx context.Context, conn network.Conn) context.Context { - atomic.StoreInt32(&onConnFlag, 1) - mu.Lock() - defer mu.Unlock() - ctxVal = ctx - return ctx - }, + Addr: addr, + Network: nw, SenseClientDisconnection: true, }) go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { + atomic.StoreInt32(&onReqFlag, 1) + mu.Lock() + defer mu.Unlock() + ctxVal = ctx return nil }) defer transporter.Close() @@ -100,7 +97,7 @@ func TestTransport(t *testing.T) { assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + assert.Assert(t, atomic.LoadInt32(&onReqFlag) == 1) mu.Lock() assert.Nil(t, ctxVal.Err()) @@ -109,6 +106,7 @@ func TestTransport(t *testing.T) { err = conn.Close() assert.Nil(t, err) time.Sleep(100 * time.Millisecond) + mu.Lock() defer mu.Unlock() assert.DeepEqual(t, context.Canceled, ctxVal.Err()) From 45202d0d14bcf3210e6225a26d85ef0b36c0178f Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Tue, 30 Jan 2024 15:00:47 +0800 Subject: [PATCH 12/17] test: modify TestWithSenseClientDisconnection func --- pkg/app/server/hertz_test.go | 54 ++++++++++++++------------- pkg/app/server/option.go | 14 ++++--- pkg/network/netpoll/transport.go | 1 - pkg/network/netpoll/transport_test.go | 20 ++-------- 4 files changed, 40 insertions(+), 49 deletions(-) diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 292a4f18f..646644e5d 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -43,6 +43,7 @@ import ( "github.com/cloudwego/hertz/pkg/common/test/mock" "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/network" + "github.com/cloudwego/hertz/pkg/network/netpoll" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" @@ -1148,41 +1149,44 @@ func TestWithDisableDefaultContentType(t *testing.T) { assert.DeepEqual(t, "", r.Header.Get("Content-Type")) } -type closeConnectionTransporter struct{} - -func (tr *closeConnectionTransporter) RoundTrip(req *http.Request) (*http.Response, error) { - resp, err := http.DefaultTransport.RoundTrip(req) - if resp != nil && resp.Body != nil { - resp.Body.Close() - } - return resp, err -} - func TestWithSenseClientDisconnection(t *testing.T) { - var ctxVal context.Context - var mu sync.Mutex h := New( WithHostPorts("localhost:8327"), WithSenseClientDisconnection(true), ) + var wg sync.WaitGroup + wg.Add(1) + h.GET("/test", func(c context.Context, ctx *app.RequestContext) { + defer wg.Done() + select { + case <-c.Done(): + return + case <-time.After(time.Second): + t.Fatal("cancel context failed") + } + }) go h.Spin() time.Sleep(100 * time.Millisecond) - h.GET("/", func(c context.Context, ctx *app.RequestContext) { - ctx.Response.AppendBodyString("test") - mu.Lock() - defer mu.Unlock() - ctxVal = c - }) - + dail := netpoll.NewDialer() + conn, err := dail.DialConnection("tcp", "127.0.0.1:8327", 0, nil) + assert.Nil(t, err) + tr := &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return conn, nil + }, + } hc := http.Client{ Timeout: time.Second, - Transport: &closeConnectionTransporter{}, + Transport: tr, } - hc.Get("http://127.0.0.1:8327") - time.Sleep(100 * time.Millisecond) - mu.Lock() - defer mu.Unlock() - assert.DeepEqual(t, context.Canceled, ctxVal.Err()) + go func() { + _, err := hc.Get("http://127.0.0.1:8327/test") + assert.NotNil(t, err) + }() + time.Sleep(100 * time.Millisecond) + err = conn.Close() + assert.Nil(t, err) + wg.Wait() } diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 3ab4f7a0a..e1d93b9c2 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -334,12 +334,14 @@ func WithDisablePrintRoute(b bool) config.Option { // WithSenseClientDisconnection sets the ability to sense client disconnections. // If we don't set it, it will default to false. -// There are two issue to note when using this option: -// 1. It only applies to netpoll. -// 2. Example: -// server.Default( -// server.WithSenseClientDisconnection(true), -// ) +// There are two issues to note when using this option: +// 1. Warning: It only applies to netpoll. +// 2. After opening, the context.Context in the request will be cancelled. +// +// Example: +// server.Default( +// server.WithSenseClientDisconnection(true), +// ) func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index a1e0eff16..6402bd146 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -49,7 +49,6 @@ type transporter struct { listenConfig *net.ListenConfig OnAccept func(conn net.Conn) context.Context OnConnect func(ctx context.Context, conn network.Conn) context.Context - OnDisconnect func(ctx context.Context, conn network.Conn) } // For transporter switch diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index e7754120d..53239209e 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -21,7 +21,6 @@ package netpoll import ( "context" "net" - "sync" "sync/atomic" "syscall" "testing" @@ -72,8 +71,6 @@ func TestTransport(t *testing.T) { t.Run("TestSenseClientDisconnection", func(t *testing.T) { var onReqFlag int32 - var ctxVal context.Context - var mu sync.Mutex transporter := NewTransporter(&config.Options{ Addr: addr, Network: nw, @@ -82,9 +79,8 @@ func TestTransport(t *testing.T) { go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { atomic.StoreInt32(&onReqFlag, 1) - mu.Lock() - defer mu.Unlock() - ctxVal = ctx + time.Sleep(100 * time.Millisecond) + assert.DeepEqual(t, context.Canceled, ctx.Err()) return nil }) defer transporter.Close() @@ -95,21 +91,11 @@ func TestTransport(t *testing.T) { assert.Nil(t, err) _, err = conn.Write([]byte("123")) assert.Nil(t, err) - time.Sleep(100 * time.Millisecond) - - assert.Assert(t, atomic.LoadInt32(&onReqFlag) == 1) - - mu.Lock() - assert.Nil(t, ctxVal.Err()) - mu.Unlock() - err = conn.Close() assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - mu.Lock() - defer mu.Unlock() - assert.DeepEqual(t, context.Canceled, ctxVal.Err()) + assert.Assert(t, atomic.LoadInt32(&onReqFlag) == 1) }) t.Run("TestListenConfig", func(t *testing.T) { From b9e0ead950ad46ef4f6a90458561e621538bc10d Mon Sep 17 00:00:00 2001 From: Ricky-chen1 Date: Thu, 8 Feb 2024 20:35:35 +0800 Subject: [PATCH 13/17] fix: ctx race when disconnect callback run with connect callback --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 36eb3a559..ec9640d56 100644 --- a/go.mod +++ b/go.mod @@ -15,4 +15,4 @@ require ( google.golang.org/protobuf v1.27.1 ) -replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e +replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df diff --git a/go.sum b/go.sum index c38186d28..cb62bc0be 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,8 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ= -github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e h1:o5RYe79HKcK3Vh6WM9Aag+KUWVdu+spRyfpfXMq2hQc= -github.com/joway/netpoll v0.0.4-0.20240122121337-a95f4a19673e/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= +github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df h1:rYQQQibbNpf1uDSPYpHJSai/0TsLLJ/74IHb5fQ9WWs= +github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= From bea3fbdbde496f253308c0af6399fadea45909cc Mon Sep 17 00:00:00 2001 From: kinggo Date: Mon, 12 Feb 2024 16:48:53 +0800 Subject: [PATCH 14/17] feat: add cancelContext --- go.mod | 4 +-- go.sum | 4 +-- pkg/app/server/hertz_test.go | 43 ------------------------- pkg/app/server/hertz_unix_test.go | 53 +++++++++++++++++++++++++++++++ pkg/app/server/option.go | 32 +++++++++---------- pkg/network/netpoll/transport.go | 26 +++++++++------ 6 files changed, 89 insertions(+), 73 deletions(-) diff --git a/go.mod b/go.mod index ec9640d56..8530b25a9 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,10 @@ require ( github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 github.com/bytedance/mockey v1.2.1 github.com/bytedance/sonic v1.8.1 - github.com/cloudwego/netpoll v0.5.0 + github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b github.com/fsnotify/fsnotify v1.5.4 github.com/tidwall/gjson v1.14.4 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/protobuf v1.27.1 ) - -replace github.com/cloudwego/netpoll v0.5.0 => github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df diff --git a/go.sum b/go.sum index cb62bc0be..c043792ae 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/bytedance/sonic v1.8.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b h1:ZHtA1Q20H9WoLPfMHCSkMv8wUrN7YENJfQCVybErGy8= +github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -27,8 +29,6 @@ github.com/henrylee2cn/ameda v1.4.10 h1:JdvI2Ekq7tapdPsuhrc4CaFiqw6QXFvZIULWJgQy github.com/henrylee2cn/ameda v1.4.10/go.mod h1:liZulR8DgHxdK+MEwvZIylGnmcjzQ6N6f2PlWe7nEO4= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 h1:yE9ULgp02BhYIrO6sdV/FPe0xQM6fNHkVQW2IAymfM0= github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8/go.mod h1:Nhe/DM3671a5udlv2AdV2ni/MZzgfv2qrPL5nIi3EGQ= -github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df h1:rYQQQibbNpf1uDSPYpHJSai/0TsLLJ/74IHb5fQ9WWs= -github.com/joway/netpoll v0.0.4-0.20240207064408-7486eebf98df/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 646644e5d..3352d34c8 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -43,7 +43,6 @@ import ( "github.com/cloudwego/hertz/pkg/common/test/mock" "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/network" - "github.com/cloudwego/hertz/pkg/network/netpoll" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol" "github.com/cloudwego/hertz/pkg/protocol/consts" @@ -1148,45 +1147,3 @@ func TestWithDisableDefaultContentType(t *testing.T) { r, _ := hc.Get("http://127.0.0.1:8324") //nolint:errcheck assert.DeepEqual(t, "", r.Header.Get("Content-Type")) } - -func TestWithSenseClientDisconnection(t *testing.T) { - h := New( - WithHostPorts("localhost:8327"), - WithSenseClientDisconnection(true), - ) - var wg sync.WaitGroup - wg.Add(1) - h.GET("/test", func(c context.Context, ctx *app.RequestContext) { - defer wg.Done() - select { - case <-c.Done(): - return - case <-time.After(time.Second): - t.Fatal("cancel context failed") - } - }) - go h.Spin() - time.Sleep(100 * time.Millisecond) - - dail := netpoll.NewDialer() - conn, err := dail.DialConnection("tcp", "127.0.0.1:8327", 0, nil) - assert.Nil(t, err) - tr := &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return conn, nil - }, - } - hc := http.Client{ - Timeout: time.Second, - Transport: tr, - } - - go func() { - _, err := hc.Get("http://127.0.0.1:8327/test") - assert.NotNil(t, err) - }() - time.Sleep(100 * time.Millisecond) - err = conn.Close() - assert.Nil(t, err) - wg.Wait() -} diff --git a/pkg/app/server/hertz_unix_test.go b/pkg/app/server/hertz_unix_test.go index b37ddfbdf..32fbd2ee5 100644 --- a/pkg/app/server/hertz_unix_test.go +++ b/pkg/app/server/hertz_unix_test.go @@ -34,6 +34,7 @@ import ( c "github.com/cloudwego/hertz/pkg/app/client" "github.com/cloudwego/hertz/pkg/common/test/assert" "github.com/cloudwego/hertz/pkg/common/utils" + "github.com/cloudwego/hertz/pkg/network" "github.com/cloudwego/hertz/pkg/network/standard" "github.com/cloudwego/hertz/pkg/protocol/consts" "golang.org/x/sys/unix" @@ -134,3 +135,55 @@ func TestHertz_Spin(t *testing.T) { <-ch2 } + +func TestWithSenseClientDisconnection(t *testing.T) { + var closeFlag int32 + h := New(WithHostPorts("127.0.0.1:6631"), WithSenseClientDisconnection(true)) + h.GET("/ping", func(c context.Context, ctx *app.RequestContext) { + assert.DeepEqual(t, "aa", string(ctx.Host())) + ch := make(chan struct{}) + select { + case <-c.Done(): + atomic.StoreInt32(&closeFlag, 1) + assert.DeepEqual(t, context.Canceled, c.Err()) + case <-ch: + } + }) + go h.Spin() + time.Sleep(time.Second) + con, err := net.Dial("tcp", "127.0.0.1:6631") + assert.Nil(t, err) + _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) + assert.Nil(t, err) + time.Sleep(time.Second) + assert.Nil(t, con.Close()) + time.Sleep(time.Second) + assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) +} + +func TestWithSenseClientDisconnectionAndWithOnConnect(t *testing.T) { + var closeFlag int32 + h := New(WithHostPorts("127.0.0.1:6632"), WithSenseClientDisconnection(true), WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { + return ctx + })) + h.GET("/ping", func(c context.Context, ctx *app.RequestContext) { + assert.DeepEqual(t, "aa", string(ctx.Host())) + ch := make(chan struct{}) + select { + case <-c.Done(): + atomic.StoreInt32(&closeFlag, 1) + assert.DeepEqual(t, context.Canceled, c.Err()) + case <-ch: + } + }) + go h.Spin() + time.Sleep(time.Second) + con, err := net.Dial("tcp", "127.0.0.1:6632") + assert.Nil(t, err) + _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) + assert.Nil(t, err) + time.Sleep(time.Second) + assert.Nil(t, con.Close()) + time.Sleep(time.Second) + assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) +} diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index e1d93b9c2..18f184379 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -332,22 +332,6 @@ func WithDisablePrintRoute(b bool) config.Option { }} } -// WithSenseClientDisconnection sets the ability to sense client disconnections. -// If we don't set it, it will default to false. -// There are two issues to note when using this option: -// 1. Warning: It only applies to netpoll. -// 2. After opening, the context.Context in the request will be cancelled. -// -// Example: -// server.Default( -// server.WithSenseClientDisconnection(true), -// ) -func WithSenseClientDisconnection(b bool) config.Option { - return config.Option{F: func(o *config.Options) { - o.SenseClientDisconnection = b - }} -} - // WithOnAccept sets the callback function when a new connection is accepted but cannot // receive data in netpoll. In go net, it will be called before converting tls connection func WithOnAccept(fn func(conn net.Conn) context.Context) config.Option { @@ -410,3 +394,19 @@ func WithDisableDefaultContentType(disable bool) config.Option { o.NoDefaultContentType = disable }} } + +// WithSenseClientDisconnection sets the ability to sense client disconnections. +// If we don't set it, it will default to false. +// There are two issues to note when using this option: +// 1. Warning: It only applies to netpoll. +// 2. After opening, the context.Context in the request will be cancelled. +// +// Example: +// server.Default( +// server.WithSenseClientDisconnection(true), +// ) +func WithSenseClientDisconnection(b bool) config.Option { + return config.Option{F: func(o *config.Options) { + o.SenseClientDisconnection = b + }} +} diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 6402bd146..0b96328c1 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -36,6 +36,14 @@ func init() { netpoll.SetLoggerOutput(io.Discard) } +const ctxCancelKey = "ctxCancelKey" + +func cancelContext(ctx context.Context) context.Context { + ctx, cancel := context.WithCancel(ctx) + ctx = context.WithValue(ctx, ctxCancelKey, cancel) + return ctx +} + type transporter struct { sync.RWMutex senseClientDisconnection bool @@ -90,10 +98,14 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { if t.writeTimeout > 0 { conn.SetWriteTimeout(t.writeTimeout) } + ctx := context.Background() if t.OnAccept != nil { - return t.OnAccept(newConn(conn)) + ctx = t.OnAccept(newConn(conn)) + } + if t.senseClientDisconnection { + ctx = cancelContext(ctx) } - return context.Background() + return ctx }), } @@ -103,14 +115,10 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { })) } - const ctxKey = "ctxKey" if t.senseClientDisconnection { - opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, connection netpoll.Connection) context.Context { - ctx, cancel := context.WithCancel(ctx) - return context.WithValue(ctx, ctxKey, cancel) - }), netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { - cancelFunc, _ := ctx.Value(ctxKey).(context.CancelFunc) - if cancelFunc != nil { + opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { + cancelFunc, ok := ctx.Value(ctxCancelKey).(context.CancelFunc) + if cancelFunc != nil && ok { cancelFunc() } })) From 8d7b453a915e800dfbcd3e647f7390f3fcf5b074 Mon Sep 17 00:00:00 2001 From: kinggo Date: Thu, 9 May 2024 14:33:53 +0800 Subject: [PATCH 15/17] chore: update netpoll version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 8530b25a9..36f7e5132 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 github.com/bytedance/mockey v1.2.1 github.com/bytedance/sonic v1.8.1 - github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b + github.com/cloudwego/netpoll v0.6.0 github.com/fsnotify/fsnotify v1.5.4 github.com/tidwall/gjson v1.14.4 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/go.sum b/go.sum index c043792ae..4b60e994d 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/bytedance/sonic v1.8.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b h1:ZHtA1Q20H9WoLPfMHCSkMv8wUrN7YENJfQCVybErGy8= -github.com/cloudwego/netpoll v0.5.2-0.20240220090456-7ba622bf763b/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= +github.com/cloudwego/netpoll v0.6.0 h1:JRMkrA1o8k/4quxzg6Q1XM+zIhwZsyoWlq6ef+ht31U= +github.com/cloudwego/netpoll v0.6.0/go.mod h1:xVefXptcyheopwNDZjDPcfU6kIjZXZ4nY550k1yH9eQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From 515fb173bb59826a997bc96d46a40984f9794dcc Mon Sep 17 00:00:00 2001 From: kinggo Date: Fri, 10 May 2024 15:50:28 +0800 Subject: [PATCH 16/17] feat: use struct instead const --- pkg/network/netpoll/transport.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 0b96328c1..6a52bec32 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -36,11 +36,11 @@ func init() { netpoll.SetLoggerOutput(io.Discard) } -const ctxCancelKey = "ctxCancelKey" +type ctxCancelKey struct{} func cancelContext(ctx context.Context) context.Context { ctx, cancel := context.WithCancel(ctx) - ctx = context.WithValue(ctx, ctxCancelKey, cancel) + ctx = context.WithValue(ctx, ctxCancelKey{}, cancel) return ctx } @@ -117,7 +117,7 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { if t.senseClientDisconnection { opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { - cancelFunc, ok := ctx.Value(ctxCancelKey).(context.CancelFunc) + cancelFunc, ok := ctx.Value(ctxCancelKey{}).(context.CancelFunc) if cancelFunc != nil && ok { cancelFunc() } From 33975b6144b247451d9f2320ed05cf7dff795860 Mon Sep 17 00:00:00 2001 From: kinggo Date: Mon, 27 May 2024 22:00:05 +0800 Subject: [PATCH 17/17] test: add assert --- pkg/app/server/hertz_unix_test.go | 2 ++ pkg/network/netpoll/transport.go | 8 +++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/app/server/hertz_unix_test.go b/pkg/app/server/hertz_unix_test.go index 32fbd2ee5..b1f7d700c 100644 --- a/pkg/app/server/hertz_unix_test.go +++ b/pkg/app/server/hertz_unix_test.go @@ -156,6 +156,7 @@ func TestWithSenseClientDisconnection(t *testing.T) { _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) assert.Nil(t, err) time.Sleep(time.Second) + assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(0)) assert.Nil(t, con.Close()) time.Sleep(time.Second) assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) @@ -183,6 +184,7 @@ func TestWithSenseClientDisconnectionAndWithOnConnect(t *testing.T) { _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) assert.Nil(t, err) time.Sleep(time.Second) + assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(0)) assert.Nil(t, con.Close()) time.Sleep(time.Second) assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 6a52bec32..7450292ce 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -36,11 +36,13 @@ func init() { netpoll.SetLoggerOutput(io.Discard) } -type ctxCancelKey struct{} +type ctxCancelKeyStruct struct{} + +var ctxCancelKey = ctxCancelKeyStruct{} func cancelContext(ctx context.Context) context.Context { ctx, cancel := context.WithCancel(ctx) - ctx = context.WithValue(ctx, ctxCancelKey{}, cancel) + ctx = context.WithValue(ctx, ctxCancelKey, cancel) return ctx } @@ -117,7 +119,7 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { if t.senseClientDisconnection { opts = append(opts, netpoll.WithOnDisconnect(func(ctx context.Context, connection netpoll.Connection) { - cancelFunc, ok := ctx.Value(ctxCancelKey{}).(context.CancelFunc) + cancelFunc, ok := ctx.Value(ctxCancelKey).(context.CancelFunc) if cancelFunc != nil && ok { cancelFunc() }