diff --git a/src/xutil/xhttp/request.go b/src/xutil/xhttp/request.go index 113600b..4875338 100644 --- a/src/xutil/xhttp/request.go +++ b/src/xutil/xhttp/request.go @@ -2,6 +2,7 @@ package xhttp import ( "bytes" + "errors" "github.com/mix-go/xutil/xconv" "io" "net/http" @@ -9,6 +10,12 @@ import ( "time" ) +var ( + ErrAbortRetry = errors.New("xhttp: abort further retries") + + ErrShutdown = errors.New("xhttp: service is currently being shutdown and will no longer accept new requests") +) + type XRequest struct { *http.Request @@ -110,17 +117,26 @@ func Do(req *http.Request, opts ...RequestOption) (*XResponse, error) { return doRequest(o, newXRequest(req)) } -func doRequest(opts *requestOptions, req *XRequest) (*XResponse, error) { +func doRequest(opts *requestOptions, xReq *XRequest) (*XResponse, error) { + if !shutdownController.BeginRequest() { + return nil, ErrShutdown + } + defer shutdownController.EndRequest() + cli := http.Client{ Timeout: opts.Timeout, } startTime := time.Now() - r, err := cli.Do(req.Request) + r, err := cli.Do(xReq.Request) if err != nil { - doDebug(opts, time.Now().Sub(startTime), req, nil, err) + doDebug(opts, time.Now().Sub(startTime), xReq, nil, err) return nil, err } xResp := newXResponse(r) - doDebug(opts, time.Now().Sub(startTime), req, xResp, nil) + doDebug(opts, time.Now().Sub(startTime), xReq, xResp, nil) return xResp, nil } + +func Shutdown() { + shutdownController.InitiateShutdown() +} diff --git a/src/xutil/xhttp/retry.go b/src/xutil/xhttp/retry.go index a1a8f57..4f3f9de 100644 --- a/src/xutil/xhttp/retry.go +++ b/src/xutil/xhttp/retry.go @@ -6,8 +6,6 @@ import ( "github.com/avast/retry-go" ) -var ErrAbortRetry = errors.New("xhttp: abort further retries") - type RetryIfFunc func(*XResponse, error) error type Error []error diff --git a/src/xutil/xhttp/shutdownctrl.go b/src/xutil/xhttp/shutdownctrl.go new file mode 100644 index 0000000..9d7370d --- /dev/null +++ b/src/xutil/xhttp/shutdownctrl.go @@ -0,0 +1,34 @@ +package xhttp + +import ( + "sync" + "sync/atomic" +) + +var shutdownController = NewShutdownController() + +type ShutdownController struct { + shutdownFlag int32 // 原子标记,表示是否进入停机状态 + waitGroup sync.WaitGroup // 用于等待所有处理中的请求完成 +} + +func NewShutdownController() *ShutdownController { + return &ShutdownController{} +} + +func (sc *ShutdownController) BeginRequest() bool { + if atomic.LoadInt32(&sc.shutdownFlag) == 1 { + return false + } + sc.waitGroup.Add(1) + return true +} + +func (sc *ShutdownController) EndRequest() { + sc.waitGroup.Done() +} + +func (sc *ShutdownController) InitiateShutdown() { + atomic.StoreInt32(&sc.shutdownFlag, 1) + sc.waitGroup.Wait() +}