diff --git a/app/app.go b/app/app.go
index e9615656..2ef767c8 100644
--- a/app/app.go
+++ b/app/app.go
@@ -73,9 +73,10 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
 func Run(configFile, subscriptionPlansFile string) {
 	_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
 	_redisInfo := d.RedisInfo{
-		Client:                     _redisClient,
-		BlockRetryQueueName:        "blocks_in_retry_queue",
-		UnfinalizedBlocksQueueName: "unfinalized_blocks",
+		Client:                 _redisClient,
+		BlockRetryQueue:        "blocks_in_retry_queue",
+		BlockRetryCountTable:   "attempt_count_tracker_table",
+		UnfinalizedBlocksQueue: "unfinalized_blocks",
 	}

 	// Attempting to listen to Ctrl+C signal
diff --git a/app/block/block.go b/app/block/block.go
index 51782c7e..38a2bb18 100644
--- a/app/block/block.go
+++ b/app/block/block.go
@@ -25,7 +25,7 @@ func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {
 }

 // ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
-func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) {
+func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool {

 	// Closure managing publishing whole block data i.e. block header, txn(s), event logs
 	// on redis pubsub channel
@@ -60,7 +60,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 			log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
 			status.IncrementBlocksProcessed()

-			return
+			return true

 		}
@@ -71,7 +71,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 			// Pushing into unfinalized block queue, to be picked up only when
 			// finality for this block has been achieved
 			PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
-			return
+			return true

 		}
@@ -82,7 +82,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 			// If failed to persist, we'll put it in retry queue
 			PushBlockIntoRetryQueue(redis, block.Number().String())
-			return
+			return false

 		}
@@ -90,7 +90,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 		log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
 		status.IncrementBlocksProcessed()

-		return
+		return true

 	}
@@ -165,7 +165,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 	if !(result.Failure == 0) {

 		PushBlockIntoRetryQueue(redis, block.Number().String())
-		return
+		return false

 	}
@@ -183,7 +183,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 		log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
 		status.IncrementBlocksProcessed()

-		return
+		return true

 	}
@@ -194,7 +194,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 		// Pushing into unfinalized block queue, to be picked up only when
 		// finality for this block has been achieved
 		PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
-		return
+		return true

 	}
@@ -205,12 +205,14 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 		// If failed to persist, we'll put it in retry queue
 		PushBlockIntoRetryQueue(redis, block.Number().String())
-		return
+		return false

 	}

 	// Successfully processed block
 	log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
+
+	status.IncrementBlocksProcessed()
+	return true

 }
diff --git a/app/block/fetch.go b/app/block/fetch.go
index 1d27ffbd..3e28c4ba 100644
--- a/app/block/fetch.go
+++ b/app/block/fetch.go
@@ -53,7 +53,14 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
 		return
 	}

-	ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt)
+	// If attempt to process block by number went successful
+	// we can consider removing this block number's entry from
+	// attempt count tracker table
+	if ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt) {
+
+		RemoveBlockFromAttemptCountTrackerTable(redis, fmt.Sprintf("%d", number))
+
+	}
 }
diff --git a/app/block/listener.go b/app/block/listener.go
index e287227e..97e865cc 100644
--- a/app/block/listener.go
+++ b/app/block/listener.go
@@ -2,6 +2,7 @@ package block

 import (
 	"context"
+	"fmt"
 	"log"
 	"runtime"

@@ -87,7 +88,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
 			//
 			// Though it'll be picked up sometime in future ( by missing block finder ), but it can be safely handled now
 			// so that it gets processed immediately
-			func(blockHash common.Hash, blockNumber string) {
+			func(blockHash common.Hash, blockNumber uint64) {

 				// When only processing blocks in real-time mode
 				// no need to check what's present in unfinalized block number queue
@@ -132,9 +133,15 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
 						}(oldest)

 					} else {
-						// If oldest block is not finalized, no meaning
-						// staying here, we'll revisit it some time in future
-						break
+
+						// If left most block is not yet finalized, it'll attempt to
+						// reorganize that queue so that other blocks waiting to be processed
+						// can get that opportunity
+						//
+						// This situation generally occurs due to concurrent pattern implemented
+						// in block processor
+						MoveUnfinalizedOldestBlockToEnd(redis)
+
 					}

 				}
@@ -145,14 +152,14 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,

 				FetchBlockByHash(connection.RPC,
 					blockHash,
-					blockNumber,
+					fmt.Sprintf("%d", blockNumber),
 					_db,
 					redis,
 					status)

 			})

-		}(header.Hash(), header.Number.String())
+		}(header.Hash(), header.Number.Uint64())

 	}
 }
diff --git a/app/block/retry.go b/app/block/retry.go
index 41c82dae..c6f534c8 100644
--- a/app/block/retry.go
+++ b/app/block/retry.go
@@ -37,11 +37,19 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
 		sleep()

 		// Popping oldest element from Redis queue
-		blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result()
+		blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueue).Result()
 		if err != nil {
 			continue
 		}

+		attemptCount := GetAttemptCountFromTable(redis, blockNumber)
+		if attemptCount != 0 && attemptCount%3 != 0 {
+
+			PushBlockIntoRetryQueue(redis, blockNumber)
+			continue
+
+		}
+
 		// Parsing string blockNumber to uint64
 		parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64)
 		if err != nil {
@@ -82,11 +90,75 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
 	// Checking presence first & then deciding whether to add it or not
 	if !CheckBlockInRetryQueue(redis, blockNumber) {

-		if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil {
+		if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueue, blockNumber).Result(); err != nil {
 			log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error()))
 		}

+		IncrementAttemptCountOfBlockNumber(redis, blockNumber)
+
 	}
 }
+
+// IncrementAttemptCountOfBlockNumber - Given block number, increments failed attempt count
+// of processing this block
+//
+// If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 1
+//
+// It'll be wrapped back to 0 as soon as it reaches 101
+func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) {
+
+	wrappedAttemptCount := (GetAttemptCountFromTable(redis, blockNumber) + 1) % 101
+
+	if _, err := redis.Client.HSet(context.Background(), redis.BlockRetryCountTable, blockNumber, wrappedAttemptCount).Result(); err != nil {
+		log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error()))
+	}
+
+}
+
+// CheckBlockInAttemptCounterTable - Checks whether given block number already exists in
+// attempt count tracker table
+func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string) bool {
+
+	if _, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil {
+		return false
+	}
+
+	return true
+
+}
+
+// GetAttemptCountFromTable - Returns current attempt counter from table
+// for given block number
+func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) uint64 {
+
+	count, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result()
+	if err != nil {
+		return 0
+	}
+
+	parsedCount, err := strconv.ParseUint(count, 10, 64)
+	if err != nil {
+		return 0
+	}
+
+	return parsedCount
+
+}
+
+// RemoveBlockFromAttemptCountTrackerTable - Attempt to delete block number's
+// associated attempt count, given it already exists in table
+//
+// This is supposed to be invoked when a block is considered to be successfully processed
+func RemoveBlockFromAttemptCountTrackerTable(redis *data.RedisInfo, blockNumber string) {
+
+	if CheckBlockInAttemptCounterTable(redis, blockNumber) {
+
+		if _, err := redis.Client.HDel(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil {
+			log.Print(color.Red.Sprintf("[!] Failed to delete attempt count of successful block %s : %s", blockNumber, err.Error()))
+		}
+
+	}
+
 }

 // CheckBlockInRetryQueue - Checks whether block number is already added in
@@ -97,17 +169,19 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
 // Note: this feature of checking index of value in redis queue,
 // was added in Redis v6.0.6 : https://redis.io/commands/lpos
 func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool {
-	if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
+
+	if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
 		return false
 	}

 	return true
+
 }

 // GetRetryQueueLength - Returns redis backed retry queue length
 func GetRetryQueueLength(redis *data.RedisInfo) int64 {

-	blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result()
+	blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueue).Result()
 	if err != nil {
 		log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error()))
 	}
diff --git a/app/block/syncer.go b/app/block/syncer.go
index 9b3c1691..fddebfd5 100644
--- a/app/block/syncer.go
+++ b/app/block/syncer.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"runtime"
+	"sort"
 	"time"

 	"github.com/ethereum/go-ethereum/ethclient"
@@ -16,8 +17,32 @@ import (
 	"gorm.io/gorm"
 )

+// FindMissingBlocksInRange - Given ascending ordered block numbers read from DB
+// attempts to find out which numbers are missing in [from, to] range
+// where both ends are inclusive
+func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 {
+
+	// creating slice with backing array of larger size
+	// to avoid potential memory allocation during iteration
+	// over loop
+	absent := make([]uint64, 0, to-from+1)
+
+	for b := from; b <= to; b++ {
+
+		idx := sort.Search(len(found), func(j int) bool { return found[j] >= b })
+
+		if !(idx < len(found) && found[idx] == b) {
+			absent = append(absent, b)
+		}
+
+	}
+
+	return absent
+
+}
+
 // Syncer - Given ascending block number range i.e. fromBlock <= toBlock
-// fetches blocks in order {fromBlock, toBlock, fromBlock + 1, toBlock - 1, fromBlock + 2, toBlock - 2 ...}
+// attempts to fetch missing blocks in that range
 // while running n workers concurrently, where n = number of cores this machine has
 //
 // Waits for all of them to complete
@@ -28,8 +53,6 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB
 	}

 	wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor()))
-	i := fromBlock
-	j := toBlock

 	// Jobs need to be submitted using this interface, while
 	// just mentioning which block needs to be fetched
@@ -43,17 +66,44 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB
 		})
 	}

-	for i <= j {
-		// This condition to be arrived at when range has odd number of elements
-		if i == j {
-			job(i)
-		} else {
-			job(i)
-			job(j)
+	// attempting to fetch X blocks ( max ) at a time, by range
+	//
+	// @note This can be improved
+	var step uint64 = 10000
+
+	for i := fromBlock; i <= toBlock; i += step {
+
+		toShouldbe := i + step - 1
+		if toShouldbe > toBlock {
+			toShouldbe = toBlock
+		}
+
+		blocks := db.GetAllBlockNumbersInRange(_db, i, toShouldbe)
+
+		// No blocks present in DB, in queried range
+		if blocks == nil || len(blocks) == 0 {
+
+			// So submitting all of them to job processor queue
+			for j := i; j <= toShouldbe; j++ {
+
+				job(j)
+
+			}
+			continue
+
+		}
+
+		// All blocks in range present in DB ✅
+		if toShouldbe-i+1 == uint64(len(blocks)) {
+			continue
+		}
+
+		// Some blocks are missing in range, attempting to find them
+		// and pushing their processing request to job queue
+		for _, v := range FindMissingBlocksInRange(blocks, i, toShouldbe) {
+			job(v)
 		}
-		i++
-		j--
 	}

 	wp.StopWait()
diff --git a/app/block/unfinalized_blocks.go b/app/block/unfinalized_blocks.go
index 39994c0f..4fdfd5c0 100644
--- a/app/block/unfinalized_blocks.go
+++ b/app/block/unfinalized_blocks.go
@@ -2,6 +2,7 @@ package block

 import (
 	"context"
+	"fmt"
 	"log"
 	"strconv"

@@ -14,7 +15,7 @@ import (
 // i.e. element at 0th index
 func GetOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) string {

-	blockNumber, err := redis.Client.LIndex(context.Background(), redis.UnfinalizedBlocksQueueName, 0).Result()
+	blockNumber, err := redis.Client.LIndex(context.Background(), redis.UnfinalizedBlocksQueue, 0).Result()
 	if err != nil {
 		return ""
 	}
@@ -45,7 +46,7 @@ func CheckIfOldestBlockIsConfirmed(redis *data.RedisInfo, status *data.StatusHol
 // queue, which can be processed now
 func PopOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) uint64 {

-	blockNumber, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueueName).Result()
+	blockNumber, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueue).Result()
 	if err != nil {
 		return 0
 	}
@@ -66,13 +67,24 @@ func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) {
 	// Checking presence first & then deciding whether to add it or not
 	if !CheckBlockInUnfinalizedQueue(redis, blockNumber) {

-		if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueueName, blockNumber).Result(); err != nil {
+		if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueue, blockNumber).Result(); err != nil {
 			log.Print(color.Red.Sprintf("[!] Failed to push block %s into non-final block queue : %s", blockNumber, err.Error()))
 		}

 	}
 }

+// MoveUnfinalizedOldestBlockToEnd - Attempts to pop oldest block ( i.e. left most block )
+// from unfinalized queue & pushes it back to end of queue, so that other blocks waiting after
+// this one can be attempted to be processed by workers
+//
+// @note This can be improved using `LMOVE` command of Redis ( >= 6.2.0 )
+func MoveUnfinalizedOldestBlockToEnd(redis *data.RedisInfo) {
+
+	PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", PopOldestBlockFromUnfinalizedQueue(redis)))
+
+}
+
 // CheckBlockInUnfinalizedQueue - Checks whether block number is already added in
 // Redis backed unfinalized queue or not
 //
@@ -81,7 +93,7 @@ func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) {
 // Note: this feature of checking index of value in redis queue,
 // was added in Redis v6.0.6 : https://redis.io/commands/lpos
 func CheckBlockInUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) bool {
-	if _, err := redis.Client.LPos(context.Background(), redis.UnfinalizedBlocksQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
+	if _, err := redis.Client.LPos(context.Background(), redis.UnfinalizedBlocksQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
 		return false
 	}

@@ -91,7 +103,7 @@ func CheckBlockInUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) boo
 // GetUnfinalizedQueueLength - Returns redis backed unfinalized block number queue length
 func GetUnfinalizedQueueLength(redis *data.RedisInfo) int64 {

-	blockCount, err := redis.Client.LLen(context.Background(), redis.UnfinalizedBlocksQueueName).Result()
+	blockCount, err := redis.Client.LLen(context.Background(), redis.UnfinalizedBlocksQueue).Result()
 	if err != nil {
 		log.Printf(color.Red.Sprintf("[!] Failed to determine non-final block queue length : %s", err.Error()))
 	}
diff --git a/app/data/data.go b/app/data/data.go
index d8e3de5f..3a6ccc20 100644
--- a/app/data/data.go
+++ b/app/data/data.go
@@ -135,9 +135,11 @@ func (s *StatusHolder) SetLatestBlockNumber(num uint64) {
 // RedisInfo - Holds redis related information in this struct, to be used
 // when passing to functions as argument
 type RedisInfo struct {
-	Client                     *redis.Client // using this object `ette` will talk to Redis
-	BlockRetryQueueName        string        // retry queue name, for storing block numbers
-	UnfinalizedBlocksQueueName string        // stores unfinalized block numbers, processes
+	Client                 *redis.Client // using this object `ette` will talk to Redis
+	BlockRetryQueue        string        // retry queue name, for storing block numbers
+	BlockRetryCountTable   string        // keeping track of how many times this block was attempted
+	// to be processed in past, but went unsuccessful
+	UnfinalizedBlocksQueue string        // stores unfinalized block numbers, processes
 	// them later after reaching finality ( as set by deployer of `ette` )
 }
diff --git a/app/db/query.go b/app/db/query.go
index 076e439e..4de51f7b 100644
--- a/app/db/query.go
+++ b/app/db/query.go
@@ -14,14 +14,14 @@ func GetAllBlockNumbersInRange(db *gorm.DB, from uint64, to uint64) []uint64 {
 	var blocks []uint64

 	if from < to {
-		if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", from, to).Select("number").Find(&blocks).Error; err != nil {
+		if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", from, to).Order("number asc").Select("number").Find(&blocks).Error; err != nil {
 			log.Printf("[!] Failed to fetch block numbers by range : %s\n", err.Error())

 			return nil
 		}
 	} else {
-		if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", to, from).Select("number").Find(&blocks).Error; err != nil {
+		if err := db.Model(&Blocks{}).Where("number >= ? and number <= ?", to, from).Order("number asc").Select("number").Find(&blocks).Error; err != nil {
 			log.Printf("[!] Failed to fetch block numbers by range : %s\n", err.Error())

 			return nil
diff --git a/app/pubsub/subscription.go b/app/pubsub/subscription.go
index cfa8d567..5ecf951b 100644
--- a/app/pubsub/subscription.go
+++ b/app/pubsub/subscription.go
@@ -1,6 +1,7 @@
 package pubsub

 import (
+	"fmt"
 	"log"
 	"regexp"
 	"strings"
@@ -96,12 +97,12 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event)
 	// --- Matching specific topic signature provided by client
 	// application with received event data, published by
 	// redis pub-sub
-	matchTopicXInEvent := func(topic string, x int) bool {
+	matchTopicXInEvent := func(topic string, x uint8) bool {
 		// Not all topics will have 4 elements in topics array
 		//
 		// For those cases, if topic signature for that index is {"", "*"}
 		// provided by consumer, then we're safely going to say it's a match
-		if !(x < len(event.Topics)) {
+		if !(int(x) < len(event.Topics)) {
 			return topic == "" || topic == "*"
 		}

@@ -113,7 +114,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event)
 			status = true // match with specific `topic` signature
 		default:
-			status = topic == event.Topics[x]
+			status = CheckSimilarity(topic, event.Topics[x])
 		}

 		return status
@@ -134,7 +135,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event)
 		status = matchTopicXInEvent(filters[1], 0) && matchTopicXInEvent(filters[2], 1) && matchTopicXInEvent(filters[3], 2) && matchTopicXInEvent(filters[4], 3)
 	// match with provided `contract` address
 	default:
-		if filters[0] == event.Origin {
+		if CheckSimilarity(filters[0], event.Origin) {
 			status = matchTopicXInEvent(filters[1], 0) && matchTopicXInEvent(filters[2], 1) && matchTopicXInEvent(filters[3], 2) && matchTopicXInEvent(filters[4], 3)
 		}
 	}
@@ -156,6 +157,20 @@ func (s *SubscriptionRequest) GetTransactionFilters() []string {
 	return []string{matches[4], matches[6]}
 }

+// CheckSimilarity - Performing case insensitive matching between two
+// strings
+func CheckSimilarity(first string, second string) bool {
+
+	reg, err := regexp.Compile(fmt.Sprintf("(?i)^(%s)$", first))
+	if err != nil {
+		log.Printf("[!] Failed to parse regex pattern : %s\n", err.Error())
+		return false
+	}
+
+	return reg.MatchString(second)
+
+}
+
 // DoesMatchWithPublishedTransactionData - All `transaction` topic listeners i.e. subscribers are
 // going to get notified when new transaction detected, but they will only send those data to client application
 // ( connected over websocket ), to which client has subscribed to
@@ -176,7 +191,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedTransactionData(tx *data.Tra
 			status = true // match with specific `to` address
 		default:
-			status = to == tx.To
+			status = CheckSimilarity(to, tx.To)
 		}

 		return status
@@ -196,7 +211,7 @@ func (s *SubscriptionRequest) DoesMatchWithPublishedTransactionData(tx *data.Tra
 		status = matchToFieldInTx(filters[1])
 	// match with provided `from` address
 	default:
-		if filters[0] == tx.From {
+		if CheckSimilarity(filters[0], tx.From) {
 			status = matchToFieldInTx(filters[1])
 		}
 	}
diff --git a/app/snapshot/write.go b/app/snapshot/write.go
index d77b4011..81bc029b 100644
--- a/app/snapshot/write.go
+++ b/app/snapshot/write.go
@@ -58,6 +58,8 @@ func TakeSnapshot(db *gorm.DB, file string, start uint64, end uint64, count uint
 	go PutIntoSink(fd, count, data, done)

 	// attempting to fetch X blocks ( max ) at a time, by range
+	//
+	// @note This can be improved
 	var step uint64 = 10000

 	// stepping through blocks present in DB
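The gap detection added to app/block/syncer.go can be exercised on its own. Below is a small, self-contained sketch (not part of the patch) that reuses FindMissingBlocksInRange as added above and runs it over a hypothetical set of block numbers already present in the DB; the ascending-order assumption matches the Order("number asc") clause added in app/db/query.go.

package main

import (
	"fmt"
	"sort"
)

// Same routine as the one added in app/block/syncer.go: reports numbers absent
// from `found` within [from, to], assuming `found` is sorted in ascending order.
func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 {
	absent := make([]uint64, 0, to-from+1)

	for b := from; b <= to; b++ {
		idx := sort.Search(len(found), func(j int) bool { return found[j] >= b })
		if !(idx < len(found) && found[idx] == b) {
			absent = append(absent, b)
		}
	}

	return absent
}

func main() {
	// Hypothetical DB contents for the sub-range [100, 106]
	found := []uint64{100, 101, 104, 105}
	fmt.Println(FindMissingBlocksInRange(found, 100, 106)) // prints [102 103 106]
}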
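The retry throttling added to app/block/retry.go only re-processes a popped block when its tracked attempt count is 0 or a multiple of 3; otherwise the block goes straight back to the end of the queue. A minimal in-memory sketch of that gate follows (illustrative only; the real patch keeps the counter in the Redis hash named by BlockRetryCountTable and wraps it at 101).

package main

import "fmt"

// In-memory stand-in for the Redis hash the patch refers to as BlockRetryCountTable.
var attemptCount = map[string]uint64{}

// Mirrors IncrementAttemptCountOfBlockNumber: the counter wraps back to 0 once it reaches 101.
func increment(blockNumber string) {
	attemptCount[blockNumber] = (attemptCount[blockNumber] + 1) % 101
}

// Mirrors the gate in RetryQueueManager: process now only when the count is 0
// or divisible by 3, otherwise the block is pushed back to the queue's end.
func shouldProcessNow(blockNumber string) bool {
	c := attemptCount[blockNumber]
	return c == 0 || c%3 == 0
}

func main() {
	for i := 0; i < 6; i++ {
		fmt.Printf("attempt count %d, process now: %v\n", attemptCount["1234567"], shouldProcessNow("1234567"))
		increment("1234567")
	}
}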
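The case-insensitive matching introduced in app/pubsub/subscription.go builds a (?i)-anchored regular expression from the client-supplied filter. A quick illustration (also not part of the patch), using the same construction with hypothetical addresses:

package main

import (
	"fmt"
	"regexp"
)

// Same construction as CheckSimilarity in the patch: case insensitive, fully anchored.
func checkSimilarity(first string, second string) bool {
	reg, err := regexp.Compile(fmt.Sprintf("(?i)^(%s)$", first))
	if err != nil {
		return false
	}
	return reg.MatchString(second)
}

func main() {
	// Hypothetical contract addresses, differing only in letter case
	fmt.Println(checkSimilarity("0xAbCd00000000000000000000000000000000Ef12", "0xabcd00000000000000000000000000000000ef12")) // true
	fmt.Println(checkSimilarity("0xAbCd00000000000000000000000000000000Ef12", "0xabcd00000000000000000000000000000000ef13")) // false
}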