From 613ddcbdecbe27626513f56a561d7bb20450ded5 Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Mon, 16 Oct 2023 10:58:54 +0800 Subject: [PATCH 1/5] feature: support the ability to sense client disconnection --- pkg/app/server/option.go | 18 ++++++++++++ pkg/app/server/option_test.go | 3 ++ pkg/common/config/option.go | 4 +++ pkg/common/config/option_test.go | 1 + pkg/network/netpoll/transport.go | 50 +++++++++++++++++++------------- 5 files changed, 56 insertions(+), 20 deletions(-) diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index fcf380485..0a03792f8 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -332,6 +332,24 @@ func WithDisablePrintRoute(b bool) config.Option { }} } +// WithSenseClientDisconnection sets the ability to sense client disconnections. +// If we don't set it, it will default to false. +// There are three issues to note when using this option: +// 1. It only applies to netpoll. +// 2. It needs to be used in conjunction with WithOnAccept. +// Examples: +// server.Default( +// server.WithSenseClientDisconnection(true), +// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { +// return ctx +// })) +// 3. The cost is high after opening, please choose carefully. +func WithSenseClientDisconnection(b bool) config.Option { + return config.Option{F: func(o *config.Options) { + o.SenseClientDisconnection = b + }} +} + // WithOnAccept sets the callback function when a new connection is accepted but cannot // receive data in netpoll. In go net, it will be called before converting tls connection func WithOnAccept(fn func(conn net.Conn) context.Context) config.Option { diff --git a/pkg/app/server/option_test.go b/pkg/app/server/option_test.go index f5d7f7b32..bc43e7cd2 100644 --- a/pkg/app/server/option_test.go +++ b/pkg/app/server/option_test.go @@ -61,6 +61,7 @@ func TestOptions(t *testing.T) { WithBasePath("/"), WithMaxRequestBodySize(2), WithDisablePrintRoute(true), + WithSenseClientDisconnection(true), WithNetwork("unix"), WithExitWaitTime(time.Second), WithMaxKeepBodySize(500), @@ -93,6 +94,7 @@ func TestOptions(t *testing.T) { assert.DeepEqual(t, opt.BasePath, "/") assert.DeepEqual(t, opt.MaxRequestBodySize, 2) assert.DeepEqual(t, opt.DisablePrintRoute, true) + assert.DeepEqual(t, opt.SenseClientDisconnection, true) assert.DeepEqual(t, opt.Network, "unix") assert.DeepEqual(t, opt.ExitWaitTimeout, time.Second) assert.DeepEqual(t, opt.MaxKeepBodySize, 500) @@ -130,6 +132,7 @@ func TestDefaultOptions(t *testing.T) { assert.DeepEqual(t, opt.GetOnly, false) assert.DeepEqual(t, opt.DisableKeepalive, false) assert.DeepEqual(t, opt.DisablePrintRoute, false) + assert.DeepEqual(t, opt.SenseClientDisconnection, false) assert.DeepEqual(t, opt.Network, "tcp") assert.DeepEqual(t, opt.ExitWaitTimeout, time.Second*5) assert.DeepEqual(t, opt.MaxKeepBodySize, 4*1024*1024) diff --git a/pkg/common/config/option.go b/pkg/common/config/option.go index 048fb366f..bc6c24c51 100644 --- a/pkg/common/config/option.go +++ b/pkg/common/config/option.go @@ -61,6 +61,7 @@ type Options struct { StreamRequestBody bool NoDefaultServerHeader bool DisablePrintRoute bool + SenseClientDisconnection bool Network string Addr string BasePath string @@ -195,6 +196,9 @@ func NewOptions(opts []Option) *Options { // Disabled when set to True DisablePrintRoute: false, + // The ability to sense client disconnection is disabled by default + SenseClientDisconnection: false, + // "tcp", "udp", "unix"(unix domain socket) Network: defaultNetwork, diff --git a/pkg/common/config/option_test.go b/pkg/common/config/option_test.go index 67fcab796..315ec9c97 100644 --- a/pkg/common/config/option_test.go +++ b/pkg/common/config/option_test.go @@ -37,6 +37,7 @@ func TestDefaultOptions(t *testing.T) { assert.False(t, options.HandleMethodNotAllowed) assert.False(t, options.UseRawPath) assert.False(t, options.RemoveExtraSlash) + assert.False(t, options.SenseClientDisconnection) assert.True(t, options.UnescapePathValues) assert.False(t, options.DisablePreParseMultipartForm) assert.DeepEqual(t, defaultNetwork, options.Network) diff --git a/pkg/network/netpoll/transport.go b/pkg/network/netpoll/transport.go index 17829cb83..72e69ca7f 100644 --- a/pkg/network/netpoll/transport.go +++ b/pkg/network/netpoll/transport.go @@ -38,31 +38,33 @@ func init() { type transporter struct { sync.RWMutex - network string - addr string - keepAliveTimeout time.Duration - readTimeout time.Duration - writeTimeout time.Duration - listener net.Listener - eventLoop netpoll.EventLoop - listenConfig *net.ListenConfig - OnAccept func(conn net.Conn) context.Context - OnConnect func(ctx context.Context, conn network.Conn) context.Context + network string + addr string + senseClientDisconnection bool + keepAliveTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + listener net.Listener + eventLoop netpoll.EventLoop + listenConfig *net.ListenConfig + OnAccept func(conn net.Conn) context.Context + OnConnect func(ctx context.Context, conn network.Conn) context.Context } // For transporter switch func NewTransporter(options *config.Options) network.Transporter { return &transporter{ - network: options.Network, - addr: options.Addr, - keepAliveTimeout: options.KeepAliveTimeout, - readTimeout: options.ReadTimeout, - writeTimeout: options.WriteTimeout, - listener: nil, - eventLoop: nil, - listenConfig: options.ListenConfig, - OnAccept: options.OnAccept, - OnConnect: options.OnConnect, + network: options.Network, + addr: options.Addr, + senseClientDisconnection: options.SenseClientDisconnection, + keepAliveTimeout: options.KeepAliveTimeout, + readTimeout: options.ReadTimeout, + writeTimeout: options.WriteTimeout, + listener: nil, + eventLoop: nil, + listenConfig: options.ListenConfig, + OnAccept: options.OnAccept, + OnConnect: options.OnConnect, } } @@ -97,6 +99,14 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) { if t.OnConnect != nil { opts = append(opts, netpoll.WithOnConnect(func(ctx context.Context, conn netpoll.Connection) context.Context { + if t.senseClientDisconnection { + ctx, cancel := context.WithCancel(ctx) + conn.AddCloseCallback(func(connection netpoll.Connection) error { + cancel() + return nil + }) + return t.OnConnect(ctx, newConn(conn)) + } return t.OnConnect(ctx, newConn(conn)) })) } From dee4aaa53c9ae125be263cd188ac51bad3d53947 Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Mon, 16 Oct 2023 11:04:24 +0800 Subject: [PATCH 2/5] format --- .../server/binding/internal/decoder/decoder.go | 2 +- pkg/app/server/option.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/app/server/binding/internal/decoder/decoder.go b/pkg/app/server/binding/internal/decoder/decoder.go index 0bd13442a..bdc85071d 100644 --- a/pkg/app/server/binding/internal/decoder/decoder.go +++ b/pkg/app/server/binding/internal/decoder/decoder.go @@ -103,7 +103,7 @@ func GetReqDecoder(rt reflect.Type, byTag string, config *DecodeConfig) (Decoder }, needValidate, nil } -func getFieldDecoder(field reflect.StructField, index int, parentIdx []int, parentJSONName string, byTag string, config *DecodeConfig) ([]fieldDecoder, bool, error) { +func getFieldDecoder(field reflect.StructField, index int, parentIdx []int, parentJSONName, byTag string, config *DecodeConfig) ([]fieldDecoder, bool, error) { for field.Type.Kind() == reflect.Ptr { field.Type = field.Type.Elem() } diff --git a/pkg/app/server/option.go b/pkg/app/server/option.go index 0a03792f8..6af8ac423 100644 --- a/pkg/app/server/option.go +++ b/pkg/app/server/option.go @@ -335,15 +335,15 @@ func WithDisablePrintRoute(b bool) config.Option { // WithSenseClientDisconnection sets the ability to sense client disconnections. // If we don't set it, it will default to false. // There are three issues to note when using this option: -// 1. It only applies to netpoll. -// 2. It needs to be used in conjunction with WithOnAccept. -// Examples: +// 1. It only applies to netpoll. +// 2. It needs to be used in conjunction with WithOnAccept. +// Examples: // server.Default( -// server.WithSenseClientDisconnection(true), -// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { -// return ctx -// })) -// 3. The cost is high after opening, please choose carefully. +// server.WithSenseClientDisconnection(true), +// server.WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { +// return ctx +// })) +// 3. The cost is high after opening, please choose carefully. func WithSenseClientDisconnection(b bool) config.Option { return config.Option{F: func(o *config.Options) { o.SenseClientDisconnection = b From 2b429a3fe3a3edbc7eab5a5349a6a28aee00333f Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Mon, 16 Oct 2023 15:01:36 +0800 Subject: [PATCH 3/5] add ut --- pkg/network/netpoll/transport_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index d8a06090c..270bea1a2 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -69,6 +69,33 @@ func TestTransport(t *testing.T) { assert.Assert(t, atomic.LoadInt32(&onDataFlag) == 1) }) + t.Run("TestSenseClientDisconnection", func(t *testing.T) { + var onConnFlag int32 + transporter := NewTransporter(&config.Options{ + Addr: addr, + Network: nw, + SenseClientDisconnection: true, + OnConnect: func(ctx context.Context, conn network.Conn) context.Context { + atomic.StoreInt32(&onConnFlag, 1) + return ctx + }, + }) + go transporter.ListenAndServe(func(ctx context.Context, conn interface{}) error { + return nil + }) + defer transporter.Close() + time.Sleep(100 * time.Millisecond) + + dial := NewDialer() + conn, err := dial.DialConnection(nw, addr, time.Second, nil) + assert.Nil(t, err) + _, err = conn.Write([]byte("456")) + assert.Nil(t, err) + time.Sleep(100 * time.Millisecond) + + assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + }) + t.Run("TestListenConfig", func(t *testing.T) { listenCfg := &net.ListenConfig{Control: func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { From 0b0e58dc5736688f4cf0a194ca9e7cceceb1db28 Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Mon, 16 Oct 2023 18:03:57 +0800 Subject: [PATCH 4/5] add ut --- pkg/network/netpoll/transport_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index 270bea1a2..6515b565c 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -21,6 +21,7 @@ package netpoll import ( "context" "net" + "reflect" "sync/atomic" "syscall" "testing" @@ -71,12 +72,14 @@ func TestTransport(t *testing.T) { t.Run("TestSenseClientDisconnection", func(t *testing.T) { var onConnFlag int32 + var ctxVal context.Context transporter := NewTransporter(&config.Options{ Addr: addr, Network: nw, SenseClientDisconnection: true, OnConnect: func(ctx context.Context, conn network.Conn) context.Context { atomic.StoreInt32(&onConnFlag, 1) + ctxVal = ctx return ctx }, }) @@ -94,6 +97,7 @@ func TestTransport(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + assert.DeepEqual(t, "*context.cancelCtx", reflect.TypeOf(ctxVal).String()) }) t.Run("TestListenConfig", func(t *testing.T) { From 34ce7b2ab2f47381d4d9cde481433040f658284b Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Mon, 16 Oct 2023 18:13:35 +0800 Subject: [PATCH 5/5] solove data race --- pkg/network/netpoll/transport_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/network/netpoll/transport_test.go b/pkg/network/netpoll/transport_test.go index 6515b565c..4c65802bd 100644 --- a/pkg/network/netpoll/transport_test.go +++ b/pkg/network/netpoll/transport_test.go @@ -22,6 +22,7 @@ import ( "context" "net" "reflect" + "sync" "sync/atomic" "syscall" "testing" @@ -73,12 +74,15 @@ func TestTransport(t *testing.T) { t.Run("TestSenseClientDisconnection", func(t *testing.T) { var onConnFlag int32 var ctxVal context.Context + var mutex sync.Mutex transporter := NewTransporter(&config.Options{ Addr: addr, Network: nw, SenseClientDisconnection: true, OnConnect: func(ctx context.Context, conn network.Conn) context.Context { atomic.StoreInt32(&onConnFlag, 1) + mutex.Lock() + defer mutex.Unlock() ctxVal = ctx return ctx }, @@ -97,6 +101,8 @@ func TestTransport(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.Assert(t, atomic.LoadInt32(&onConnFlag) == 1) + mutex.Lock() + defer mutex.Unlock() assert.DeepEqual(t, "*context.cancelCtx", reflect.TypeOf(ctxVal).String()) })