Skip to content

Commit

Permalink
Add sorting functionality and new rolling strategies
Browse files Browse the repository at this point in the history
Introduced a sorting mechanism for clients in the `rpc` package. Added interfaces and implementations for sticky rolling strategy, along with comprehensive test cases. Improved thread safety and replaced the old rolling strategy implementations.
  • Loading branch information
billettc committed Nov 26, 2024
1 parent 0f91a00 commit 76469b5
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 139 deletions.
4 changes: 4 additions & 0 deletions blockpoller/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ type BlockFetcher[C any] interface {
IsBlockAvailable(requestedSlot uint64) bool
Fetch(ctx context.Context, client C, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
}

type HeadBlockNumberFetcher[C any] interface {
FetchHeadBlockNumber(ctx context.Context, client C) (uint64, error)
}
2 changes: 0 additions & 2 deletions blockpoller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ func New[C any](
return b
}

var MaxStopBlock *uint64 = nil

func (p *BlockPoller[C]) Run(firstStreamableBlockNum uint64, stopBlock *uint64, blockFetchBatchSize int) error {
p.startBlockNumGate = firstStreamableBlockNum
p.logger.Info("starting poller",
Expand Down
50 changes: 0 additions & 50 deletions rpc/client_test.go
Original file line number Diff line number Diff line change
@@ -1,51 +1 @@
package rpc

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type rollClient struct {
callCount int
name string
}

func TestRollingStrategy(t *testing.T) {

rollingStrategy := NewRollingStrategyRoundRobin[*rollClient]()
rollingStrategy.reset()

clients := NewClients(2*time.Second, rollingStrategy)
clients.Add(&rollClient{name: "c.1"})
clients.Add(&rollClient{name: "c.2"})
clients.Add(&rollClient{name: "c.3"})
clients.Add(&rollClient{name: "c.a"})
clients.Add(&rollClient{name: "c.b"})

var clientNames []string
_, err := WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) {
clientNames = append(clientNames, client.name)
if client.name == "c.3" {
return nil, nil
}

return nil, fmt.Errorf("next please")
})

require.NoError(t, err)
//require.ErrorIs(t, err, ErrorNoMoreClient)
require.Equal(t, []string{"c.1", "c.2", "c.3"}, clientNames)

_, err = WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) {
clientNames = append(clientNames, client.name)
return nil, fmt.Errorf("next please")
})

require.ErrorIs(t, err, ErrorNoMoreClient)
require.Equal(t, []string{"c.1", "c.2", "c.3", "c.3", "c.a", "c.b", "c.1", "c.2"}, clientNames)

}
110 changes: 23 additions & 87 deletions rpc/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package rpc
import (
"context"
"errors"
"sync"
"time"

"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
)

var ErrorNoMoreClient = errors.New("no more clients")
Expand All @@ -14,20 +16,40 @@ type Clients[C any] struct {
clients []C
maxBlockFetchDuration time.Duration
rollingStrategy RollingStrategy[C]
lock sync.Mutex
logger *zap.Logger
}

