Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #54 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Improve chain reorganisation handling, pubsub broker management flow
  • Loading branch information
itzmeanjan authored Feb 10, 2021
2 parents 8946574 + 8289ecb commit 83de186
Show file tree
Hide file tree
Showing 33 changed files with 1,290 additions and 694 deletions.
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,17 @@ cd ette
- Set `Production` to `yes` before running it in production; otherwise you can simply skip it
- `ette` can be run in any of 👇 5 possible modes, which can be set by `EtteMode`

```json
{
"1": "Only Historical Data Query Allowed",
"2": "Only Real-time Subscription Allowed",
"3": "Both Historical Data Query & Real-time Subscription Allowed",
"4": "Attempt to take snapshot from data in backing DB",
"5": "Attempt to restore data from snapshot file"
}
```
---

EtteMode | Interpretation
--- | ---
1 | Only Historical Data Query Allowed
2 | Only Real-time Subscription Allowed
3 | Both Historical Data Query & Real-time Subscription Allowed
4 | Attempt to take snapshot from data in backing DB
5 | Attempt to restore data from snapshot file

---

- For testing historical data query using browser based GraphQL Playground in `ette`, you can set `EtteGraphQLPlayGround` to `yes` in config file
- For processing block(s)/ tx(s) concurrently, it'll create `ConcurrencyFactor * #-of CPUs on machine` workers, who will pick up jobs submitted to them.
Expand Down
44 changes: 22 additions & 22 deletions app/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,6 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// If block doesn't contain any tx, we'll attempt to persist only block
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

Expand All @@ -86,6 +75,17 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
status.IncrementBlocksProcessed()
Expand Down Expand Up @@ -187,17 +187,6 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// If block doesn't contain any tx, we'll attempt to persist only block
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

Expand All @@ -209,6 +198,17 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))

Expand Down
40 changes: 37 additions & 3 deletions app/block/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,46 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
for {
select {
case err := <-subs.Err():

log.Fatal(color.Red.Sprintf("[!] Listener stopped : %s", err.Error()))
break

case header := <-headerChan:

// Latest block number seen, is getting safely updated, as
// soon as new block mined data gets propagated to network
// At very beginning iteration, newly mined block number
// should be greater than max block number obtained from DB
if first && !(header.Number.Uint64() > status.MaxBlockNumberAtStartUp()) {

log.Fatal(color.Red.Sprintf("[!] Bad block received : expected > `%d`\n", status.MaxBlockNumberAtStartUp()))

}

// At any iteration other than first one, if received block number
// is more than latest block number + 1, it's definite that we've some
// block ( >=1 ) missed & the RPC node we're relying on might be feeding us with
// wrong data
//
// It's better stop relying on it, we crash the program
// @note This is not the state-of-the art solution, but this is it, as of now
// It can be improved.
if !first && header.Number.Uint64() > status.GetLatestBlockNumber()+1 {

log.Fatal(color.Red.Sprintf("[!] Bad block received %d, expected %d", header.Number.Uint64(), status.GetLatestBlockNumber()))

}

// At any iteration other than first one, if received block number
// not exactly current latest block number + 1, then it's probably one
// reorganization, we'll attempt to process this new block
if !first && !(header.Number.Uint64() == status.GetLatestBlockNumber()+1) {

log.Printf(color.Blue.Sprintf("[*] Received block %d again, expected %d, attempting to process", header.Number.Uint64(), status.GetLatestBlockNumber()+1))

} else {

log.Printf(color.Blue.Sprintf("[*] Received block %d, attempting to process", header.Number.Uint64()))

}

status.SetLatestBlockNumber(header.Number.Uint64())

if first {
Expand Down
6 changes: 5 additions & 1 deletion app/block/pack_block.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package block

import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/itzmeanjan/ette/app/db"
)
Expand All @@ -19,11 +20,14 @@ func BuildPackedBlock(block *types.Block, txs []*db.PackedTransaction) *db.Packe
Difficulty: block.Difficulty().String(),
GasUsed: block.GasUsed(),
GasLimit: block.GasLimit(),
Nonce: block.Nonce(),
Nonce: hexutil.EncodeUint64(block.Nonce()),
Miner: block.Coinbase().Hex(),
Size: float64(block.Size()),
StateRootHash: block.Root().Hex(),
UncleHash: block.UncleHash().Hex(),
TransactionRootHash: block.TxHash().Hex(),
ReceiptRootHash: block.ReceiptHash().Hex(),
ExtraData: block.Extra(),
}
packedBlock.Transactions = txs

Expand Down
3 changes: 3 additions & 0 deletions app/block/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) {
Nonce: block.Block.Nonce,
Miner: block.Block.Miner,
Size: block.Block.Size,
StateRootHash: block.Block.StateRootHash,
UncleHash: block.Block.UncleHash,
TransactionRootHash: block.Block.TransactionRootHash,
ReceiptRootHash: block.Block.ReceiptRootHash,
ExtraData: block.Block.ExtraData,
}).Err(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to publish block %d in channel : %s", block.Block.Number, err.Error()))
return
Expand Down
29 changes: 19 additions & 10 deletions app/block/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// Keeps repeating
func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) {
sleep := func() {
time.Sleep(time.Duration(1000) * time.Millisecond)
time.Sleep(time.Duration(100) * time.Millisecond)
}

// Creating worker pool and submitting jobs as soon as it's determined
Expand All @@ -42,8 +42,8 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
continue
}

