Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggest filter range for oversized getLogs request #239

Merged
merged 4 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions rpc/handler/cfx_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/confura/store/mysql"
citypes "github.com/Conflux-Chain/confura/types"
"github.com/Conflux-Chain/confura/util/metrics"
sdk "github.com/Conflux-Chain/go-conflux-sdk"
"github.com/Conflux-Chain/go-conflux-sdk/types"
Expand Down Expand Up @@ -106,9 +107,11 @@ func (handler *CfxLogsApiHandler) getLogsReorgGuard(
metrics.Registry.RPC.Percentage(delegatedRpcMethod, "filter/split/partial").Mark(len(dbFilters) > 0 && fnFilter != nil)

if blkRange, valid := calculateCfxBlockRange(fnFilter); valid {
metrics.Registry.RPC.LogFilterSplit(delegatedRpcMethod, "fullnode/blockRange").Update(blkRange)
numBlocks := int64(blkRange.To - blkRange.From + 1)
metrics.Registry.RPC.LogFilterSplit(delegatedRpcMethod, "fullnode/blockRange").Update(numBlocks)
} else if epochRange, valid := calculateEpochRange(fnFilter); valid {
metrics.Registry.RPC.LogFilterSplit(delegatedRpcMethod, "fullnode/epochRange").Update(epochRange)
numEpochs := int64(epochRange.To - epochRange.From + 1)
metrics.Registry.RPC.LogFilterSplit(delegatedRpcMethod, "fullnode/epochRange").Update(numEpochs)
}
}

Expand Down Expand Up @@ -143,6 +146,23 @@ func (handler *CfxLogsApiHandler) getLogsReorgGuard(
continue
}

// convert block number back to epoch number for log filter with epoch range
if filter.FromEpoch != nil {
var valErr *store.DataSetTooLargeError
if errors.As(err, &valErr) && valErr.SuggestedRange != nil {
fromEpoch, _ := filter.FromEpoch.ToInt()
suggstedEpoch, ok, err := handler.ms.ClosestEpochUpToBlock(valErr.SuggestedRange.To)

if err == nil && ok && suggstedEpoch >= fromEpoch.Uint64() {
valErr.SuggestedRange.From = fromEpoch.Uint64()
valErr.SuggestedRange.To = suggstedEpoch
} else {
valErr.SuggestedRange = nil
}
return nil, false, valErr
}
}

