From db5318b356b939e46e0556a2fd5e454fc793e051 Mon Sep 17 00:00:00 2001 From: Yevhen Date: Thu, 28 Nov 2024 12:13:23 +0200 Subject: [PATCH] rpcclient: safe read and write to batch --- rpcclient/infrastructure.go | 40 +++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index ca039023f1..9a9c2bb878 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -74,6 +74,9 @@ var ( // client having already connected to the RPC server. ErrClientAlreadyConnected = errors.New("websocket client has already " + "connected") + + // ErrEmptyBatch is an error to describe that there is nothing to send. + ErrEmptyBatch = errors.New("batch is empty") ) const ( @@ -151,6 +154,7 @@ type Client struct { // whether or not to batch requests, false unless changed by Batch() batch bool + batchLock sync.Mutex batchList *list.List // retryCount holds the number of times the client has tried to @@ -214,7 +218,10 @@ func (c *Client) addRequest(jReq *jsonRequest) error { element := c.requestList.PushBack(jReq) c.requestMap[jReq.id] = element } else { + c.batchLock.Lock() element := c.batchList.PushBack(jReq) + c.batchLock.Unlock() + c.requestMap[jReq.id] = element } return nil @@ -238,7 +245,9 @@ func (c *Client) removeRequest(id uint64) *jsonRequest { var request *jsonRequest if c.batch { + c.batchLock.Lock() request = c.batchList.Remove(element).(*jsonRequest) + c.batchLock.Unlock() } else { request = c.requestList.Remove(element).(*jsonRequest) } @@ -1672,7 +1681,15 @@ func (c *Client) BackendVersion() (BackendVersion, error) { return c.backendVersion, nil } -func (c *Client) sendAsync() FutureGetBulkResult { +func (c *Client) sendAsync() (FutureGetBulkResult, error) { + c.batchLock.Lock() + defer c.batchLock.Unlock() + + // If batchList is empty, there's nothing to send. + if c.batchList.Len() == 0 { + return nil, ErrEmptyBatch + } + // convert the array of marshalled json requests to a single request we can send responseChan := make(chan *Response, 1) marshalledRequest := []byte("[") @@ -1694,25 +1711,24 @@ func (c *Client) sendAsync() FutureGetBulkResult { responseChan: responseChan, } c.sendPostRequest(&request) - return responseChan + return responseChan, nil } // Marshall's bulk requests and sends to the server // creates a response channel to receive the response func (c *Client) Send() error { - // if batchlist is empty, there's nothing to send - if c.batchList.Len() == 0 { - return nil + future, err := c.sendAsync() + if err != nil { + return err } - batchResp, err := c.sendAsync().Receive() + batchResp, err := future.Receive() if err != nil { // Clear batchlist in case of an error. - // - // TODO(yy): need to double check to make sure there's no - // concurrent access to this batch list, otherwise we may miss - // some batched requests. + + c.batchLock.Lock() c.batchList = list.New() + c.batchLock.Unlock() return err } @@ -1722,6 +1738,10 @@ func (c *Client) Send() error { // Perform a GC on batchList and requestMap before moving // forward. request := c.removeRequest(id) + if request == nil { + // Perhaps another goroutine has already processed this request. + continue + } // If there's an error, we log it and continue to the next // request.