diff --git a/blockpoller/poller_client_test.go b/blockpoller/poller_client_test.go index f34fbe4..8b68727 100644 --- a/blockpoller/poller_client_test.go +++ b/blockpoller/poller_client_test.go @@ -98,7 +98,7 @@ func (t TestBlockFetcherWithClient) Fetch(ctx context.Context, client *TestBlock } func TestPollerClient(t *testing.T) { - clients := rpc.NewClients[*TestBlockClient]() + clients := rpc.NewClients[*TestBlockClient](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[*TestBlockClient]()) var blockItems1 []*TestBlockItem var blockItems2 []*TestBlockItem diff --git a/blockpoller/poller_test.go b/blockpoller/poller_test.go index 192b497..114f5e2 100644 --- a/blockpoller/poller_test.go +++ b/blockpoller/poller_test.go @@ -167,7 +167,7 @@ func TestForkHandler_run(t *testing.T) { blockFetcher := newTestBlockFetcher[any](t, tt.blocks) blockFinalizer := newTestBlockFinalizer(t, tt.expectFireBlock) - clients := rpc.NewClients[any](1 * time.Second) + clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any]()) clients.Add(new(any)) poller := New(blockFetcher, blockFinalizer, clients) diff --git a/blockpoller/state_file_test.go b/blockpoller/state_file_test.go index 2a85523..919d0cb 100644 --- a/blockpoller/state_file_test.go +++ b/blockpoller/state_file_test.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" @@ -75,7 +76,7 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) { defer os.Remove(dirName) blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)}) - clients := rpc.NewClients[any]() + clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any]()) clients.Add(new(any)) poller := &BlockPoller[any]{ diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 0000000..4900a04 --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,51 @@ +package rpc + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type rollClient struct { + callCount int + name string +} + +func TestRollingStrategy(t *testing.T) { + + rollingStrategy := NewRollingStrategyRoundRobin[*rollClient]() + rollingStrategy.reset() + + clients := NewClients(2*time.Second, rollingStrategy) + clients.Add(&rollClient{name: "c.1"}) + clients.Add(&rollClient{name: "c.2"}) + clients.Add(&rollClient{name: "c.3"}) + clients.Add(&rollClient{name: "c.a"}) + clients.Add(&rollClient{name: "c.b"}) + + var clientNames []string + _, err := WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) { + clientNames = append(clientNames, client.name) + if client.name == "c.3" { + return nil, nil + } + + return nil, fmt.Errorf("next please") + }) + + require.NoError(t, err) + //require.ErrorIs(t, err, ErrorNoMoreClient) + require.Equal(t, []string{"c.1", "c.2", "c.3"}, clientNames) + + _, err = WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) { + clientNames = append(clientNames, client.name) + return nil, fmt.Errorf("next please") + }) + + require.ErrorIs(t, err, ErrorNoMoreClient) + require.Equal(t, []string{"c.1", "c.2", "c.3", "c.3", "c.a", "c.b", "c.1", "c.2"}, clientNames) + +} diff --git a/rpc/clients.go b/rpc/clients.go index 2ce27ff..66e6986 100644 --- a/rpc/clients.go +++ b/rpc/clients.go @@ -12,14 +12,14 @@ var ErrorNoMoreClient = errors.New("no more clients") type Clients[C any] struct { clients []C - next int maxBlockFetchDuration time.Duration + rollingStrategy RollingStrategy[C] } -func NewClients[C any](maxBlockFetchDuration time.Duration) *Clients[C] { +func NewClients[C any](maxBlockFetchDuration time.Duration, rollingStrategy RollingStrategy[C]) *Clients[C] { return &Clients[C]{ - next: 0, maxBlockFetchDuration: maxBlockFetchDuration, + rollingStrategy: rollingStrategy, } } @@ -27,32 +27,120 @@ func (c *Clients[C]) Add(client C) { c.clients = append(c.clients, client) } -func (c *Clients[C]) Next() (client C, err error) { - if len(c.clients) <= c.next { - return client, ErrorNoMoreClient - } - client = c.clients[c.next] - c.next++ - return client, nil -} - func WithClients[C any, V any](clients *Clients[C], f func(context.Context, C) (v V, err error)) (v V, err error) { - clients.next = 0 var errs error + + clients.rollingStrategy.reset() + client, err := clients.rollingStrategy.next(clients) + if err != nil { + errs = multierror.Append(errs, err) + return v, errs + } + for { - client, err := clients.Next() - if err != nil { - errs = multierror.Append(errs, err) - return v, errs - } + ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, clients.maxBlockFetchDuration) + v, err := f(ctx, client) cancel() + if err != nil { errs = multierror.Append(errs, err) + client, err = clients.rollingStrategy.next(clients) + if err != nil { + errs = multierror.Append(errs, err) + return v, errs + } + continue } return v, nil } } + +type RollingStrategy[C any] interface { + reset() + next(clients *Clients[C]) (C, error) +} + +type RollingStrategyRoundRobin[C any] struct { + fistCallToNewClient bool + usedClientCount int + nextClientIndex int +} + +func NewRollingStrategyRoundRobin[C any]() RollingStrategy[C] { + return &RollingStrategyRoundRobin[C]{ + fistCallToNewClient: true, + } +} + +func (s *RollingStrategyRoundRobin[C]) reset() { + s.usedClientCount = 0 +} +func (s *RollingStrategyRoundRobin[C]) next(clients *Clients[C]) (client C, err error) { + if len(clients.clients) == s.usedClientCount { + return client, ErrorNoMoreClient + } + + if s.fistCallToNewClient { + s.fistCallToNewClient = false + client = clients.clients[0] + s.usedClientCount = s.usedClientCount + 1 + s.nextClientIndex = s.nextClientIndex + 1 + return client, nil + } + + if s.nextClientIndex == len(clients.clients) { //roll to 1st client + s.nextClientIndex = 0 + } + + if s.usedClientCount == 0 { //just been reset + s.nextClientIndex = s.prevIndex(clients) + client = clients.clients[s.nextClientIndex] + s.usedClientCount = s.usedClientCount + 1 + s.nextClientIndex = s.nextClientIndex + 1 + return client, nil + } + + if s.nextClientIndex == len(clients.clients) { //roll to 1st client + client = clients.clients[0] + s.usedClientCount = s.usedClientCount + 1 + return client, nil + } + + client = clients.clients[s.nextClientIndex] + s.usedClientCount = s.usedClientCount + 1 + s.nextClientIndex = s.nextClientIndex + 1 + return client, nil +} + +func (s *RollingStrategyRoundRobin[C]) prevIndex(clients *Clients[C]) int { + if s.nextClientIndex == 0 { + return len(clients.clients) - 1 + } + return s.nextClientIndex - 1 +} + +type RollingStrategyAlwaysUseFirst[C any] struct { + nextIndex int +} + +func NewRollingStrategyAlwaysUseFirst[C any]() *RollingStrategyAlwaysUseFirst[C] { + return &RollingStrategyAlwaysUseFirst[C]{} +} + +func (s *RollingStrategyAlwaysUseFirst[C]) reset() { + s.nextIndex = 0 +} + +func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) { + if len(c.clients) <= s.nextIndex { + return client, ErrorNoMoreClient + } + client = c.clients[s.nextIndex] + s.nextIndex++ + return client, nil + +}