attemptCount := GetAttemptCountFromTable(redis, blockNumber)
if attemptCount != 0 && attemptCount%3 != 0 {
attemptCount, _ := GetAttemptCountFromTable(redis, blockNumber)
if attemptCount != 0 && attemptCount%2 != 0 {

PushBlockIntoRetryQueue(redis, blockNumber)
continue
Expand Down Expand Up @@ -102,12 +102,21 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
// IncrementAttemptCountOfBlockNumber - Given block number, increments failed attempt count
// of processing this block
//
// If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 1
// If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 0
//
// It'll be wrapped back to 0 as soon as it reaches 101
// It'll be wrapped back to 0 as soon as it reaches 100
func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) {

wrappedAttemptCount := (GetAttemptCountFromTable(redis, blockNumber) + 1) % 101
var wrappedAttemptCount int

// Attempting to increment 👇, only when it's not first time
// when this attempt counter for block number being initialized
//
// So this ensures for first time it gets initialized to 0
attemptCount, err := GetAttemptCountFromTable(redis, blockNumber)
if err == nil {
wrappedAttemptCount = (int(attemptCount) + 1) % 100
}

if _, err := redis.Client.HSet(context.Background(), redis.BlockRetryCountTable, blockNumber, wrappedAttemptCount).Result(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error()))
Expand All @@ -129,19 +138,19 @@ func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string)

// GetAttemptCountFromTable - Returns current attempt counter from table
// for given block number
func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) uint64 {
func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) (uint64, error) {

count, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result()
if err != nil {
return 0
return 0, err
}

parsedCount, err := strconv.ParseUint(count, 10, 64)
if err != nil {
return 0
return 0, err
}

return parsedCount
return parsedCount, nil

}

Expand Down
70 changes: 69 additions & 1 deletion app/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package common

import "github.com/ethereum/go-ethereum/common"
import (
"errors"
"strconv"

"github.com/ethereum/go-ethereum/common"
)

// StringifyEventTopics - Given array of event topic signatures,
// returns their stringified form, to be required for publishing data to subscribers
Expand All @@ -14,3 +19,66 @@ func StringifyEventTopics(data []common.Hash) []string {

return buffer
}

// CreateEventTopicMap - Given array of event topics, returns map
// of valid event topics, to be used when performing selective field based
// queries on event topics
func CreateEventTopicMap(topics []string) map[uint8]string {

_topics := make(map[uint8]string)

if topics[0] != "" {
_topics[0] = topics[0]
}

if topics[1] != "" {
_topics[1] = topics[1]
}

if topics[2] != "" {
_topics[2] = topics[2]
}

if topics[3] != "" {
_topics[3] = topics[3]
}

return _topics

}

// ParseNumber - Given an integer as string, attempts to parse it
func ParseNumber(number string) (uint64, error) {

_num, err := strconv.ParseUint(number, 10, 64)
if err != nil {

return 0, errors.New("Failed to parse integer")

}

return _num, nil

}

// RangeChecker - Checks whether given number range is at max
// `limit` far away
func RangeChecker(from string, to string, limit uint64) (uint64, uint64, error) {

_from, err := ParseNumber(from)
if err != nil {
return 0, 0, errors.New("Failed to parse integer")
}

_to, err := ParseNumber(to)
if err != nil {
return 0, 0, errors.New("Failed to parse integer")
}

if !(_to-_from < limit) {
return 0, 0, errors.New("Range too long")
}

return _from, _to, nil

}
32 changes: 31 additions & 1 deletion app/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,14 @@ type Block struct {
Difficulty string `json:"difficulty" gorm:"column:difficulty"`
GasUsed uint64 `json:"gasUsed" gorm:"column:gasused"`
GasLimit uint64 `json:"gasLimit" gorm:"column:gaslimit"`
Nonce uint64 `json:"nonce" gorm:"column:nonce"`
Nonce string `json:"nonce" gorm:"column:nonce"`
Miner string `json:"miner" gorm:"column:miner"`
Size float64 `json:"size" gorm:"column:size"`
StateRootHash string `json:"stateRootHash" gorm:"column:stateroothash"`
UncleHash string `json:"uncleHash" gorm:"column:unclehash"`
TransactionRootHash string `json:"txRootHash" gorm:"column:txroothash"`
ReceiptRootHash string `json:"receiptRootHash" gorm:"column:receiptroothash"`
ExtraData []byte `json:"extraData" gorm:"column:extradata"`
}

// MarshalBinary - Implementing binary marshalling function, to be invoked
Expand All @@ -199,6 +202,33 @@ func (b *Block) MarshalBinary() ([]byte, error) {
return json.Marshal(b)
}

// MarshalJSON - Custom JSON encoder
func (b *Block) MarshalJSON() ([]byte, error) {

extraData := ""
if _h := hex.EncodeToString(b.ExtraData); _h != "" {
extraData = fmt.Sprintf("0x%s", _h)
}

return []byte(fmt.Sprintf(`{"hash":%q,"number":%d,"time":%d,"parentHash":%q,"difficulty":%q,"gasUsed":%d,"gasLimit":%d,"nonce":%q,"miner":%q,"size":%f,"stateRootHash":%q,"uncleHash":%q,"txRootHash":%q,"receiptRootHash":%q,"extraData":%q}`,
b.Hash,
b.Number,
b.Time,
b.ParentHash,
b.Difficulty,
b.GasUsed,
b.GasLimit,
b.Nonce,
b.Miner,
b.Size,
b.StateRootHash,
b.UncleHash,
b.TransactionRootHash,
b.ReceiptRootHash,
extraData)), nil

}

// ToJSON - Encodes into JSON, to be supplied when queried for block data
func (b *Block) ToJSON() []byte {
data, err := json.Marshal(b)
Expand Down
Loading

0 comments on commit 83de186

Please sign in to comment.