if !errors.Is(err, store.ErrAlreadyPruned) {
return nil, false, err
}
Expand Down Expand Up @@ -205,7 +225,7 @@ func (handler *CfxLogsApiHandler) getLogsReorgGuard(

// ensure result set never oversized
if len(logs) > int(store.MaxLogLimit) {
return nil, false, store.ErrGetLogsResultSetTooLarge
return nil, false, store.NewResultSetTooLargeError()
}

return logs, len(dbFilters) > 0, nil
Expand Down Expand Up @@ -412,15 +432,19 @@ func (handler *CfxLogsApiHandler) splitLogFilterByEpochRange(
func (handler *CfxLogsApiHandler) checkFullnodeLogFilter(filter *types.LogFilter) error {
// Epoch range bound checking
if epochRange, valid := calculateEpochRange(filter); valid {
if epochRange > int64(store.MaxLogEpochRange) {
return store.ErrGetLogsQuerySetTooLarge
numEpochs := epochRange.To - epochRange.From + 1
if numEpochs > uint64(store.MaxLogEpochRange) {
epochRange.To = epochRange.From + uint64(store.MaxLogEpochRange) - 1
return store.NewQuerySetTooLargeError(&epochRange)
}
}

// Block range bound checking
if blockRange, valid := calculateCfxBlockRange(filter); valid {
if blockRange > int64(store.MaxLogBlockRange) {
return store.ErrGetLogsQuerySetTooLarge
numBlocks := blockRange.To - blockRange.From + 1
if numBlocks > uint64(store.MaxLogBlockRange) {
blockRange.To = blockRange.From + uint64(store.MaxLogBlockRange) - 1
return store.NewQuerySetTooLargeError(&blockRange)
}
}

Expand All @@ -439,29 +463,35 @@ func checkTimeout(ctx context.Context) error {
}

// calculateCfxBlockRange calculates the block range from the log filter.
func calculateCfxBlockRange(filter *types.LogFilter) (int64, bool) {
func calculateCfxBlockRange(filter *types.LogFilter) (blockRange citypes.RangeUint64, ok bool) {
if filter == nil || filter.FromBlock == nil || filter.ToBlock == nil {
return 0, false
return blockRange, false
}

bf := filter.FromBlock.ToInt()
bt := filter.ToBlock.ToInt()
blockFrom, blockTo := bf.Int64(), bt.Int64()
if bf.Uint64() > bt.Uint64() {
return blockRange, false
}

return blockTo - blockFrom + 1, true
blockRange.From, blockRange.To = bf.Uint64(), bt.Uint64()
return blockRange, true
}

// calculateEpochRange calculates the epoch range from the log filter.
func calculateEpochRange(filter *types.LogFilter) (int64, bool) {
func calculateEpochRange(filter *types.LogFilter) (epochRange citypes.RangeUint64, ok bool) {
if filter == nil || filter.FromEpoch == nil || filter.ToEpoch == nil {
return 0, false
return epochRange, false
}

ef, _ := filter.FromEpoch.ToInt()
et, _ := filter.ToEpoch.ToInt()
epochFrom, epochTo := ef.Int64(), et.Int64()
if ef.Uint64() > et.Uint64() {
return epochRange, false
}

return epochTo - epochFrom + 1, true
epochRange.From, epochRange.To = ef.Uint64(), et.Uint64()
return epochRange, true
}

// responseBodySizeAccumulator is a helper to check if the result body size exceeds the limit.
Expand Down
29 changes: 19 additions & 10 deletions rpc/handler/eth_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Conflux-Chain/confura/rpc/ethbridge"
"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/confura/store/mysql"
citypes "github.com/Conflux-Chain/confura/types"
"github.com/Conflux-Chain/confura/util/metrics"
"github.com/openweb3/web3go/client"
"github.com/openweb3/web3go/types"
Expand Down Expand Up @@ -79,8 +80,9 @@ func (handler *EthLogsApiHandler) getLogsReorgGuard(
metrics.Registry.RPC.Percentage(delegatedRpcMethod, "filter/split/allfullnode").Mark(dbFilter == nil)
metrics.Registry.RPC.Percentage(delegatedRpcMethod, "filter/split/partial").Mark(dbFilter != nil && fnFilter != nil)

if blkRange, valid := calculateEthBlockRange(fnFilter); valid {
metrics.Registry.RPC.LogFilterSplit(delegatedRpcMethod, "fullnode/blockRange").Update(blkRange)
if blockRange, valid := calculateEthBlockRange(fnFilter); valid {
numBlocks := blockRange.To - blockRange.From + 1
metrics.Registry.RPC.LogFilterSplit(delegatedRpcMethod, "fullnode/blockRange").Update(int64(numBlocks))
}
}

Expand Down Expand Up @@ -136,7 +138,7 @@ func (handler *EthLogsApiHandler) getLogsReorgGuard(
}

if len(logs) > int(store.MaxLogLimit) {
return nil, false, store.ErrGetLogsResultSetTooLarge
return nil, false, store.NewResultSetTooLargeError()
}

return logs, dbFilter != nil, nil
Expand Down Expand Up @@ -255,20 +257,27 @@ func (handler *EthLogsApiHandler) GetNetworkId(eth *client.RpcEthClient) (uint32
//
// Note this function assumes the log filter is valid and normalized.
func (handler *EthLogsApiHandler) checkFnEthLogFilter(filter *types.FilterQuery) error {
blkRange, valid := calculateEthBlockRange(filter)
if valid && blkRange > int64(store.MaxLogEpochRange) {
return store.ErrGetLogsQuerySetTooLarge
if blockRange, valid := calculateEthBlockRange(filter); valid {
numBlocks := blockRange.To - blockRange.From + 1
if numBlocks > uint64(store.MaxLogBlockRange) {
blockRange.To = blockRange.From + uint64(store.MaxLogBlockRange) - 1
return store.NewQuerySetTooLargeError(&blockRange)
}
}

return nil
}

// calculateEthBlockRange calculates the block range of the log filter and returns the gap and a boolean indicating success.
func calculateEthBlockRange(filter *types.FilterQuery) (int64, bool) {
func calculateEthBlockRange(filter *types.FilterQuery) (blockRange citypes.RangeUint64, ok bool) {
if filter == nil || filter.FromBlock == nil || filter.ToBlock == nil {
return 0, false
return blockRange, false
}

if *filter.FromBlock > *filter.ToBlock {
return blockRange, false
}

blkRange := *filter.ToBlock - *filter.FromBlock + 1
return int64(blkRange), true
blockRange.From, blockRange.To = uint64(*filter.FromBlock), uint64(*filter.ToBlock)
return blockRange, true
}
46 changes: 39 additions & 7 deletions store/log_filter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package store

import (
"fmt"
"time"

citypes "github.com/Conflux-Chain/confura/types"
"github.com/Conflux-Chain/go-conflux-sdk/types"
"github.com/Conflux-Chain/go-conflux-sdk/types/cfxaddress"
"github.com/Conflux-Chain/go-conflux-util/viper"
Expand All @@ -20,17 +22,14 @@ const (
)

var ( // common errors
ErrGetLogsQuerySetTooLarge = errors.New(
"query set is too large, please narrow down your filter condition",
)
errMsgLogsQuerySetTooLarge = "the query set is too large, please narrow down your filter condition"

ErrGetLogsResultSetTooLarge = errors.Errorf(
"result set to be queried is too large with more than %v logs, %v",
MaxLogLimit, "please narrow down your filter condition",
errMsgLogsResultSetTooLarge = fmt.Sprintf(
"the result set exceeds the max limit of %v logs, please narrow down your filter conditions", MaxLogLimit,
)

ErrGetLogsTimeout = errors.Errorf(
"query timeout with duration exceeds %v(s)", TimeoutGetLogs,
"the query timed out after exceeding the maximum duration of %v seconds", TimeoutGetLogs,
)
)

Expand All @@ -43,6 +42,39 @@ var ( // Log filter constants
MaxLogBlockRange uint64
)

// DataSetTooLargeError indicates that a getLogs query set or result set
// exceeds the configured limits. It optionally carries a suggested narrower
// filter range the client may retry with.
type DataSetTooLargeError struct {
	Msg            string               // human-readable reason for the rejection
	SuggestedRange *citypes.RangeUint64 // suggested retry range; nil when no suggestion applies
}

// Compile-time assertion that *DataSetTooLargeError implements error.
var _ error = (*DataSetTooLargeError)(nil)

// NewQuerySetTooLargeError creates a DataSetTooLargeError indicating the
// filter's query span is too large, attaching an optional suggested retry
// range.
func NewQuerySetTooLargeError(suggestions ...*citypes.RangeUint64) *DataSetTooLargeError {
	return NewDataSetTooLargeError(errMsgLogsQuerySetTooLarge, suggestions...)
}

// NewResultSetTooLargeError creates a DataSetTooLargeError indicating the
// matched result set is too large, attaching an optional suggested retry
// range.
func NewResultSetTooLargeError(suggestions ...*citypes.RangeUint64) *DataSetTooLargeError {
	return NewDataSetTooLargeError(errMsgLogsResultSetTooLarge, suggestions...)
}

// NewDataSetTooLargeError creates a DataSetTooLargeError with the given
// message. At most the first suggestion (if any) is attached as the
// suggested retry range.
func NewDataSetTooLargeError(msg string, suggestions ...*citypes.RangeUint64) *DataSetTooLargeError {
	err := &DataSetTooLargeError{Msg: msg}
	if len(suggestions) > 0 {
		err.SuggestedRange = suggestions[0]
	}
	return err
}

// Error implements the error interface, appending the suggested filter
// range to the message when one is available.
func (e *DataSetTooLargeError) Error() string {
	if r := e.SuggestedRange; r != nil {
		return fmt.Sprintf("%v: suggested filter range is %s", e.Msg, *r)
	}
	return e.Msg
}

func initLogFilter() {
var lfc struct {
MaxBlockHashCount int `default:"32"`
Expand Down
4 changes: 2 additions & 2 deletions store/mysql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ms *MysqlStore) GetLogs(ctx context.Context, storeFilter store.LogFilter)

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.ErrGetLogsResultSetTooLarge
return nil, store.NewResultSetTooLargeError()
}

continue
Expand Down Expand Up @@ -342,7 +342,7 @@ func (ms *MysqlStore) GetLogs(ctx context.Context, storeFilter store.LogFilter)

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.ErrGetLogsResultSetTooLarge
return nil, store.NewResultSetTooLargeError()
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/mysql/store_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (ls *logStore) GetLogs(ctx context.Context, storeFilter store.LogFilter) ([

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.ErrGetLogsResultSetTooLarge
return nil, store.NewResultSetTooLargeError()
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/mysql/store_log_big_contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (bcls *bigContractLogStore) GetContractLogs(

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.ErrGetLogsResultSetTooLarge
return nil, store.NewResultSetTooLargeError()
}
}

Expand Down
Loading
Loading