diff --git a/core/safeclient/client.go b/core/safeclient/client.go index 4c7b37ec..c7e56075 100644 --- a/core/safeclient/client.go +++ b/core/safeclient/client.go @@ -39,12 +39,17 @@ type SafeEthClient struct { logger logging.Logger rpcUrl string isReinitializing bool - reinitInterval time.Duration reinitSubscribers []chan bool reinitC chan struct{} closeC chan struct{} closed bool - createClient func(string, logging.Logger) (eth.Client, error) + headerTimeout time.Duration + reinitInterval time.Duration + resubInterval time.Duration + blockChunkSize uint64 + blockMaxRange uint64 + + createClient func(string, logging.Logger) (eth.Client, error) } func NewSafeEthClient(rpcUrl string, logger logging.Logger, opts ...SafeEthClientOption) (*SafeEthClient, error) { @@ -52,6 +57,10 @@ func NewSafeEthClient(rpcUrl string, logger logging.Logger, opts ...SafeEthClien logger: logger, rpcUrl: rpcUrl, reinitInterval: REINIT_INTERVAL, + resubInterval: RESUB_INTERVAL, + headerTimeout: HEADER_TIMEOUT, + blockChunkSize: BLOCK_CHUNK_SIZE, + blockMaxRange: BLOCK_MAX_RANGE, reinitC: make(chan struct{}), closeC: make(chan struct{}), createClient: func(rpcUrl string, logger logging.Logger) (eth.Client, error) { @@ -90,6 +99,25 @@ func WithReinitInterval(interval time.Duration) SafeEthClientOption { } } +func WithResubInterval(interval time.Duration) SafeEthClientOption { + return func(c *SafeEthClient) { + c.resubInterval = interval + } +} + +func WithHeaderTimeout(timeout time.Duration) SafeEthClientOption { + return func(c *SafeEthClient) { + c.headerTimeout = timeout + } +} + +func WithLogFilteringParams(chunkSize, maxRange uint64) SafeEthClientOption { + return func(c *SafeEthClient) { + c.blockChunkSize = chunkSize + c.blockMaxRange = maxRange + } +} + func WithInstrumentedCreateClient(collector *rpccalls.Collector) SafeEthClientOption { return func(c *SafeEthClient) { c.createClient = func(rpcUrl string, logger logging.Logger) (eth.Client, error) { @@ -284,10 +312,10 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt } missedLogs := make([]types.Log, 0) - fromBlock := max(lastBlock, currentBlock-BLOCK_MAX_RANGE) + 1 + fromBlock := max(lastBlock, currentBlock-c.blockMaxRange) + 1 - for ; fromBlock < currentBlock; fromBlock += (BLOCK_CHUNK_SIZE + 1) { - toBlock := min(fromBlock+BLOCK_CHUNK_SIZE, currentBlock) + for ; fromBlock < currentBlock; fromBlock += (c.blockChunkSize + 1) { + toBlock := min(fromBlock+c.blockChunkSize, currentBlock) logs, err := c.Client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: big.NewInt(int64(fromBlock)), @@ -342,7 +370,7 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt go func() { defer c.wg.Done() - ticker := time.NewTicker(RESUB_INTERVAL) + ticker := time.NewTicker(c.resubInterval) defer ticker.Stop() reinitC := c.WatchReinit() @@ -474,7 +502,7 @@ func (c *SafeEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.H go func() { defer c.wg.Done() - headerTicker := time.NewTicker(HEADER_TIMEOUT) + headerTicker := time.NewTicker(c.headerTimeout) defer headerTicker.Stop() reinitC := c.WatchReinit()