diff --git a/README.md b/README.md index 66f44eb8..7214cb7e 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,9 @@ cd ette ``` - For testing historical data query using browser based GraphQL Playground in `ette`, you can set `EtteGraphQLPlayGround` to `yes` in config file + - For processing block(s)/ tx(s) concurrently, it'll create `ConcurrencyFactor * #-of CPUs on machine` workers, who will pick up jobs submitted to them. + - If nothing is specified, it defaults to 1 & assuming you're running `ette` on machine with 4 CPUs, it'll spawn worker pool of size 4. But more number of jobs can be submitted, only 4 can be running at max. + - 👆 being done for controlling concurrency level, by putting more control on user's hand. ``` RPCUrl=https:// @@ -113,6 +116,7 @@ Domain=localhost Production=yes EtteMode=3 EtteGraphQLPlayGround=yes +ConcurrencyFactor=2 ``` - Create another file in same directory, named `.plans.json`, whose content will look like 👇. diff --git a/app/app.go b/app/app.go index ee4825ea..ed0bcfbe 100644 --- a/app/app.go +++ b/app/app.go @@ -3,9 +3,12 @@ package app import ( "context" "log" + "os" + "os/signal" "sync" "github.com/go-redis/redis/v8" + "github.com/gookit/color" blk "github.com/itzmeanjan/ette/app/block" cfg "github.com/itzmeanjan/ette/app/config" d "github.com/itzmeanjan/ette/app/data" @@ -16,7 +19,7 @@ import ( ) // Setting ground up -func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *sync.Mutex, *d.SyncState) { +func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *d.StatusHolder) { err := cfg.Read(configFile) if err != nil { log.Fatalf("[!] Failed to read `.env` : %s\n", err.Error()) @@ -48,19 +51,58 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne // for resolving graphQL queries graph.GetDatabaseConnection(_db) - _lock := &sync.Mutex{} - _synced := &d.SyncState{Done: 0, BlockCountAtStartUp: db.GetBlockCount(_db), NewBlocksInserted: 0} + _status := &d.StatusHolder{ + State: &d.SyncState{BlockCountAtStartUp: db.GetBlockCount(_db)}, + Mutex: &sync.RWMutex{}, + } - return _connection, _redisClient, _db, _lock, _synced + return _connection, _redisClient, _db, _status } // Run - Application to be invoked from main runner using this function func Run(configFile, subscriptionPlansFile string) { - _connection, _redisClient, _db, _lock, _synced := bootstrap(configFile, subscriptionPlansFile) + _connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile) + _redisInfo := d.RedisInfo{ + Client: _redisClient, + QueueName: "blocks", + } + + // Attempting to listen to Ctrl+C signal + // and when received gracefully shutting down `ette` + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt) + + // All resources being used gets cleaned up + // when we're returning from this function scope + go func() { + + <-interruptChan + + sql, err := _db.DB() + if err != nil { + log.Printf(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error())) + return + } + + if err := sql.Close(); err != nil { + log.Printf(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error())) + return + } + + if err := _redisInfo.Client.Close(); err != nil { + log.Printf(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error())) + return + } + + // Stopping process + log.Printf(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`")) + os.Exit(0) + + }() // Pushing block header propagation listener to another thread of execution - go blk.SubscribeToNewBlocks(_connection, _db, _lock, _synced, _redisClient, "blocks") + go blk.SubscribeToNewBlocks(_connection, _db, _status, &_redisInfo) // Starting http server on main thread - rest.RunHTTPServer(_db, _lock, _synced, _redisClient) + rest.RunHTTPServer(_db, _status, _redisClient) } diff --git a/app/block/block.go b/app/block/block.go index 7e47f11b..33bdb672 100644 --- a/app/block/block.go +++ b/app/block/block.go @@ -1,16 +1,12 @@ package block import ( - "context" - "fmt" "log" - "math/big" - "sync" + "runtime" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "github.com/go-redis/redis/v8" + "github.com/gammazero/workerpool" "github.com/gookit/color" cfg "github.com/itzmeanjan/ette/app/config" d "github.com/itzmeanjan/ette/app/data" @@ -18,151 +14,141 @@ import ( "gorm.io/gorm" ) -// Fetching block content using blockHash -func fetchBlockByHash(client *ethclient.Client, hash common.Hash, number string, _db *gorm.DB, redisClient *redis.Client, redisKey string, _lock *sync.Mutex, _synced *d.SyncState) { - block, err := client.BlockByHash(context.Background(), hash) - if err != nil { - // Pushing block number into Redis queue for retrying later - pushBlockHashIntoRedisQueue(redisClient, redisKey, number) +// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data +func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder) { - log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error())) - return - } + // Closure managing publishing whole block data i.e. block header, txn(s), event logs + // on redis pubsub channel + pubsubWorker := func(txns []*db.PackedTransaction) *db.PackedBlock { - // Publishes block data to all listening parties - // on `block` channel - publishBlock := func() { - if err := redisClient.Publish(context.Background(), "block", &d.Block{ - Hash: block.Hash().Hex(), - Number: block.NumberU64(), - Time: block.Time(), - ParentHash: block.ParentHash().Hex(), - Difficulty: block.Difficulty().String(), - GasUsed: block.GasUsed(), - GasLimit: block.GasLimit(), - Nonce: block.Nonce(), - Miner: block.Coinbase().Hex(), - Size: float64(block.Size()), - TransactionRootHash: block.TxHash().Hex(), - ReceiptRootHash: block.ReceiptHash().Hex(), - }).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to publish block %d in channel : %s", block.NumberU64(), err.Error())) - } - } + // Constructing block data to published & persisted + packedBlock := BuildPackedBlock(block, txns) - // Controlling behaviour of ette depending upon value of `EtteMode` - switch cfg.Get("EtteMode") { - case "1": - if !db.StoreBlock(_db, block, _lock, _synced) { - // Pushing block number into Redis queue for retrying later - // because it failed to store block in database - pushBlockHashIntoRedisQueue(redisClient, redisKey, number) - return + // Attempting to publish whole block data to redis pubsub channel + if publishable && (cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") { + PublishBlock(packedBlock, redis) } - case "2": - publishBlock() - case "3": - // Try completing task of publishing block data, first - // then we'll attempt to store it, is that fails, we'll push it to retry queue - publishBlock() - - if !db.StoreBlock(_db, block, _lock, _synced) { - // Pushing block number into Redis queue for retrying later - // because it failed to store block in database - pushBlockHashIntoRedisQueue(redisClient, redisKey, number) - return - } - } - fetchBlockContent(client, block, _db, redisClient, redisKey, true, _lock, _synced) -} + return packedBlock -// Fetching block content using block number -func fetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redisClient *redis.Client, redisKey string, _lock *sync.Mutex, _synced *d.SyncState) { - _num := big.NewInt(0) - _num = _num.SetUint64(number) + } - block, err := client.BlockByNumber(context.Background(), _num) - if err != nil { - // Pushing block number into Redis queue for retrying later - pushBlockHashIntoRedisQueue(redisClient, redisKey, fmt.Sprintf("%d", number)) + if block.Transactions().Len() == 0 { - log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err)) - return - } + // Constructing block data to be persisted + // + // This is what we just published on pubsub channel + packedBlock := pubsubWorker(nil) - // Either creates new entry or updates existing one - if !db.StoreBlock(_db, block, _lock, _synced) { - // Pushing block number into Redis queue for retrying later - pushBlockHashIntoRedisQueue(redisClient, redisKey, fmt.Sprintf("%d", number)) - return - } + // If block doesn't contain any tx, we'll attempt to persist only block + if err := db.StoreBlock(_db, packedBlock, status); err != nil { - fetchBlockContent(client, block, _db, redisClient, redisKey, false, _lock, _synced) -} + // If failed to persist, we'll put it in retry queue + pushBlockHashIntoRedisQueue(redis, block.Number().String()) + return -// Fetching all transactions in this block, along with their receipt -func fetchBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redisClient *redis.Client, redisKey string, publishable bool, _lock *sync.Mutex, _synced *d.SyncState) { - if block.Transactions().Len() == 0 { + } + + // Successfully processed block log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s)", block.NumberU64())) + status.IncrementBlocksProcessed() - safeUpdationOfSyncState(_lock, _synced) return + } // Communication channel to be shared between multiple executing go routines // which are trying to fetch all tx(s) present in block, concurrently - returnValChan := make(chan bool) + returnValChan := make(chan *db.PackedTransaction, runtime.NumCPU()*int(cfg.GetConcurrencyFactor())) + + // -- Tx processing starting + // Creating job processor queue + // which will process all tx(s), concurrently + wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor())) // Concurrently trying to process all tx(s) for this block, in hope of better performance for _, v := range block.Transactions() { // Concurrently trying to fetch multiple tx(s) present in block - // and expecting their status result to be published on shared channel + // and expecting their return value to be published on shared channel // // Which is being read 👇 - go func(tx *types.Transaction) { - fetchTransactionByHash(client, block, tx, _db, redisClient, redisKey, publishable, - _lock, _synced, returnValChan) + func(tx *types.Transaction) { + wp.Submit(func() { + + FetchTransactionByHash(client, + block, + tx, + _db, + redis, + publishable, + status, + returnValChan) + + }) }(v) } // Keeping track of how many of these tx fetchers succeded & how many of them failed result := d.ResultStatus{} + // Data received from tx fetchers, to be stored here + packedTxs := make([]*db.PackedTransaction, block.Transactions().Len()) for v := range returnValChan { - if v { + if v != nil { result.Success++ } else { result.Failure++ } + // #-of tx fetchers completed their job till now + // + // Either successfully or failed some how + total := int(result.Total()) + // Storing tx data received from just completed go routine + packedTxs[total-1] = v + // All go routines have completed their job - if result.Total() == uint64(block.Transactions().Len()) { + if total == block.Transactions().Len() { break } } - // When all tx(s) are successfully processed ( as they have informed us over go channel ), - // we're happy to exit from this context, given that none of them failed - if result.Failure == 0 { - log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s)", block.NumberU64(), len(block.Transactions()))) + // Stopping job processor forcefully + // because by this time all jobs have been completed + // + // Otherwise control flow will not be able to come here + // it'll keep looping in 👆 loop, reading from channel + wp.Stop() + // -- Tx processing ending - safeUpdationOfSyncState(_lock, _synced) + // When all tx(s) aren't successfully processed ( as they have informed us over go channel ), + // we're exiting from this context, while putting this block number in retry queue + if !(result.Failure == 0) { + + // If failed to persist, we'll put it in retry queue + pushBlockHashIntoRedisQueue(redis, block.Number().String()) return + } - // Pushing block number into Redis queue for retrying later - // because it failed to complete some of its jobs 👆 - pushBlockHashIntoRedisQueue(redisClient, redisKey, block.Number().String()) -} + // Constructing block data to be persisted + // + // This is what we just published on pubsub channel + packedBlock := pubsubWorker(packedTxs) + + // If block doesn't contain any tx, we'll attempt to persist only block + if err := db.StoreBlock(_db, packedBlock, status); err != nil { + + // If failed to persist, we'll put it in retry queue + pushBlockHashIntoRedisQueue(redis, block.Number().String()) + return + + } -// Updating shared varible between worker go routines, denoting progress of -// `ette`, in terms of data syncing -func safeUpdationOfSyncState(_lock *sync.Mutex, _synced *d.SyncState) { - _lock.Lock() - defer _lock.Unlock() + // Successfully processed block + log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s)", block.NumberU64(), block.Transactions().Len())) + status.IncrementBlocksProcessed() - _synced.Done++ } diff --git a/app/block/fetch.go b/app/block/fetch.go new file mode 100644 index 00000000..cd31addf --- /dev/null +++ b/app/block/fetch.go @@ -0,0 +1,78 @@ +package block + +import ( + "context" + "fmt" + "log" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/gookit/color" + d "github.com/itzmeanjan/ette/app/data" + "github.com/itzmeanjan/ette/app/db" + "gorm.io/gorm" +) + +// FetchBlockByHash - Fetching block content using blockHash +func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string, _db *gorm.DB, redis *d.RedisInfo, _status *d.StatusHolder) { + block, err := client.BlockByHash(context.Background(), hash) + if err != nil { + // Pushing block number into Redis queue for retrying later + pushBlockHashIntoRedisQueue(redis, number) + + log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error())) + return + } + + ProcessBlockContent(client, block, _db, redis, true, _status) +} + +// FetchBlockByNumber - Fetching block content using block number +func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, _status *d.StatusHolder) { + _num := big.NewInt(0) + _num.SetUint64(number) + + block, err := client.BlockByNumber(context.Background(), _num) + if err != nil { + // Pushing block number into Redis queue for retrying later + pushBlockHashIntoRedisQueue(redis, fmt.Sprintf("%d", number)) + + log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err)) + return + } + + ProcessBlockContent(client, block, _db, redis, false, _status) +} + +// FetchTransactionByHash - Fetching specific transaction related data, tries to publish data if required +// & lets listener go routine know about all tx, event data it collected while processing this tx, +// which will be attempted to be stored in database +func FetchTransactionByHash(client *ethclient.Client, block *types.Block, tx *types.Transaction, _db *gorm.DB, redis *d.RedisInfo, publishable bool, _status *d.StatusHolder, returnValChan chan *db.PackedTransaction) { + + receipt, err := client.TransactionReceipt(context.Background(), tx.Hash()) + if err != nil { + log.Print(color.Red.Sprintf("[!] Failed to fetch tx receipt [ block : %d ] : %s", block.NumberU64(), err.Error())) + + // Passing nil, to denote, failed to fetch all tx data + // from blockchain node + returnValChan <- nil + return + } + + sender, err := client.TransactionSender(context.Background(), tx, block.Hash(), receipt.TransactionIndex) + if err != nil { + log.Print(color.Red.Sprintf("[!] Failed to fetch tx sender [ block : %d ] : %s", block.NumberU64(), err.Error())) + + // Passing nil, to denote, failed to fetch all tx data + // from blockchain node + returnValChan <- nil + return + } + + // Passing all tx related data to listener go routine + // so that it can attempt to store whole block data + // into database + returnValChan <- BuildPackedTx(tx, sender, receipt) +} diff --git a/app/block/listener.go b/app/block/listener.go index 179ade8c..2cdc4990 100644 --- a/app/block/listener.go +++ b/app/block/listener.go @@ -3,11 +3,11 @@ package block import ( "context" "log" - "sync" - "time" + "runtime" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/go-redis/redis/v8" + "github.com/gammazero/workerpool" "github.com/gookit/color" cfg "github.com/itzmeanjan/ette/app/config" d "github.com/itzmeanjan/ette/app/data" @@ -18,35 +18,29 @@ import ( // SubscribeToNewBlocks - Listen for event when new block header is // available, then fetch block content ( including all transactions ) // in different worker -func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, _lock *sync.Mutex, _synced *d.SyncState, redisClient *redis.Client, redisKey string) { +func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, status *d.StatusHolder, redis *d.RedisInfo) { headerChan := make(chan *types.Header) subs, err := connection.Websocket.SubscribeNewHead(context.Background(), headerChan) if err != nil { log.Fatal(color.Red.Sprintf("[!] Failed to subscribe to block headers : %s", err.Error())) } - - // Scheduling unsubscribe, to be executed when end of this block is reached + // Scheduling unsubscribe, to be executed when end of this execution scope is reached defer subs.Unsubscribe() - // Starting go routine for fetching blocks `ette` failed to process in previous attempt - // - // Uses Redis backed queue for fetching pending block hash & retries - go retryBlockFetching(connection.RPC, _db, redisClient, redisKey, _lock, _synced) - // Last time `ette` stopped syncing here currentHighestBlockNumber := db.GetCurrentBlockNumber(_db) - // Starting now, to be used for calculating system performance, uptime etc. - _lock.Lock() - _synced.StartedAt = time.Now().UTC() - _lock.Unlock() - - // Flag to check for whether this is first time block header being received - // or not + // Flag to check for whether this is first time block header being received or not // - // If yes, we'll start syncer to fetch all block starting from 0 to this block + // If yes, we'll start syncer to fetch all block in range (last block processed, latest block) first := true + // Creating a job queue of size `#-of CPUs present in machine` + // where block fetching requests to be submitted + wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor())) + // Scheduling worker pool closing, to be called, + // when returning from this execution scope i.e. function + defer wp.Stop() for { select { @@ -56,6 +50,9 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, case header := <-headerChan: if first { + // Starting now, to be used for calculating system performance, uptime etc. + status.SetStartedAt() + // If historical data query features are enabled // only then we need to sync to latest state of block chain if cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3" { @@ -70,15 +67,45 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // So, it'll check & decide whether persisting again is required or not // // This backward traversal mechanism gives us more recent blockchain happenings to cover - go SyncBlocksByRange(connection.RPC, _db, redisClient, redisKey, header.Number.Uint64()-1, currentHighestBlockNumber, _lock, _synced) + go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, currentHighestBlockNumber, status) + + // Starting go routine for fetching blocks `ette` failed to process in previous attempt + // + // Uses Redis backed queue for fetching pending block hash & retries + go retryBlockFetching(connection.RPC, _db, redis, status) + // Making sure on when next latest block header is received, it'll not // start another syncer - first = false } + first = false } - go fetchBlockByHash(connection.RPC, header.Hash(), header.Number.String(), _db, redisClient, redisKey, _lock, _synced) + // As soon as new block is mined, `ette` will try to fetch it + // and that job will be submitted in job queue + // + // Putting it in a different function scope for safety purpose + // so that job submitter gets its own copy of block number & block hash, + // otherwise it might get wrong info, if new block gets mined very soon & + // this job is not yet submitted + // + // Though it'll be picked up sometime in future ( by missing block finder ), but it can be safely handled now + // so that it gets processed immediately + func(blockHash common.Hash, blockNumber string) { + + wp.Submit(func() { + + FetchBlockByHash(connection.RPC, + blockHash, + blockNumber, + _db, + redis, + status) + + }) + + }(header.Hash(), header.Number.String()) + } } } diff --git a/app/block/pack_block.go b/app/block/pack_block.go new file mode 100644 index 00000000..438d125b --- /dev/null +++ b/app/block/pack_block.go @@ -0,0 +1,32 @@ +package block + +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/itzmeanjan/ette/app/db" +) + +// BuildPackedBlock - Builds struct holding whole block data i.e. +// block header, block body i.e. tx(s), event log(s) +func BuildPackedBlock(block *types.Block, txs []*db.PackedTransaction) *db.PackedBlock { + + packedBlock := &db.PackedBlock{} + + packedBlock.Block = &db.Blocks{ + Hash: block.Hash().Hex(), + Number: block.NumberU64(), + Time: block.Time(), + ParentHash: block.ParentHash().Hex(), + Difficulty: block.Difficulty().String(), + GasUsed: block.GasUsed(), + GasLimit: block.GasLimit(), + Nonce: block.Nonce(), + Miner: block.Coinbase().Hex(), + Size: float64(block.Size()), + TransactionRootHash: block.TxHash().Hex(), + ReceiptRootHash: block.ReceiptHash().Hex(), + } + packedBlock.Transactions = txs + + return packedBlock + +} diff --git a/app/block/pack_tx.go b/app/block/pack_tx.go new file mode 100644 index 00000000..a6562c20 --- /dev/null +++ b/app/block/pack_tx.go @@ -0,0 +1,67 @@ +package block + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + c "github.com/itzmeanjan/ette/app/common" + "github.com/itzmeanjan/ette/app/db" +) + +// BuildPackedTx - Putting all information, `ette` will keep for one tx +// into a single structure, so that it becomes easier to pass to & from functions +func BuildPackedTx(tx *types.Transaction, sender common.Address, receipt *types.Receipt) *db.PackedTransaction { + + packedTx := &db.PackedTransaction{} + + if tx.To() == nil { + + packedTx.Tx = &db.Transactions{ + Hash: tx.Hash().Hex(), + From: sender.Hex(), + Contract: receipt.ContractAddress.Hex(), + Value: tx.Value().String(), + Data: tx.Data(), + Gas: tx.Gas(), + GasPrice: tx.GasPrice().String(), + Cost: tx.Cost().String(), + Nonce: tx.Nonce(), + State: receipt.Status, + BlockHash: receipt.BlockHash.Hex(), + } + + } else { + + packedTx.Tx = &db.Transactions{ + Hash: tx.Hash().Hex(), + From: sender.Hex(), + To: tx.To().Hex(), + Value: tx.Value().String(), + Data: tx.Data(), + Gas: tx.Gas(), + GasPrice: tx.GasPrice().String(), + Cost: tx.Cost().String(), + Nonce: tx.Nonce(), + State: receipt.Status, + BlockHash: receipt.BlockHash.Hex(), + } + + } + + packedTx.Events = make([]*db.Events, len(receipt.Logs)) + + for k, v := range receipt.Logs { + + packedTx.Events[k] = &db.Events{ + Origin: v.Address.Hex(), + Index: v.Index, + Topics: c.StringifyEventTopics(v.Topics), + Data: v.Data, + TransactionHash: v.TxHash.Hex(), + BlockHash: v.BlockHash.Hex(), + } + + } + + return packedTx + +} diff --git a/app/block/publish.go b/app/block/publish.go new file mode 100644 index 00000000..cd3ffc01 --- /dev/null +++ b/app/block/publish.go @@ -0,0 +1,137 @@ +package block + +import ( + "context" + "log" + + "github.com/gookit/color" + d "github.com/itzmeanjan/ette/app/data" + "github.com/itzmeanjan/ette/app/db" +) + +// PublishBlock - Attempts to publish block data to Redis pubsub channel +func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) { + + if err := redis.Client.Publish(context.Background(), "block", &d.Block{ + Hash: block.Block.Hash, + Number: block.Block.Number, + Time: block.Block.Time, + ParentHash: block.Block.ParentHash, + Difficulty: block.Block.Difficulty, + GasUsed: block.Block.GasUsed, + GasLimit: block.Block.GasLimit, + Nonce: block.Block.Nonce, + Miner: block.Block.Miner, + Size: block.Block.Size, + TransactionRootHash: block.Block.TransactionRootHash, + ReceiptRootHash: block.Block.ReceiptRootHash, + }).Err(); err != nil { + log.Print(color.Red.Sprintf("[!] Failed to publish block %d in channel : %s", block.Block.Number, err.Error())) + return + } + + PublishTxs(block.Block.Number, block.Transactions, redis) + +} + +// PublishTxs - Publishes all transactions in a block to redis pubsub +// channel +func PublishTxs(blockNumber uint64, txs []*db.PackedTransaction, redis *d.RedisInfo) { + + if txs == nil { + return + } + + for _, t := range txs { + PublishTx(blockNumber, t, redis) + } + +} + +// PublishTx - Publishes tx & events in tx, related data to respective +// Redis pubsub channel +func PublishTx(blockNumber uint64, tx *db.PackedTransaction, redis *d.RedisInfo) { + + if tx == nil { + return + } + + var pTx *d.Transaction + + if tx.Tx.To == "" { + // This is a contract creation tx + pTx = &d.Transaction{ + Hash: tx.Tx.Hash, + From: tx.Tx.From, + Contract: tx.Tx.Contract, + Value: tx.Tx.Value, + Data: tx.Tx.Data, + Gas: tx.Tx.Gas, + GasPrice: tx.Tx.GasPrice, + Cost: tx.Tx.Cost, + Nonce: tx.Tx.Nonce, + State: tx.Tx.State, + BlockHash: tx.Tx.BlockHash, + } + } else { + // This is a normal tx, so we keep contract field empty + pTx = &d.Transaction{ + Hash: tx.Tx.Hash, + From: tx.Tx.From, + To: tx.Tx.To, + Value: tx.Tx.Value, + Data: tx.Tx.Data, + Gas: tx.Tx.Gas, + GasPrice: tx.Tx.GasPrice, + Cost: tx.Tx.Cost, + Nonce: tx.Tx.Nonce, + State: tx.Tx.State, + BlockHash: tx.Tx.BlockHash, + } + } + + if err := redis.Client.Publish(context.Background(), "transaction", pTx).Err(); err != nil { + log.Print(color.Red.Sprintf("[!] Failed to publish transaction from block %d : %s", blockNumber, err.Error())) + return + } + + PublishEvents(blockNumber, tx.Events, redis) + +} + +// PublishEvents - Iterate over all events & try to publish them on +// redis pubsub channel +func PublishEvents(blockNumber uint64, events []*db.Events, redis *d.RedisInfo) { + + if events == nil { + return + } + + for _, e := range events { + PublishEvent(blockNumber, e, redis) + } + +} + +// PublishEvent - Publishing event/ log entry to redis pub-sub topic, to be captured by subscribers +// and sent to client application, who are interested in this piece of data +// after applying filter +func PublishEvent(blockNumber uint64, event *db.Events, redis *d.RedisInfo) { + + if event == nil { + return + } + + if err := redis.Client.Publish(context.Background(), "event", &d.Event{ + Origin: event.Origin, + Index: event.Index, + Topics: event.Topics, + Data: event.Data, + TransactionHash: event.TransactionHash, + BlockHash: event.BlockHash, + }).Err(); err != nil { + log.Print(color.Red.Sprintf("[!] Failed to publish event from block %d : %s", blockNumber, err.Error())) + return + } + +} diff --git a/app/block/retry.go b/app/block/retry.go index 85463884..9293c825 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -3,13 +3,16 @@ package block import ( "context" "log" + "runtime" "strconv" - "sync" "time" "github.com/ethereum/go-ethereum/ethclient" - "github.com/go-redis/redis/v8" + "github.com/gammazero/workerpool" + _redis "github.com/go-redis/redis/v8" "github.com/gookit/color" + cfg "github.com/itzmeanjan/ette/app/config" + "github.com/itzmeanjan/ette/app/data" d "github.com/itzmeanjan/ette/app/data" "gorm.io/gorm" ) @@ -19,16 +22,21 @@ import ( // Sleeps for 1000 milliseconds // // Keeps repeating -func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redisClient *redis.Client, redisKey string, _lock *sync.Mutex, _synced *d.SyncState) { +func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) { sleep := func() { - time.Sleep(time.Duration(1) * time.Second) + time.Sleep(time.Duration(500) * time.Millisecond) } + // Creating worker pool and submitting jobs as soon as it's determined + // there's `to be processed` blocks in retry queue + wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor())) + defer wp.Stop() + for { sleep() // Popping oldest element from Redis queue - blockNumber, err := redisClient.LPop(context.Background(), redisKey).Result() + blockNumber, err := redis.Client.LPop(context.Background(), redis.QueueName).Result() if err != nil { continue } @@ -39,23 +47,33 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redisClient *red continue } - queuedBlocks, err := redisClient.LLen(context.Background(), redisKey).Result() - if err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to determine Redis queue length : %s", err.Error())) - } + log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, getRetryQueueLength(redis))) - log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, queuedBlocks)) - go fetchBlockByNumber(client, parsedBlockNumber, _db, redisClient, redisKey, _lock, _synced) + // Submitting block processor job into pool + // which will be picked up & processed + // + // This will stop us from blindly creating too many go routines + func(blockNumber uint64) { + wp.Submit(func() { + + FetchBlockByNumber(client, + parsedBlockNumber, + _db, + redis, + status) + + }) + }(parsedBlockNumber) } } // Pushes failed to fetch block number at end of Redis queue // given it has not already been added -func pushBlockHashIntoRedisQueue(redisClient *redis.Client, redisKey string, blockNumber string) { +func pushBlockHashIntoRedisQueue(redis *data.RedisInfo, blockNumber string) { // Checking presence first & then deciding whether to add it or not - if !checkExistenceOfBlockNumberInRedisQueue(redisClient, redisKey, blockNumber) { + if !checkExistenceOfBlockNumberInRedisQueue(redis, blockNumber) { - if err := redisClient.RPush(context.Background(), redisKey, blockNumber).Err(); err != nil { + if err := redis.Client.RPush(context.Background(), redis.QueueName, blockNumber).Err(); err != nil { log.Print(color.Red.Sprintf("[!] Failed to push block %s : %s", blockNumber, err.Error())) } @@ -68,10 +86,22 @@ func pushBlockHashIntoRedisQueue(redisClient *redis.Client, redisKey string, blo // // Note: this feature of checking index of value in redis queue, // was added in Redis v6.0.6 : https://redis.io/commands/lpos -func checkExistenceOfBlockNumberInRedisQueue(redisClient *redis.Client, redisKey string, blockNumber string) bool { - if _, err := redisClient.LPos(context.Background(), redisKey, blockNumber, redis.LPosArgs{}).Result(); err != nil { +func checkExistenceOfBlockNumberInRedisQueue(redis *data.RedisInfo, blockNumber string) bool { + if _, err := redis.Client.LPos(context.Background(), redis.QueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil { return false } return true } + +// Returns redis backed retry queue length +func getRetryQueueLength(redis *data.RedisInfo) int64 { + + blockCount, err := redis.Client.LLen(context.Background(), redis.QueueName).Result() + if err != nil { + log.Printf(color.Red.Sprintf("[!] Failed to determine Redis queue length : %s", err.Error())) + } + + return blockCount + +} diff --git a/app/block/syncer.go b/app/block/syncer.go index 4f1ae505..3b983c0e 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -4,13 +4,13 @@ import ( "fmt" "log" "runtime" - "sync" "time" "github.com/ethereum/go-ethereum/ethclient" "github.com/gammazero/workerpool" - "github.com/go-redis/redis/v8" "github.com/gookit/color" + cfg "github.com/itzmeanjan/ette/app/config" + "github.com/itzmeanjan/ette/app/data" d "github.com/itzmeanjan/ette/app/data" "github.com/itzmeanjan/ette/app/db" "gorm.io/gorm" @@ -21,13 +21,13 @@ import ( // while running n workers concurrently, where n = number of cores this machine has // // Waits for all of them to complete -func Syncer(client *ethclient.Client, _db *gorm.DB, redisClient *redis.Client, redisKey string, fromBlock uint64, toBlock uint64, _lock *sync.Mutex, _synced *d.SyncState, jd func(*workerpool.WorkerPool, *d.Job)) { +func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromBlock uint64, toBlock uint64, status *d.StatusHolder, jd func(*workerpool.WorkerPool, *d.Job)) { if !(fromBlock <= toBlock) { log.Print(color.Red.Sprintf("[!] Bad block range for syncer")) return } - wp := workerpool.New(runtime.NumCPU()) + wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor())) i := fromBlock j := toBlock @@ -35,13 +35,11 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redisClient *redis.Client, r // just mentioning which block needs to be fetched job := func(num uint64) { jd(wp, &d.Job{ - Client: client, - DB: _db, - RedisClient: redisClient, - RedisKey: redisKey, - Block: num, - Lock: _lock, - Synced: _synced, + Client: client, + DB: _db, + Redis: redis, + Block: num, + Status: status, }) } @@ -65,7 +63,7 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redisClient *redis.Client, r // // Range can be either ascending or descending, depending upon that proper arguments to be // passed to `Syncer` function during invokation -func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redisClient *redis.Client, redisKey string, fromBlock uint64, toBlock uint64, _lock *sync.Mutex, _synced *d.SyncState) { +func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromBlock uint64, toBlock uint64, status *d.StatusHolder) { // Job to be submitted and executed by each worker // @@ -73,7 +71,7 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redisClient *redi job := func(wp *workerpool.WorkerPool, j *d.Job) { wp.Submit(func() { - fetchBlockByNumber(j.Client, j.Block, j.DB, j.RedisClient, j.RedisKey, j.Lock, j.Synced) + FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status) }) } @@ -81,9 +79,9 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redisClient *redi log.Printf("[*] Starting block syncer\n") if fromBlock < toBlock { - Syncer(client, _db, redisClient, redisKey, fromBlock, toBlock, _lock, _synced, job) + Syncer(client, _db, redis, fromBlock, toBlock, status, job) } else { - Syncer(client, _db, redisClient, redisKey, toBlock, fromBlock, _lock, _synced, job) + Syncer(client, _db, redis, toBlock, fromBlock, status, job) } log.Printf("[+] Stopping block syncer\n") @@ -94,12 +92,12 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redisClient *redi // // And this will itself run as a infinite job, completes one iteration & // takes break for 1 min, then repeats - go SyncMissingBlocksInDB(client, _db, redisClient, redisKey, _lock, _synced) + go SyncMissingBlocksInDB(client, _db, redis, status) } // SyncMissingBlocksInDB - Checks with database for what blocks are present & what are not, fetches missing // blocks & related data iteratively -func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redisClient *redis.Client, redisKey string, _lock *sync.Mutex, _synced *d.SyncState) { +func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) { // Sleep for 1 minute & then again check whether we need to fetch missing blocks or not sleep := func() { @@ -112,9 +110,7 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redisClient * currentBlockNumber := db.GetCurrentBlockNumber(_db) // Safely reading shared variable - _lock.Lock() - blockCount := _synced.BlockCountInDB() - _lock.Unlock() + blockCount := status.BlockCountInDB() // If all blocks present in between 0 to latest block in network // `ette` sleeps for 1 minute & again get to work @@ -132,15 +128,15 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redisClient * // Worker fetches block by number from local storage block := db.GetBlock(j.DB, j.Block) - if block == nil && !checkExistenceOfBlockNumberInRedisQueue(redisClient, redisKey, fmt.Sprintf("%d", j.Block)) { + if block == nil && !checkExistenceOfBlockNumberInRedisQueue(redis, fmt.Sprintf("%d", j.Block)) { // If not found, block fetching cycle is run, for this block - fetchBlockByNumber(j.Client, j.Block, j.DB, j.RedisClient, j.RedisKey, j.Lock, j.Synced) + FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status) } }) } - Syncer(client, _db, redisClient, redisKey, 0, currentBlockNumber, _lock, _synced, job) + Syncer(client, _db, redis, 0, currentBlockNumber, status, job) log.Printf("[+] Stopping missing block finder\n") sleep() diff --git a/app/block/transaction.go b/app/block/transaction.go deleted file mode 100644 index 910fdafd..00000000 --- a/app/block/transaction.go +++ /dev/null @@ -1,125 +0,0 @@ -package block - -import ( - "context" - "log" - "sync" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/go-redis/redis/v8" - "github.com/gookit/color" - c "github.com/itzmeanjan/ette/app/common" - cfg "github.com/itzmeanjan/ette/app/config" - d "github.com/itzmeanjan/ette/app/data" - "github.com/itzmeanjan/ette/app/db" - "gorm.io/gorm" -) - -// Fetching specific transaction related data & persisting in database -func fetchTransactionByHash(client *ethclient.Client, block *types.Block, tx *types.Transaction, _db *gorm.DB, redisClient *redis.Client, redisKey string, publishable bool, _lock *sync.Mutex, _synced *d.SyncState, returnValChan chan bool) { - receipt, err := client.TransactionReceipt(context.Background(), tx.Hash()) - if err != nil { - log.Print(color.Red.Sprintf("[!] Failed to fetch tx receipt [ block : %d ] : %s", block.NumberU64(), err.Error())) - - // Notifying listener go routine, about status of this executing thread - returnValChan <- false - return - } - - sender, err := client.TransactionSender(context.Background(), tx, block.Hash(), receipt.TransactionIndex) - if err != nil { - log.Print(color.Red.Sprintf("[!] Failed to fetch tx sender [ block : %d ] : %s", block.NumberU64(), err.Error())) - - // Notifying listener go routine, about status of this executing thread - returnValChan <- false - return - } - - status := true - if cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3" { - - // Only if tx storing goes successful, we'll try to store - // event log, due to the fact events table has foreign key reference - // to tx table - if !db.StoreTransaction(_db, tx, receipt, sender) { - status = false - } else { - status = db.StoreEvents(_db, receipt) - } - - } - - // This is not a case when real time data is received, rather this is probably - // a sync attempt to latest state of blockchain - // - // So, in this case, we don't need to publish any data on pubsub channel - if !publishable { - // Notifying listener go routine, about status of this executing thread - returnValChan <- status - return - } - - if cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3" { - - var _publishTx *d.Transaction - - if tx.To() == nil { - // This is a contract creation tx - _publishTx = &d.Transaction{ - Hash: tx.Hash().Hex(), - From: sender.Hex(), - Contract: receipt.ContractAddress.Hex(), - Value: tx.Value().String(), - Data: tx.Data(), - Gas: tx.Gas(), - GasPrice: tx.GasPrice().String(), - Cost: tx.Cost().String(), - Nonce: tx.Nonce(), - State: receipt.Status, - BlockHash: receipt.BlockHash.Hex(), - } - } else { - // This is a normal tx, so we keep contract field empty - _publishTx = &d.Transaction{ - Hash: tx.Hash().Hex(), - From: sender.Hex(), - To: tx.To().Hex(), - Value: tx.Value().String(), - Data: tx.Data(), - Gas: tx.Gas(), - GasPrice: tx.GasPrice().String(), - Cost: tx.Cost().String(), - Nonce: tx.Nonce(), - State: receipt.Status, - BlockHash: receipt.BlockHash.Hex(), - } - } - - if err := redisClient.Publish(context.Background(), "transaction", _publishTx).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to publish transaction from block %d : %s", block.NumberU64(), err.Error())) - } - - // Publishing event/ log entries to redis pub-sub topic, to be captured by subscribers - // and sent to client application, who are interested in this piece of data - // after applying filter - for _, v := range receipt.Logs { - - if err := redisClient.Publish(context.Background(), "event", &d.Event{ - Origin: v.Address.Hex(), - Index: v.Index, - Topics: c.StringifyEventTopics(v.Topics), - Data: v.Data, - TransactionHash: v.TxHash.Hex(), - BlockHash: v.BlockHash.Hex(), - }).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to publish event from block %d : %s", block.NumberU64(), err.Error())) - } - - } - - } - - // Notifying listener go routine, about status of this executing thread - returnValChan <- status -} diff --git a/app/config/config.go b/app/config/config.go index e3286b73..9c05c2f6 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -1,6 +1,11 @@ package config -import "github.com/spf13/viper" +import ( + "log" + "strconv" + + "github.com/spf13/viper" +) // Read - Reading .env file content, during application start up func Read(file string) error { @@ -13,3 +18,25 @@ func Read(file string) error { func Get(key string) string { return viper.GetString(key) } + +// GetConcurrencyFactor - Reads concurrency factor specified in `.env` file, during deployment +// and returns that number as unsigned integer +func GetConcurrencyFactor() uint64 { + + factor := Get("ConcurrencyFactor") + if factor == "" { + return 1 + } + + parsedFactor, err := strconv.ParseUint(factor, 10, 64) + if err != nil { + log.Printf("[!] Failed to parse concurrency factor : %s\n", err.Error()) + return 1 + } + + if !(parsedFactor > 0) { + return 1 + } + + return parsedFactor +} diff --git a/app/data/data.go b/app/data/data.go index 35b4ad77..b948d1e9 100644 --- a/app/data/data.go +++ b/app/data/data.go @@ -15,6 +15,95 @@ import ( "gorm.io/gorm" ) +// SyncState - Whether `ette` is synced with blockchain or not +type SyncState struct { + Done uint64 + StartedAt time.Time + BlockCountAtStartUp uint64 + NewBlocksInserted uint64 +} + +// BlockCountInDB - Blocks currently present in database +func (s *SyncState) BlockCountInDB() uint64 { + return s.BlockCountAtStartUp + s.NewBlocksInserted +} + +// StatusHolder - Keeps track of progress being made by `ette` over time, +// which is to be delivered when `/v1/synced` is queried +type StatusHolder struct { + State *SyncState + Mutex *sync.RWMutex +} + +// SetStartedAt - Sets started at time +func (s *StatusHolder) SetStartedAt() { + + s.Mutex.Lock() + defer s.Mutex.Unlock() + + s.State.StartedAt = time.Now().UTC() + +} + +// IncrementBlocksInserted - Increments number of blocks inserted into DB +// after `ette` started processing blocks +func (s *StatusHolder) IncrementBlocksInserted() { + + s.Mutex.Lock() + defer s.Mutex.Unlock() + + s.State.NewBlocksInserted++ + +} + +// IncrementBlocksProcessed - Increments number of blocks processed by `ette +// after it started +func (s *StatusHolder) IncrementBlocksProcessed() { + + s.Mutex.Lock() + defer s.Mutex.Unlock() + + s.State.Done++ + +} + +// BlockCountInDB - Safely reads currently present blocks in database +func (s *StatusHolder) BlockCountInDB() uint64 { + + s.Mutex.RLock() + defer s.Mutex.RUnlock() + + return s.State.BlockCountInDB() + +} + +// ElapsedTime - Uptime of `ette` +func (s *StatusHolder) ElapsedTime() time.Duration { + + s.Mutex.RLock() + defer s.Mutex.RUnlock() + + return time.Now().UTC().Sub(s.State.StartedAt) + +} + +// Done - #-of Blocks processed during `ette` uptime i.e. after last time it started +func (s *StatusHolder) Done() uint64 { + + s.Mutex.RLock() + defer s.Mutex.RUnlock() + + return s.State.Done + +} + +// RedisInfo - Holds redis related information in this struct, to be used +// when passing to functions as argument +type RedisInfo struct { + Client *redis.Client // using this object `ette` will talk to Redis + QueueName string // retry queue name, for storing block numbers +} + // ResultStatus - Keeps track of how many operations went successful // and how many of them failed type ResultStatus struct { @@ -33,13 +122,11 @@ func (r ResultStatus) Total() uint64 { // Job - For running a block fetching job, these are all the information which are required type Job struct { - Client *ethclient.Client - DB *gorm.DB - RedisClient *redis.Client - RedisKey string - Block uint64 - Lock *sync.Mutex - Synced *SyncState + Client *ethclient.Client + DB *gorm.DB + Redis *RedisInfo + Block uint64 + Status *StatusHolder } // BlockChainNodeConnection - Holds network connection object for blockchain nodes @@ -51,19 +138,6 @@ type BlockChainNodeConnection struct { Websocket *ethclient.Client } -// SyncState - Whether `ette` is synced with blockchain or not -type SyncState struct { - Done uint64 - StartedAt time.Time - BlockCountAtStartUp uint64 - NewBlocksInserted uint64 -} - -// BlockCountInDB - Blocks currently present in database -func (s *SyncState) BlockCountInDB() uint64 { - return s.BlockCountAtStartUp + s.NewBlocksInserted -} - // Block - Block related info to be delivered to client in this format type Block struct { Hash string `json:"hash" gorm:"column:hash"` diff --git a/app/db/block.go b/app/db/block.go index 94993fd5..fb353df8 100644 --- a/app/db/block.go +++ b/app/db/block.go @@ -1,10 +1,6 @@ package db import ( - "log" - "sync" - - "github.com/ethereum/go-ethereum/core/types" d "github.com/itzmeanjan/ette/app/data" "gorm.io/gorm" ) @@ -12,35 +8,78 @@ import ( // StoreBlock - Persisting block data in database, // if data already not stored // -// Also checks equality with existing data, if mismatch found +// Also checks equality with existing data, if mismatch found, // updated with latest data -func StoreBlock(_db *gorm.DB, _block *types.Block, _lock *sync.Mutex, _synced *d.SyncState) bool { - - persistedBlock := GetBlock(_db, _block.NumberU64()) - if persistedBlock == nil { - - // If we're able to successfully insert block data into table - // it's going to be considered, while calculating total block count in database - status := PutBlock(_db, _block) - if status { - // Trying to safely update inserted block count - // -- Critical section of code - _lock.Lock() - defer _lock.Unlock() - - _synced.NewBlocksInserted++ - // -- ends here +// +// Tries to wrap db modifications inside database transaction to +// guarantee consistency, other read only operations being performed without +// protection of db transaction +// +// 👆 gives us performance improvement, also taste of atomic db operation +// i.e. either whole block data is written or nothing is written +func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) error { + + // -- Starting DB transaction + return dbWOTx.Transaction(func(dbWTx *gorm.DB) error { + + blockInserted := false + + persistedBlock := GetBlock(dbWOTx, block.Block.Number) + if persistedBlock == nil { + + if err := PutBlock(dbWTx, block.Block); err != nil { + return err + } + + blockInserted = true + + } else if !persistedBlock.SimilarTo(block.Block) { + + if err := UpdateBlock(dbWTx, block.Block); err != nil { + return err + } + } - return status + if block.Transactions == nil { - } + // During 👆 flow, if we've really inserted any new block into database + // count will get updated + if blockInserted { + status.IncrementBlocksInserted() + } - if !persistedBlock.SimilarTo(_block) { - return UpdateBlock(_db, _block) - } + return nil + + } + + for _, t := range block.Transactions { + + if err := StoreTransaction(dbWOTx, dbWTx, t.Tx); err != nil { + return err + } + + for _, e := range t.Events { + + if err := StoreEvent(dbWOTx, dbWTx, e); err != nil { + return err + } + + } + + } + + // During 👆 flow, if we've really inserted any new block into database + // count will get updated + if blockInserted { + status.IncrementBlocksInserted() + } + + return nil + + }) + // -- Ending DB transaction - return true } // GetBlock - Fetch block by number, from database @@ -54,52 +93,12 @@ func GetBlock(_db *gorm.DB, number uint64) *Blocks { return &block } -// PutBlock - Persisting fetched block information in database -func PutBlock(_db *gorm.DB, _block *types.Block) bool { - status := true - - if err := _db.Create(&Blocks{ - Hash: _block.Hash().Hex(), - Number: _block.NumberU64(), - Time: _block.Time(), - ParentHash: _block.ParentHash().Hex(), - Difficulty: _block.Difficulty().String(), - GasUsed: _block.GasUsed(), - GasLimit: _block.GasLimit(), - Nonce: _block.Nonce(), - Miner: _block.Coinbase().Hex(), - Size: float64(_block.Size()), - TransactionRootHash: _block.TxHash().Hex(), - ReceiptRootHash: _block.ReceiptHash().Hex(), - }).Error; err != nil { - status = false - log.Printf("[!] Failed to persist block : %d : %s\n", _block.NumberU64(), err.Error()) - } - - return status +// PutBlock - Persisting fetched block +func PutBlock(tx *gorm.DB, block *Blocks) error { + return tx.Create(block).Error } -// UpdateBlock - Updating already existing block entry with newly -// obtained info -func UpdateBlock(_db *gorm.DB, _block *types.Block) bool { - status := true - - if err := _db.Where("number = ?", _block.NumberU64()).Updates(&Blocks{ - Hash: _block.Hash().Hex(), - Time: _block.Time(), - ParentHash: _block.ParentHash().Hex(), - Difficulty: _block.Difficulty().String(), - GasUsed: _block.GasUsed(), - GasLimit: _block.GasLimit(), - Nonce: _block.Nonce(), - Miner: _block.Coinbase().Hex(), - Size: float64(_block.Size()), - TransactionRootHash: _block.TxHash().Hex(), - ReceiptRootHash: _block.ReceiptHash().Hex(), - }).Error; err != nil { - status = false - log.Printf("[!] Failed to update block : %d : %s\n", _block.NumberU64(), err.Error()) - } - - return status +// UpdateBlock - Updating already existing block +func UpdateBlock(tx *gorm.DB, block *Blocks) error { + return tx.Where("number = ?", block.Number).Updates(block).Error } diff --git a/app/db/db.go b/app/db/db.go index 4839c6e8..aa6cd2f9 100644 --- a/app/db/db.go +++ b/app/db/db.go @@ -12,7 +12,13 @@ import ( // Connect - Connecting to postgresql database func Connect() *gorm.DB { - _db, err := gorm.Open(postgres.Open(fmt.Sprintf("postgresql://%s:%s@%s:%s/%s", cfg.Get("DB_USER"), cfg.Get("DB_PASSWORD"), cfg.Get("DB_HOST"), cfg.Get("DB_PORT"), cfg.Get("DB_NAME"))), &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)}) + _db, err := gorm.Open(postgres.Open(fmt.Sprintf("postgresql://%s:%s@%s:%s/%s", + cfg.Get("DB_USER"), cfg.Get("DB_PASSWORD"), cfg.Get("DB_HOST"), + cfg.Get("DB_PORT"), cfg.Get("DB_NAME"))), + &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + SkipDefaultTransaction: true, // all db writing to be wrapped inside transaction manually + }) if err != nil { log.Fatalf("[!] Failed to connect to db : %s\n", err.Error()) } diff --git a/app/db/event.go b/app/db/event.go index a9a12dc1..0fe753c2 100644 --- a/app/db/event.go +++ b/app/db/event.go @@ -1,67 +1,45 @@ package db import ( - "log" + "errors" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - com "github.com/itzmeanjan/ette/app/common" "gorm.io/gorm" ) -// StoreEvents - Putting event data obtained from one tx -// into database, if not existing already +// StoreEvent - Either creating new event entry or updating existing one +// depending upon whether entry exists already or not // -// Otherwise, it'll match with existing entry and decide -// whether updation is required or not -func StoreEvents(_db *gorm.DB, _txReceipt *types.Receipt) bool { - count := 0 - - for _, v := range _txReceipt.Logs { - - // Event which we're trying to persist - // but it may already have been persisted - // - // So, we'll first check its existence in database - // then try to match with this entry - // and if not matching fully, we'll - // simply try to update entry - newEvent := &Events{ - Origin: v.Address.Hex(), - Index: v.Index, - Topics: com.StringifyEventTopics(v.Topics), - Data: v.Data, - TransactionHash: v.TxHash.Hex(), - BlockHash: v.BlockHash.Hex(), - } +// This may be also case when nothing of 👆 is performed, because already +// we've updated data +// +// When reading from database, query not to be wrapped inside any transaction +// but while making any change, it'll be definitely protected using transaction +// to make whole block persistance operation little bit atomic +func StoreEvent(dbWOTx *gorm.DB, dbWTx *gorm.DB, event *Events) error { - persistedEvent := GetEvent(_db, v.Index, v.BlockHash) - if persistedEvent == nil { - if PutEvent(_db, newEvent) { - count++ - } - continue - } + if event == nil { + return errors.New("Empty event received while attempting to persist") + } - if !persistedEvent.SimilarTo(newEvent) { - if UpdateEvent(_db, newEvent) { - count++ - } - continue - } + persistedEvent := GetEvent(dbWOTx, event.Index, event.BlockHash) + if persistedEvent == nil { + return PutEvent(dbWTx, event) + } - count++ + if !persistedEvent.SimilarTo(event) { + return UpdateEvent(dbWTx, event) } - return count == len(_txReceipt.Logs) + return nil + } // GetEvent - Given event index in block & block hash, returns event which is // matching from database -func GetEvent(_db *gorm.DB, index uint, blockHash common.Hash) *Events { +func GetEvent(_db *gorm.DB, index uint, blockHash string) *Events { var event Events - if err := _db.Where("index = ? and blockhash = ?", index, blockHash.Hex()).First(&event).Error; err != nil { + if err := _db.Where("index = ? and blockhash = ?", index, blockHash).First(&event).Error; err != nil { return nil } @@ -69,26 +47,11 @@ func GetEvent(_db *gorm.DB, index uint, blockHash common.Hash) *Events { } // PutEvent - Persists event log into database -func PutEvent(_db *gorm.DB, event *Events) bool { - status := true - - if err := _db.Create(event).Error; err != nil { - status = false - log.Printf("[!] Failed to persist tx log : %s\n", err.Error()) - } - - return status +func PutEvent(tx *gorm.DB, event *Events) error { + return tx.Create(event).Error } -// UpdateEvent - Updating event, already persisted in database, -// with latest info received -func UpdateEvent(_db *gorm.DB, event *Events) bool { - status := true - - if err := _db.Where("index = ? and blockhash = ?", event.Index, event.BlockHash).Updates(event).Error; err != nil { - status = false - log.Printf("[!] Failed to update tx log : %s\n", err.Error()) - } - - return status +// UpdateEvent - Updating already existing event data +func UpdateEvent(tx *gorm.DB, event *Events) error { + return tx.Where("index = ? and blockhash = ?", event.Index, event.BlockHash).Updates(event).Error } diff --git a/app/db/model.go b/app/db/model.go index e74a80cd..401147e2 100644 --- a/app/db/model.go +++ b/app/db/model.go @@ -6,8 +6,6 @@ import ( "log" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/lib/pq" ) @@ -40,19 +38,19 @@ func (Blocks) TableName() string { } // SimilarTo - Checking whether two blocks are exactly similar or not -func (b *Blocks) SimilarTo(block *types.Block) bool { - return b.Hash == block.Hash().Hex() && - b.Number == block.NumberU64() && - b.Time == block.Time() && - b.ParentHash == block.ParentHash().Hex() && - b.Difficulty == block.Difficulty().String() && - b.GasUsed == block.GasUsed() && - b.GasLimit == block.GasLimit() && - b.Nonce == block.Nonce() && - b.Miner == block.Coinbase().Hex() && - b.Size == float64(block.Size()) && - b.TransactionRootHash == block.TxHash().Hex() && - b.ReceiptRootHash == block.ReceiptHash().Hex() +func (b *Blocks) SimilarTo(_b *Blocks) bool { + return b.Hash == _b.Hash && + b.Number == _b.Number && + b.Time == _b.Time && + b.ParentHash == _b.ParentHash && + b.Difficulty == _b.Difficulty && + b.GasUsed == _b.GasUsed && + b.GasLimit == _b.GasLimit && + b.Nonce == _b.Nonce && + b.Miner == _b.Miner && + b.Size == _b.Size && + b.TransactionRootHash == _b.TransactionRootHash && + b.ReceiptRootHash == _b.ReceiptRootHash } // Transactions - Blockchain transaction holder table model @@ -78,32 +76,32 @@ func (Transactions) TableName() string { } // SimilarTo - Checking equality of two transactions -func (t *Transactions) SimilarTo(tx *types.Transaction, txReceipt *types.Receipt, sender common.Address) bool { - if tx.To() == nil { - return t.Hash == tx.Hash().Hex() && - t.From == sender.Hex() && - t.Contract == txReceipt.ContractAddress.Hex() && - t.Value == tx.Value().String() && - bytes.Compare(t.Data, tx.Data()) == 0 && - t.Gas == tx.Gas() && - t.GasPrice == tx.GasPrice().String() && - t.Cost == tx.Cost().String() && - t.Nonce == tx.Nonce() && - t.State == txReceipt.Status && - t.BlockHash == txReceipt.BlockHash.Hex() +func (t *Transactions) SimilarTo(_t *Transactions) bool { + if _t.To == "" { + return t.Hash == _t.Hash && + t.From == _t.From && + t.Contract == _t.Contract && + t.Value == _t.Value && + bytes.Compare(t.Data, _t.Data) == 0 && + t.Gas == _t.Gas && + t.GasPrice == _t.GasPrice && + t.Cost == _t.Cost && + t.Nonce == _t.Nonce && + t.State == _t.State && + t.BlockHash == _t.BlockHash } - return t.Hash == tx.Hash().Hex() && - t.From == sender.Hex() && - t.To == tx.To().Hex() && - t.Value == tx.Value().String() && - bytes.Compare(t.Data, tx.Data()) == 0 && - t.Gas == tx.Gas() && - t.GasPrice == tx.GasPrice().String() && - t.Cost == tx.Cost().String() && - t.Nonce == tx.Nonce() && - t.State == txReceipt.Status && - t.BlockHash == txReceipt.BlockHash.Hex() + return t.Hash == _t.Hash && + t.From == _t.From && + t.To == _t.To && + t.Value == _t.Value && + bytes.Compare(t.Data, _t.Data) == 0 && + t.Gas == _t.Gas && + t.GasPrice == _t.GasPrice && + t.Cost == _t.Cost && + t.Nonce == _t.Nonce && + t.State == _t.State && + t.BlockHash == _t.BlockHash } // Events - Events emitted from smart contracts to be held in this table @@ -122,7 +120,7 @@ func (Events) TableName() string { } // SimilarTo - Checking equality of two events -func (e *Events) SimilarTo(event *Events) bool { +func (e *Events) SimilarTo(_e *Events) bool { // Given two string arrays, it'll match it's elements by index & if all of them are same // returns boolean result @@ -141,17 +139,27 @@ func (e *Events) SimilarTo(event *Events) bool { return matched } - // Given two byte slices, checks their equality - compareByteSlices := func(sliceOne []byte, sliceTwo []byte) bool { - return bytes.Compare(e.Data, event.Data) == 0 - } + return e.Origin == _e.Origin && + e.Index == _e.Index && + compareStringArrays(e.Topics, _e.Topics) && + bytes.Compare(e.Data, _e.Data) == 0 && + e.TransactionHash == _e.TransactionHash && + e.BlockHash == _e.BlockHash +} + +// PackedTransaction - All data that is stored in a tx, to be passed from +// tx data fetcher to whole block data persist handler function +type PackedTransaction struct { + Tx *Transactions + Events []*Events +} - return e.Origin == event.Origin && - e.Index == event.Index && - compareStringArrays(e.Topics, event.Topics) && - compareByteSlices(e.Data, event.Data) && - e.TransactionHash == event.TransactionHash && - e.BlockHash == event.BlockHash +// PackedBlock - Whole block data to be persisted in a single +// database transaction to ensure data consistency, if something +// goes wrong in mid, whole persisting operation will get reverted +type PackedBlock struct { + Block *Blocks + Transactions []*PackedTransaction } // Users - User address & created api key related info, holder table diff --git a/app/db/transaction.go b/app/db/transaction.go index ae5cdf7d..08cecc94 100644 --- a/app/db/transaction.go +++ b/app/db/transaction.go @@ -1,125 +1,53 @@ package db import ( - "log" + "errors" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "gorm.io/gorm" ) -// StoreTransaction - Persisting transaction data into database -// if not present already -// -// But if present, first checks equality & then updates if required -func StoreTransaction(_db *gorm.DB, _tx *types.Transaction, _txReceipt *types.Receipt, _sender common.Address) bool { +// StoreTransaction - Stores specific tx into database, where presence of tx +// in database being performed using db handle ( i.e. `dbWOTx` ) which is not +// wrapped inside database transaction, but whenever performing any writing to database +// uses db handle which is always passed to db engine, wrapped inside a db transaction +// i.e. `dbWTx`, which protects helps us in rolling back to previous state, in case of some failure +// faced during this persistance stage +func StoreTransaction(dbWOTx *gorm.DB, dbWTx *gorm.DB, tx *Transactions) error { - persistedTx := GetTransaction(_db, _txReceipt.BlockHash, _tx.Hash()) + if tx == nil { + return errors.New("Empty transaction received while attempting to persist") + } + + persistedTx := GetTransaction(dbWOTx, tx.BlockHash, tx.BlockHash) if persistedTx == nil { - return PutTransaction(_db, _tx, _txReceipt, _sender) + return PutTransaction(dbWTx, tx) } - if !persistedTx.SimilarTo(_tx, _txReceipt, _sender) { - return UpdateTransaction(_db, _tx, _txReceipt, _sender) + if !persistedTx.SimilarTo(tx) { + return UpdateTransaction(dbWTx, tx) } - return true + return nil + } // GetTransaction - Fetches tx entry from database, given txhash & containing block hash -func GetTransaction(_db *gorm.DB, blkHash common.Hash, txHash common.Hash) *Transactions { +func GetTransaction(_db *gorm.DB, blkHash string, txHash string) *Transactions { var tx Transactions - if err := _db.Where("hash = ? and blockhash = ?", txHash.Hex(), blkHash.Hex()).First(&tx).Error; err != nil { + if err := _db.Where("hash = ? and blockhash = ?", txHash, blkHash).First(&tx).Error; err != nil { return nil } return &tx } -// PutTransaction - Persisting transactions present in a block in database -func PutTransaction(_db *gorm.DB, _tx *types.Transaction, _txReceipt *types.Receipt, _sender common.Address) bool { - var _pTx *Transactions - status := true - - // If tx creates contract, then we hold created contract address - if _tx.To() == nil { - _pTx = &Transactions{ - Hash: _tx.Hash().Hex(), - From: _sender.Hex(), - Contract: _txReceipt.ContractAddress.Hex(), - Value: _tx.Value().String(), - Data: _tx.Data(), - Gas: _tx.Gas(), - GasPrice: _tx.GasPrice().String(), - Cost: _tx.Cost().String(), - Nonce: _tx.Nonce(), - State: _txReceipt.Status, - BlockHash: _txReceipt.BlockHash.Hex(), - } - } else { - // This is a normal tx, so we keep contract field empty - _pTx = &Transactions{ - Hash: _tx.Hash().Hex(), - From: _sender.Hex(), - To: _tx.To().Hex(), - Value: _tx.Value().String(), - Data: _tx.Data(), - Gas: _tx.Gas(), - GasPrice: _tx.GasPrice().String(), - Cost: _tx.Cost().String(), - Nonce: _tx.Nonce(), - State: _txReceipt.Status, - BlockHash: _txReceipt.BlockHash.Hex(), - } - } - - if err := _db.Create(_pTx).Error; err != nil { - status = false - log.Printf("[!] Failed to persist tx [ block : %s ] : %s\n", _txReceipt.BlockNumber.String(), err.Error()) - } - - return status +// PutTransaction - Persisting transaction +func PutTransaction(tx *gorm.DB, txn *Transactions) error { + return tx.Create(txn).Error } -// UpdateTransaction - Updating already persisted transaction in database with -// new data -func UpdateTransaction(_db *gorm.DB, _tx *types.Transaction, _txReceipt *types.Receipt, _sender common.Address) bool { - var _pTx *Transactions - status := true - - // If tx creates contract, then we hold created contract address - if _tx.To() == nil { - _pTx = &Transactions{ - From: _sender.Hex(), - Contract: _txReceipt.ContractAddress.Hex(), - Value: _tx.Value().String(), - Data: _tx.Data(), - Gas: _tx.Gas(), - GasPrice: _tx.GasPrice().String(), - Cost: _tx.Cost().String(), - Nonce: _tx.Nonce(), - State: _txReceipt.Status, - } - } else { - // This is a normal tx, so we keep contract field empty - _pTx = &Transactions{ - From: _sender.Hex(), - To: _tx.To().Hex(), - Value: _tx.Value().String(), - Data: _tx.Data(), - Gas: _tx.Gas(), - GasPrice: _tx.GasPrice().String(), - Cost: _tx.Cost().String(), - Nonce: _tx.Nonce(), - State: _txReceipt.Status, - } - } - - if err := _db.Where("hash = ? and blockhash = ?", _tx.Hash().Hex(), _txReceipt.BlockHash.Hex()).Updates(_pTx).Error; err != nil { - status = false - log.Printf("[!] Failed to update tx [ block : %s ] : %s\n", _txReceipt.BlockNumber.String(), err.Error()) - } - - return status +// UpdateTransaction - Updating already persisted transaction +func UpdateTransaction(tx *gorm.DB, txn *Transactions) error { + return tx.Where("hash = ? and blockhash = ?", txn.Hash, txn.BlockHash).Updates(txn).Error } diff --git a/app/pubsub/block.go b/app/pubsub/block.go index 8afbaada..f2d9be75 100644 --- a/app/pubsub/block.go +++ b/app/pubsub/block.go @@ -178,6 +178,6 @@ func (b *BlockConsumer) SendData(data interface{}) bool { return false } - log.Printf("[!] Delivered `block` data to client\n") + log.Printf("[+] Delivered `block` data to client\n") return true } diff --git a/app/pubsub/event.go b/app/pubsub/event.go index a76e2271..794a0286 100644 --- a/app/pubsub/event.go +++ b/app/pubsub/event.go @@ -213,6 +213,6 @@ func (e *EventConsumer) SendData(data interface{}) bool { return false } - log.Printf("[!] Delivered `event` data to client\n") + log.Printf("[+] Delivered `event` data to client\n") return true } diff --git a/app/pubsub/transaction.go b/app/pubsub/transaction.go index b5f9c4c6..907e1320 100644 --- a/app/pubsub/transaction.go +++ b/app/pubsub/transaction.go @@ -231,6 +231,6 @@ func (t *TransactionConsumer) SendData(data interface{}) bool { return false } - log.Printf("[!] Delivered `transaction` data to client\n") + log.Printf("[+] Delivered `transaction` data to client\n") return true } diff --git a/app/rest/rest.go b/app/rest/rest.go index aa892d08..8bb123d1 100644 --- a/app/rest/rest.go +++ b/app/rest/rest.go @@ -9,7 +9,6 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/foolin/goview" @@ -33,7 +32,7 @@ import ( ) // RunHTTPServer - Holds definition for all REST API(s) to be exposed -func RunHTTPServer(_db *gorm.DB, _lock *sync.Mutex, _synced *d.SyncState, _redisClient *redis.Client) { +func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Client) { // Extracted from, to field of range based block query ( using block numbers/ time stamps ) // gets parsed into unsigned integers @@ -473,23 +472,27 @@ func RunHTTPServer(_db *gorm.DB, _lock *sync.Mutex, _synced *d.SyncState, _redis grp.GET("/synced", func(c *gin.Context) { currentBlockNumber := db.GetCurrentBlockNumber(_db) - - _lock.Lock() - defer _lock.Unlock() - - blockCountInDB := _synced.BlockCountInDB() + blockCountInDB := _status.BlockCountInDB() remaining := (currentBlockNumber + 1) - blockCountInDB - elapsed := time.Now().UTC().Sub(_synced.StartedAt) + elapsed := _status.ElapsedTime() + + if cfg.Get("EtteMode") == "2" { + c.JSON(http.StatusOK, gin.H{ + "processed": _status.Done(), + "elapsed": elapsed.String(), + }) + return + } status := fmt.Sprintf("%.2f %%", (float64(blockCountInDB)/float64(currentBlockNumber+1))*100) eta := "0s" if remaining > 0 { - eta = (time.Duration((elapsed.Seconds()/float64(_synced.Done))*float64(remaining)) * time.Second).String() + eta = (time.Duration((elapsed.Seconds()/float64(_status.Done()))*float64(remaining)) * time.Second).String() } c.JSON(http.StatusOK, gin.H{ "synced": status, - "processed": _synced.Done, + "processed": _status.Done(), "elapsed": elapsed.String(), "eta": eta, }) diff --git a/example/package-lock.json b/example/package-lock.json index 53950c2b..618ed211 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -1,162 +1,13 @@ { "name": "example-ette", "version": "1.0.0", - "lockfileVersion": 2, + "lockfileVersion": 1, "requires": true, - "packages": { - "": { - "name": "example-ette", - "version": "1.0.0", - "license": "CC0-1.0", - "dependencies": { - "websocket": "^1.0.32" - } - }, - "node_modules/bufferutil": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.2.tgz", - "integrity": "sha512-AtnG3W6M8B2n4xDQ5R+70EXvOpnXsFYg/AK2yTZd+HQ/oxAdz+GI+DvjmhBw3L0ole+LJ0ngqY4JMbDzkfNzhA==", - "hasInstallScript": true, - "dependencies": { - "node-gyp-build": "^4.2.0" - } - }, - "node_modules/d": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/d/-/d-1.0.1.tgz", - "integrity": "sha512-m62ShEObQ39CfralilEQRjH6oAMtNCV1xJyEx5LpRYUVN+EviphDgUc/F3hnYbADmkiNs67Y+3ylmlG7Lnu+FA==", - "dependencies": { - "es5-ext": "^0.10.50", - "type": "^1.0.1" - } - }, - "node_modules/debug": { - "version": "2.6.9", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", - "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", - "dependencies": { - "ms": "2.0.0" - } - }, - "node_modules/es5-ext": { - "version": "0.10.53", - "resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.53.tgz", - "integrity": "sha512-Xs2Stw6NiNHWypzRTY1MtaG/uJlwCk8kH81920ma8mvN8Xq1gsfhZvpkImLQArw8AHnv8MT2I45J3c0R8slE+Q==", - "dependencies": { - "es6-iterator": "~2.0.3", - "es6-symbol": "~3.1.3", - "next-tick": "~1.0.0" - } - }, - "node_modules/es6-iterator": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/es6-iterator/-/es6-iterator-2.0.3.tgz", - "integrity": "sha1-p96IkUGgWpSwhUQDstCg+/qY87c=", - "dependencies": { - "d": "1", - "es5-ext": "^0.10.35", - "es6-symbol": "^3.1.1" - } - }, - "node_modules/es6-symbol": { - "version": "3.1.3", - "resolved": "https://registry.npmjs.org/es6-symbol/-/es6-symbol-3.1.3.tgz", - "integrity": "sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA==", - "dependencies": { - "d": "^1.0.1", - "ext": "^1.1.2" - } - }, - "node_modules/ext": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/ext/-/ext-1.4.0.tgz", - "integrity": "sha512-Key5NIsUxdqKg3vIsdw9dSuXpPCQ297y6wBjL30edxwPgt2E44WcWBZey/ZvUc6sERLTxKdyCu4gZFmUbk1Q7A==", - "dependencies": { - "type": "^2.0.0" - } - }, - "node_modules/ext/node_modules/type": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/type/-/type-2.1.0.tgz", - "integrity": "sha512-G9absDWvhAWCV2gmF1zKud3OyC61nZDwWvBL2DApaVFogI07CprggiQAOOjvp2NRjYWFzPyu7vwtDrQFq8jeSA==" - }, - "node_modules/is-typedarray": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/is-typedarray/-/is-typedarray-1.0.0.tgz", - "integrity": "sha1-5HnICFjfDBsR3dppQPlgEfzaSpo=" - }, - "node_modules/ms": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", - "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" - }, - "node_modules/next-tick": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/next-tick/-/next-tick-1.0.0.tgz", - "integrity": "sha1-yobR/ogoFpsBICCOPchCS524NCw=" - }, - "node_modules/node-gyp-build": { - "version": "4.2.3", - "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz", - "integrity": "sha512-MN6ZpzmfNCRM+3t57PTJHgHyw/h4OWnZ6mR8P5j/uZtqQr46RRuDE/P+g3n0YR/AiYXeWixZZzaip77gdICfRg==", - "bin": { - "node-gyp-build": "bin.js", - "node-gyp-build-optional": "optional.js", - "node-gyp-build-test": "build-test.js" - } - }, - "node_modules/type": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/type/-/type-1.2.0.tgz", - "integrity": "sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg==" - }, - "node_modules/typedarray-to-buffer": { - "version": "3.1.5", - "resolved": "https://registry.npmjs.org/typedarray-to-buffer/-/typedarray-to-buffer-3.1.5.tgz", - "integrity": "sha512-zdu8XMNEDepKKR+XYOXAVPtWui0ly0NtohUscw+UmaHiAWT8hrV1rr//H6V+0DvJ3OQ19S979M0laLfX8rm82Q==", - "dependencies": { - "is-typedarray": "^1.0.0" - } - }, - "node_modules/utf-8-validate": { - "version": "5.0.3", - "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.3.tgz", - "integrity": "sha512-jtJM6fpGv8C1SoH4PtG22pGto6x+Y8uPprW0tw3//gGFhDDTiuksgradgFN6yRayDP4SyZZa6ZMGHLIa17+M8A==", - "hasInstallScript": true, - "dependencies": { - "node-gyp-build": "^4.2.0" - } - }, - "node_modules/websocket": { - "version": "1.0.32", - "resolved": "https://registry.npmjs.org/websocket/-/websocket-1.0.32.tgz", - "integrity": "sha512-i4yhcllSP4wrpoPMU2N0TQ/q0O94LRG/eUQjEAamRltjQ1oT1PFFKOG4i877OlJgCG8rw6LrrowJp+TYCEWF7Q==", - "dependencies": { - "bufferutil": "^4.0.1", - "debug": "^2.2.0", - "es5-ext": "^0.10.50", - "typedarray-to-buffer": "^3.1.5", - "utf-8-validate": "^5.0.2", - "yaeti": "^0.0.6" - }, - "engines": { - "node": ">=4.0.0" - } - }, - "node_modules/yaeti": { - "version": "0.0.6", - "resolved": "https://registry.npmjs.org/yaeti/-/yaeti-0.0.6.tgz", - "integrity": "sha1-8m9ITXJoTPQr7ft2lwqhYI+/lXc=", - "engines": { - "node": ">=0.10.32" - } - } - }, "dependencies": { "bufferutil": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.2.tgz", - "integrity": "sha512-AtnG3W6M8B2n4xDQ5R+70EXvOpnXsFYg/AK2yTZd+HQ/oxAdz+GI+DvjmhBw3L0ole+LJ0ngqY4JMbDzkfNzhA==", + "version": "4.0.3", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.3.tgz", + "integrity": "sha512-yEYTwGndELGvfXsImMBLop58eaGW+YdONi1fNjTINSY98tmMmFijBG6WXgdkfuLNt4imzQNtIE+eBp1PVpMCSw==", "requires": { "node-gyp-build": "^4.2.0" } @@ -256,17 +107,17 @@ } }, "utf-8-validate": { - "version": "5.0.3", - "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.3.tgz", - "integrity": "sha512-jtJM6fpGv8C1SoH4PtG22pGto6x+Y8uPprW0tw3//gGFhDDTiuksgradgFN6yRayDP4SyZZa6ZMGHLIa17+M8A==", + "version": "5.0.4", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.4.tgz", + "integrity": "sha512-MEF05cPSq3AwJ2C7B7sHAA6i53vONoZbMGX8My5auEVm6W+dJ2Jd/TZPyGJ5CH42V2XtbI5FD28HeHeqlPzZ3Q==", "requires": { "node-gyp-build": "^4.2.0" } }, "websocket": { - "version": "1.0.32", - "resolved": "https://registry.npmjs.org/websocket/-/websocket-1.0.32.tgz", - "integrity": "sha512-i4yhcllSP4wrpoPMU2N0TQ/q0O94LRG/eUQjEAamRltjQ1oT1PFFKOG4i877OlJgCG8rw6LrrowJp+TYCEWF7Q==", + "version": "1.0.33", + "resolved": "https://registry.npmjs.org/websocket/-/websocket-1.0.33.tgz", + "integrity": "sha512-XwNqM2rN5eh3G2CUQE3OHZj+0xfdH42+OFK6LdC2yqiC0YU8e5UK0nYre220T0IyyN031V/XOvtHvXozvJYFWA==", "requires": { "bufferutil": "^4.0.1", "debug": "^2.2.0", diff --git a/example/package.json b/example/package.json index 2f9f60dd..139dfdc8 100644 --- a/example/package.json +++ b/example/package.json @@ -9,6 +9,6 @@ "author": "Anjan Roy ", "license": "CC0-1.0", "dependencies": { - "websocket": "^1.0.32" + "websocket": "^1.0.33" } } diff --git a/sc/banner.gif b/sc/banner.gif index 9e84995a..1d785cf1 100644 Binary files a/sc/banner.gif and b/sc/banner.gif differ diff --git a/views/dashboard.html b/views/dashboard.html index 8fb7cab1..a03d05c2 100644 --- a/views/dashboard.html +++ b/views/dashboard.html @@ -117,6 +117,15 @@ }">Create new app {{end}}