func NewClients[C any](maxBlockFetchDuration time.Duration, rollingStrategy RollingStrategy[C]) *Clients[C] {
func NewClients[C any](maxBlockFetchDuration time.Duration, rollingStrategy RollingStrategy[C], logger *zap.Logger) *Clients[C] {
return &Clients[C]{
maxBlockFetchDuration: maxBlockFetchDuration,
rollingStrategy: rollingStrategy,
logger: logger,
}
}

func (c *Clients[C]) StartSorting(ctx context.Context, direction SortDirection, every time.Duration) {
go func() {
for {
c.logger.Info("sorting clients")
err := Sort(ctx, c, direction)
if err != nil {
c.logger.Warn("sorting", zap.Error(err))
}
time.Sleep(every)
}
}()
}

func (c *Clients[C]) Add(client C) {
c.lock.Lock()
defer c.lock.Unlock()
c.clients = append(c.clients, client)
}

func WithClients[C any, V any](clients *Clients[C], f func(context.Context, C) (v V, err error)) (v V, err error) {
clients.lock.Lock()
defer clients.lock.Unlock()
var errs error

clients.rollingStrategy.reset()
Expand Down Expand Up @@ -58,89 +80,3 @@ func WithClients[C any, V any](clients *Clients[C], f func(context.Context, C) (
return v, nil
}
}

type RollingStrategy[C any] interface {
reset()
next(clients *Clients[C]) (C, error)
}

type RollingStrategyRoundRobin[C any] struct {
fistCallToNewClient bool
usedClientCount int
nextClientIndex int
}

func NewRollingStrategyRoundRobin[C any]() RollingStrategy[C] {
return &RollingStrategyRoundRobin[C]{
fistCallToNewClient: true,
}
}

func (s *RollingStrategyRoundRobin[C]) reset() {
s.usedClientCount = 0
}
func (s *RollingStrategyRoundRobin[C]) next(clients *Clients[C]) (client C, err error) {
if len(clients.clients) == s.usedClientCount {
return client, ErrorNoMoreClient
}

if s.fistCallToNewClient {
s.fistCallToNewClient = false
client = clients.clients[0]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

if s.nextClientIndex == len(clients.clients) { //roll to 1st client
s.nextClientIndex = 0
}

if s.usedClientCount == 0 { //just been reset
s.nextClientIndex = s.prevIndex(clients)
client = clients.clients[s.nextClientIndex]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

if s.nextClientIndex == len(clients.clients) { //roll to 1st client
client = clients.clients[0]
s.usedClientCount = s.usedClientCount + 1
return client, nil
}

client = clients.clients[s.nextClientIndex]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

func (s *RollingStrategyRoundRobin[C]) prevIndex(clients *Clients[C]) int {
if s.nextClientIndex == 0 {
return len(clients.clients) - 1
}
return s.nextClientIndex - 1
}

type RollingStrategyAlwaysUseFirst[C any] struct {
nextIndex int
}

func NewRollingStrategyAlwaysUseFirst[C any]() *RollingStrategyAlwaysUseFirst[C] {
return &RollingStrategyAlwaysUseFirst[C]{}
}

func (s *RollingStrategyAlwaysUseFirst[C]) reset() {
s.nextIndex = 0
}

func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) {
if len(c.clients) <= s.nextIndex {
return client, ErrorNoMoreClient
}
client = c.clients[s.nextIndex]
s.nextIndex++
return client, nil

}
96 changes: 96 additions & 0 deletions rpc/rolling_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package rpc

type RollingStrategy[C any] interface {
reset()
next(clients *Clients[C]) (C, error)
}

type StickyRollingStrategy[C any] struct {
fistCallToNewClient bool
usedClientCount int
nextClientIndex int
}

func NewStickyRollingStrategy[C any]() *StickyRollingStrategy[C] {
return &StickyRollingStrategy[C]{
fistCallToNewClient: true,
}
}

func (s *StickyRollingStrategy[C]) reset() {
s.usedClientCount = 0
}
func (s *StickyRollingStrategy[C]) next(clients *Clients[C]) (client C, err error) {
clients.lock.Lock()
defer clients.lock.Unlock()

if len(clients.clients) == s.usedClientCount {
return client, ErrorNoMoreClient
}

if s.fistCallToNewClient {
s.fistCallToNewClient = false
client = clients.clients[0]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

if s.nextClientIndex == len(clients.clients) { //roll to 1st client
s.nextClientIndex = 0
}

if s.usedClientCount == 0 { //just been reset
s.nextClientIndex = s.prevIndex(clients)
client = clients.clients[s.nextClientIndex]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

if s.nextClientIndex == len(clients.clients) { //roll to 1st client
client = clients.clients[0]
s.usedClientCount = s.usedClientCount + 1
return client, nil
}

client = clients.clients[s.nextClientIndex]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

func (s *StickyRollingStrategy[C]) prevIndex(clients *Clients[C]) int {
clients.lock.Lock()
defer clients.lock.Unlock()

if s.nextClientIndex == 0 {
return len(clients.clients) - 1
}
return s.nextClientIndex - 1
}

type RollingStrategyAlwaysUseFirst[C any] struct {
nextIndex int
}

func NewRollingStrategyAlwaysUseFirst[C any]() *RollingStrategyAlwaysUseFirst[C] {
return &RollingStrategyAlwaysUseFirst[C]{}
}

func (s *RollingStrategyAlwaysUseFirst[C]) reset() {
s.nextIndex = 0
}

func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) {
c.lock.Lock()
defer c.lock.Unlock()

if len(c.clients) <= s.nextIndex {
return client, ErrorNoMoreClient
}
client = c.clients[s.nextIndex]
s.nextIndex++
return client, nil

}
56 changes: 56 additions & 0 deletions rpc/rolling_strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package rpc

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type rollClient struct {
callCount int
name string
sortValue uint64
}

func (r *rollClient) fetchSortValue(ctx context.Context) (sortValue uint64, err error) {
return r.sortValue, nil
}

func TestStickyRollingStrategy(t *testing.T) {

rollingStrategy := NewStickyRollingStrategy[*rollClient]()
rollingStrategy.reset()

clients := NewClients(2*time.Second, rollingStrategy)
clients.Add(&rollClient{name: "c.1"})
clients.Add(&rollClient{name: "c.2"})
clients.Add(&rollClient{name: "c.3"})
clients.Add(&rollClient{name: "c.a"})
clients.Add(&rollClient{name: "c.b"})

var clientNames []string
_, err := WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) {
clientNames = append(clientNames, client.name)
if client.name == "c.3" {
return nil, nil
}

return nil, fmt.Errorf("next please")
})

require.NoError(t, err)
//require.ErrorIs(t, err, ErrorNoMoreClient)
require.Equal(t, []string{"c.1", "c.2", "c.3"}, clientNames)

_, err = WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) {
clientNames = append(clientNames, client.name)
return nil, fmt.Errorf("next please")
})

require.ErrorIs(t, err, ErrorNoMoreClient)
require.Equal(t, []string{"c.1", "c.2", "c.3", "c.3", "c.a", "c.b", "c.1", "c.2"}, clientNames)

}
Loading

0 comments on commit 76469b5

Please sign in to comment.