diff --git a/example/dubbo/go-client/cmd/client.go b/example/dubbo/go-client/cmd/client.go index 0683d5c..9974f4c 100644 --- a/example/dubbo/go-client/cmd/client.go +++ b/example/dubbo/go-client/cmd/client.go @@ -20,6 +20,9 @@ package main import ( "context" "fmt" + "net/http" + _ "net/http/pprof" + "runtime" "sync" "time" ) @@ -47,10 +50,13 @@ var greeterProvider = new(pkg.GreeterClientImpl) func init() { config.SetConsumerService(greeterProvider) + runtime.SetMutexProfileFraction(1) } - // need to setup environment variable "CONF_CONSUMER_FILE_PATH" to "conf/client.yml" before run func main() { + go func() { + _ = http.ListenAndServe("0.0.0.0:6061", nil) + }() config.Load() time.Sleep(time.Second * 3) testSayHello() @@ -94,7 +100,7 @@ func testSayHelloWithHighParallel() { wg := sync.WaitGroup{} goodCounter := atomic.Uint32{} badCounter := atomic.Uint32{} - for i := 0; i < 1000; i ++{ + for i := 0; i < 2000; i ++{ wg.Add(1) go func() { defer wg.Done() diff --git a/example/dubbo/go-server/cmd/server.go b/example/dubbo/go-server/cmd/server.go index 865b87d..5e8f811 100644 --- a/example/dubbo/go-server/cmd/server.go +++ b/example/dubbo/go-server/cmd/server.go @@ -19,8 +19,11 @@ package main import ( "fmt" + "net/http" + _ "net/http/pprof" "os" "os/signal" + "runtime" "syscall" "time" ) @@ -36,20 +39,28 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/registry/nacos" _ "dubbo.apache.org/dubbo-go/v3/registry/protocol" _ "dubbo.apache.org/dubbo-go/v3/registry/zookeeper" + + _ "github.com/dubbogo/triple/pkg/triple" ) import ( "github.com/dubbogo/triple/example/dubbo/go-server/pkg" - _ "github.com/dubbogo/triple/pkg/triple" ) var ( survivalTimeout = int(3 * time.Second) ) +func init() { + runtime.SetMutexProfileFraction(1) +} + // need to setup environment variable "CONF_PROVIDER_FILE_PATH" to "conf/server.yml" before run func main() { config.SetProviderService(pkg.NewGreeterProvider()) + go func() { + _ = http.ListenAndServe("0.0.0.0:6060", nil) + }() config.Load() initSignal() } diff --git a/example/go.mod b/example/go.mod index 34be402..b9bfa4d 100644 --- a/example/go.mod +++ b/example/go.mod @@ -4,7 +4,6 @@ go 1.13 require ( dubbo.apache.org/dubbo-go/v3 v3.0.0-rc2 - github.com/dubbogo/gost v1.11.16 github.com/dubbogo/triple v1.0.1 github.com/golang/protobuf v1.5.2 github.com/stretchr/testify v1.7.0 diff --git a/pkg/http2/client.go b/pkg/http2/client.go index 1908dd5..51dae08 100644 --- a/pkg/http2/client.go +++ b/pkg/http2/client.go @@ -2,6 +2,7 @@ package http2 import ( "bytes" + "context" "crypto/tls" "net" "net/http" @@ -84,7 +85,7 @@ func (h *Client) StreamPost(addr, path string, sendChan chan *bytes.Buffer, opts close(closeChan) return } - ch := readSplitData(rsp.Body) + ch := readSplitData(context.Background(), rsp.Body) Loop: for { select { diff --git a/pkg/http2/server.go b/pkg/http2/server.go index 8b9f210..29b3b7a 100644 --- a/pkg/http2/server.go +++ b/pkg/http2/server.go @@ -2,6 +2,7 @@ package http2 import ( "bytes" + "context" "encoding/binary" "fmt" "io" @@ -179,7 +180,7 @@ func skipHeader(frameData []byte) ([]byte, uint32) { return frameData[5:], length } -func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer { +func readSplitData(ctx context.Context, rBody io.ReadCloser) chan *bytes.Buffer { cbm := make(chan *bytes.Buffer) go func() { buf := make([]byte, 4098) // todo configurable @@ -224,7 +225,14 @@ func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer { if err != nil { fmt.Printf("read SplitedDatas error = %v\n", err) } - cbm <- bytes.NewBuffer(allDataBody) + select { + case <-ctx.Done(): + close(cbm) + return + default: + cbm <- bytes.NewBuffer(allDataBody) + } + // temp data is sent, and reset wanting data size fromFrameHeaderDataSize = 0 } @@ -236,7 +244,15 @@ func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer { func (s *Server) http2HandleFunction(wi http.ResponseWriter, r *http.Request) { // body data from http - bodyCh := readSplitData(r.Body) + ctx, cancel := context.WithCancel(context.Background()) + bodyCh := readSplitData(ctx, r.Body) + defer func() { + cancel() + select { + case <-bodyCh: + default: + } + }() sendChan := make(chan *bytes.Buffer) ctrlChan := make(chan http.Header) errChan := make(chan interface{})