Skip to content

Commit

Permalink
feat: Allow setting previously const params through options
Browse files Browse the repository at this point in the history
  • Loading branch information
Hyodar committed May 17, 2024
1 parent ac2da2f commit 462c9ca
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions core/safeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,28 @@ type SafeEthClient struct {
logger logging.Logger
rpcUrl string
isReinitializing bool
reinitInterval time.Duration
reinitSubscribers []chan bool
reinitC chan struct{}
closeC chan struct{}
closed bool
createClient func(string, logging.Logger) (eth.Client, error)
headerTimeout time.Duration
reinitInterval time.Duration
resubInterval time.Duration
blockChunkSize uint64
blockMaxRange uint64

createClient func(string, logging.Logger) (eth.Client, error)
}

func NewSafeEthClient(rpcUrl string, logger logging.Logger, opts ...SafeEthClientOption) (*SafeEthClient, error) {
safeClient := &SafeEthClient{
logger: logger,
rpcUrl: rpcUrl,
reinitInterval: REINIT_INTERVAL,
resubInterval: RESUB_INTERVAL,
headerTimeout: HEADER_TIMEOUT,
blockChunkSize: BLOCK_CHUNK_SIZE,
blockMaxRange: BLOCK_MAX_RANGE,
reinitC: make(chan struct{}),
closeC: make(chan struct{}),
createClient: func(rpcUrl string, logger logging.Logger) (eth.Client, error) {
Expand Down Expand Up @@ -90,6 +99,25 @@ func WithReinitInterval(interval time.Duration) SafeEthClientOption {
}
}

func WithResubInterval(interval time.Duration) SafeEthClientOption {
return func(c *SafeEthClient) {
c.resubInterval = interval
}
}

func WithHeaderTimeout(timeout time.Duration) SafeEthClientOption {
return func(c *SafeEthClient) {
c.headerTimeout = timeout
}
}

func WithLogFilteringParams(chunkSize, maxRange uint64) SafeEthClientOption {
return func(c *SafeEthClient) {
c.blockChunkSize = chunkSize
c.blockMaxRange = maxRange
}
}

func WithInstrumentedCreateClient(collector *rpccalls.Collector) SafeEthClientOption {
return func(c *SafeEthClient) {
c.createClient = func(rpcUrl string, logger logging.Logger) (eth.Client, error) {
Expand Down Expand Up @@ -284,10 +312,10 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt
}

missedLogs := make([]types.Log, 0)
fromBlock := max(lastBlock, currentBlock-BLOCK_MAX_RANGE) + 1
fromBlock := max(lastBlock, currentBlock-c.blockMaxRange) + 1

for ; fromBlock < currentBlock; fromBlock += (BLOCK_CHUNK_SIZE + 1) {
toBlock := min(fromBlock+BLOCK_CHUNK_SIZE, currentBlock)
for ; fromBlock < currentBlock; fromBlock += (c.blockChunkSize + 1) {
toBlock := min(fromBlock+c.blockChunkSize, currentBlock)

logs, err := c.Client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)),
Expand Down Expand Up @@ -342,7 +370,7 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt
go func() {
defer c.wg.Done()

ticker := time.NewTicker(RESUB_INTERVAL)
ticker := time.NewTicker(c.resubInterval)
defer ticker.Stop()

reinitC := c.WatchReinit()
Expand Down Expand Up @@ -474,7 +502,7 @@ func (c *SafeEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.H
go func() {
defer c.wg.Done()

headerTicker := time.NewTicker(HEADER_TIMEOUT)
headerTicker := time.NewTicker(c.headerTimeout)
defer headerTicker.Stop()

reinitC := c.WatchReinit()
Expand Down

0 comments on commit 462c9ca

Please sign in to comment.