Skip to content

Commit

Permalink
Add rolling strategy support to RPC clients
Browse files Browse the repository at this point in the history
Implemented new rolling strategy interfaces and updated RPC clients to utilize the rolling strategies. Introduced two strategies: RoundRobin and AlwaysUseFirst, modifying existing tests to incorporate these changes.
  • Loading branch information
billettc committed Nov 18, 2024
1 parent 2bcd5c5 commit cf7a94a
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 21 deletions.
2 changes: 1 addition & 1 deletion blockpoller/poller_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (t TestBlockFetcherWithClient) Fetch(ctx context.Context, client *TestBlock
}

func TestPollerClient(t *testing.T) {
clients := rpc.NewClients[*TestBlockClient]()
clients := rpc.NewClients[*TestBlockClient](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[*TestBlockClient]())
var blockItems1 []*TestBlockItem
var blockItems2 []*TestBlockItem

Expand Down
2 changes: 1 addition & 1 deletion blockpoller/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestForkHandler_run(t *testing.T) {
blockFetcher := newTestBlockFetcher[any](t, tt.blocks)
blockFinalizer := newTestBlockFinalizer(t, tt.expectFireBlock)

clients := rpc.NewClients[any](1 * time.Second)
clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any]())
clients.Add(new(any))

poller := New(blockFetcher, blockFinalizer, clients)
Expand Down
3 changes: 2 additions & 1 deletion blockpoller/state_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) {
defer os.Remove(dirName)

blockFetcher := newTestBlockFetcher[any](t, []*TestBlock{tb("60a", "59a", 60)})
clients := rpc.NewClients[any]()
clients := rpc.NewClients[any](1*time.Second, rpc.NewRollingStrategyAlwaysUseFirst[any]())
clients.Add(new(any))

poller := &BlockPoller[any]{
Expand Down
51 changes: 51 additions & 0 deletions rpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rpc

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type rollClient struct {
callCount int
name string
}

func TestRollingStrategy(t *testing.T) {

rollingStrategy := NewRollingStrategyRoundRobin[*rollClient]()
rollingStrategy.reset()

clients := NewClients(2*time.Second, rollingStrategy)
clients.Add(&rollClient{name: "c.1"})
clients.Add(&rollClient{name: "c.2"})
clients.Add(&rollClient{name: "c.3"})
clients.Add(&rollClient{name: "c.a"})
clients.Add(&rollClient{name: "c.b"})

var clientNames []string
_, err := WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) {
clientNames = append(clientNames, client.name)
if client.name == "c.3" {
return nil, nil
}

return nil, fmt.Errorf("next please")
})

require.NoError(t, err)
//require.ErrorIs(t, err, ErrorNoMoreClient)
require.Equal(t, []string{"c.1", "c.2", "c.3"}, clientNames)

_, err = WithClients(clients, func(ctx context.Context, client *rollClient) (v any, err error) {
clientNames = append(clientNames, client.name)
return nil, fmt.Errorf("next please")
})

require.ErrorIs(t, err, ErrorNoMoreClient)
require.Equal(t, []string{"c.1", "c.2", "c.3", "c.3", "c.a", "c.b", "c.1", "c.2"}, clientNames)

}
124 changes: 106 additions & 18 deletions rpc/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,135 @@ var ErrorNoMoreClient = errors.New("no more clients")

type Clients[C any] struct {
clients []C
next int
maxBlockFetchDuration time.Duration
rollingStrategy RollingStrategy[C]
}

func NewClients[C any](maxBlockFetchDuration time.Duration) *Clients[C] {
func NewClients[C any](maxBlockFetchDuration time.Duration, rollingStrategy RollingStrategy[C]) *Clients[C] {
return &Clients[C]{
next: 0,
maxBlockFetchDuration: maxBlockFetchDuration,
rollingStrategy: rollingStrategy,
}
}

func (c *Clients[C]) Add(client C) {
c.clients = append(c.clients, client)
}

func (c *Clients[C]) Next() (client C, err error) {
if len(c.clients) <= c.next {
return client, ErrorNoMoreClient
}
client = c.clients[c.next]
c.next++
return client, nil
}

func WithClients[C any, V any](clients *Clients[C], f func(context.Context, C) (v V, err error)) (v V, err error) {
clients.next = 0
var errs error

clients.rollingStrategy.reset()
client, err := clients.rollingStrategy.next(clients)
if err != nil {
errs = multierror.Append(errs, err)
return v, errs
}

for {
client, err := clients.Next()
if err != nil {
errs = multierror.Append(errs, err)
return v, errs
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, clients.maxBlockFetchDuration)

v, err := f(ctx, client)
cancel()

if err != nil {
errs = multierror.Append(errs, err)
client, err = clients.rollingStrategy.next(clients)
if err != nil {
errs = multierror.Append(errs, err)
return v, errs
}

continue
}
return v, nil
}
}

type RollingStrategy[C any] interface {
reset()
next(clients *Clients[C]) (C, error)
}

type RollingStrategyRoundRobin[C any] struct {
fistCallToNewClient bool
usedClientCount int
nextClientIndex int
}

func NewRollingStrategyRoundRobin[C any]() RollingStrategy[C] {
return &RollingStrategyRoundRobin[C]{
fistCallToNewClient: true,
}
}

func (s *RollingStrategyRoundRobin[C]) reset() {
s.usedClientCount = 0
}
func (s *RollingStrategyRoundRobin[C]) next(clients *Clients[C]) (client C, err error) {
if len(clients.clients) == s.usedClientCount {
return client, ErrorNoMoreClient
}

if s.fistCallToNewClient {
s.fistCallToNewClient = false
client = clients.clients[0]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

if s.nextClientIndex == len(clients.clients) { //roll to 1st client
s.nextClientIndex = 0
}

if s.usedClientCount == 0 { //just been reset
s.nextClientIndex = s.prevIndex(clients)
client = clients.clients[s.nextClientIndex]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

if s.nextClientIndex == len(clients.clients) { //roll to 1st client
client = clients.clients[0]
s.usedClientCount = s.usedClientCount + 1
return client, nil
}

client = clients.clients[s.nextClientIndex]
s.usedClientCount = s.usedClientCount + 1
s.nextClientIndex = s.nextClientIndex + 1
return client, nil
}

func (s *RollingStrategyRoundRobin[C]) prevIndex(clients *Clients[C]) int {
if s.nextClientIndex == 0 {
return len(clients.clients) - 1
}
return s.nextClientIndex - 1
}

type RollingStrategyAlwaysUseFirst[C any] struct {
nextIndex int
}

func NewRollingStrategyAlwaysUseFirst[C any]() *RollingStrategyAlwaysUseFirst[C] {
return &RollingStrategyAlwaysUseFirst[C]{}
}

func (s *RollingStrategyAlwaysUseFirst[C]) reset() {
s.nextIndex = 0
}

func (s *RollingStrategyAlwaysUseFirst[C]) next(c *Clients[C]) (client C, err error) {
if len(c.clients) <= s.nextIndex {
return client, ErrorNoMoreClient
}
client = c.clients[s.nextIndex]
s.nextIndex++
return client, nil

}

0 comments on commit cf7a94a

Please sign in to comment.