Skip to content

Commit

Permalink
refactor of service overload logic
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Mar 11, 2024
1 parent c21aca1 commit df5697a
Showing 1 changed file with 31 additions and 15 deletions.
46 changes: 31 additions & 15 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,31 +120,47 @@ func NewTier2(
return s, nil
}

func (s *Tier2Service) isOverloaded() bool {
s.connectionCountMutex.RLock()
defer s.connectionCountMutex.RUnlock()
return s.runtimeConfig.MaxConcurrentRequests != 0 && s.currentConcurrentRequests >= s.runtimeConfig.MaxConcurrentRequests
}

func (s *Tier2Service) incrementConcurrentRequests() {
s.connectionCountMutex.Lock()
defer s.connectionCountMutex.Unlock()

s.currentConcurrentRequests++
s.setOverloaded()
}

func (s *Tier2Service) decrementConcurrentRequests() {
s.connectionCountMutex.Lock()
defer s.connectionCountMutex.Unlock()

s.currentConcurrentRequests--
s.setOverloaded()
}

func (s *Tier2Service) setOverloaded() {
overloaded := s.runtimeConfig.MaxConcurrentRequests != 0 && s.currentConcurrentRequests >= s.runtimeConfig.MaxConcurrentRequests
s.setReadyFunc(!overloaded)
}

func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, streamSrv pbssinternal.Substreams_ProcessRangeServer) (grpcError error) {
// We keep `err` here as the unaltered error from `blocks` call, this is used in the EndSpan to record the full error
// and not only the `grpcError` one which is a subset view of the full `err`.
var err error
ctx := streamSrv.Context()

overloaded := true
s.connectionCountMutex.Lock()
if s.runtimeConfig.MaxConcurrentRequests == 0 || s.currentConcurrentRequests < s.runtimeConfig.MaxConcurrentRequests {
overloaded = false
}
if overloaded {
defer s.connectionCountMutex.Unlock()
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service overloaded"))
if s.isOverloaded() {
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service currently overloaded"))
}

s.currentConcurrentRequests++
s.setReadyFunc(s.runtimeConfig.MaxConcurrentRequests == 0 || s.currentConcurrentRequests <= s.runtimeConfig.MaxConcurrentRequests)
s.incrementConcurrentRequests()
defer func() {
s.connectionCountMutex.Lock()
s.currentConcurrentRequests--
s.setReadyFunc(s.runtimeConfig.MaxConcurrentRequests == 0 || s.currentConcurrentRequests < s.runtimeConfig.MaxConcurrentRequests)
s.connectionCountMutex.Unlock()
s.decrementConcurrentRequests()
}()
s.connectionCountMutex.Unlock()

// TODO: use stage and segment numbers when implemented
stage := request.OutputModule
Expand Down

0 comments on commit df5697a

Please sign in to comment.