From df5697a6bf7df1cabd48634dbf79c34336afc45d Mon Sep 17 00:00:00 2001 From: colindickson Date: Mon, 11 Mar 2024 12:14:19 -0400 Subject: [PATCH] refactor of service overload logic --- service/tier2.go | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/service/tier2.go b/service/tier2.go index bd683583f..0c28dae14 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -120,31 +120,47 @@ func NewTier2( return s, nil } +func (s *Tier2Service) isOverloaded() bool { + s.connectionCountMutex.RLock() + defer s.connectionCountMutex.RUnlock() + return s.runtimeConfig.MaxConcurrentRequests != 0 && s.currentConcurrentRequests >= s.runtimeConfig.MaxConcurrentRequests +} + +func (s *Tier2Service) incrementConcurrentRequests() { + s.connectionCountMutex.Lock() + defer s.connectionCountMutex.Unlock() + + s.currentConcurrentRequests++ + s.setOverloaded() +} + +func (s *Tier2Service) decrementConcurrentRequests() { + s.connectionCountMutex.Lock() + defer s.connectionCountMutex.Unlock() + + s.currentConcurrentRequests-- + s.setOverloaded() +} + +func (s *Tier2Service) setOverloaded() { + overloaded := s.runtimeConfig.MaxConcurrentRequests != 0 && s.currentConcurrentRequests >= s.runtimeConfig.MaxConcurrentRequests + s.setReadyFunc(!overloaded) +} + func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, streamSrv pbssinternal.Substreams_ProcessRangeServer) (grpcError error) { // We keep `err` here as the unaltered error from `blocks` call, this is used in the EndSpan to record the full error // and not only the `grpcError` one which is a subset view of the full `err`. var err error ctx := streamSrv.Context() - overloaded := true - s.connectionCountMutex.Lock() - if s.runtimeConfig.MaxConcurrentRequests == 0 || s.currentConcurrentRequests < s.runtimeConfig.MaxConcurrentRequests { - overloaded = false - } - if overloaded { - defer s.connectionCountMutex.Unlock() - return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service overloaded")) + if s.isOverloaded() { + return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service currently overloaded")) } - s.currentConcurrentRequests++ - s.setReadyFunc(s.runtimeConfig.MaxConcurrentRequests == 0 || s.currentConcurrentRequests <= s.runtimeConfig.MaxConcurrentRequests) + s.incrementConcurrentRequests() defer func() { - s.connectionCountMutex.Lock() - s.currentConcurrentRequests-- - s.setReadyFunc(s.runtimeConfig.MaxConcurrentRequests == 0 || s.currentConcurrentRequests < s.runtimeConfig.MaxConcurrentRequests) - s.connectionCountMutex.Unlock() + s.decrementConcurrentRequests() }() - s.connectionCountMutex.Unlock() // TODO: use stage and segment numbers when implemented stage := request.OutputModule