diff --git a/blockchain-configs/afan-shard/node_params.json b/blockchain-configs/afan-shard/node_params.json index 7c34fee9a..68df39e95 100644 --- a/blockchain-configs/afan-shard/node_params.json +++ b/blockchain-configs/afan-shard/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 15000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/base/node_params.json b/blockchain-configs/base/node_params.json index f39b47d42..5520454eb 100644 --- a/blockchain-configs/base/node_params.json +++ b/blockchain-configs/base/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/he-shard/node_params.json b/blockchain-configs/he-shard/node_params.json index 0916334fb..6fff7b426 100644 --- a/blockchain-configs/he-shard/node_params.json +++ b/blockchain-configs/he-shard/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 15000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/mainnet-prod/node_params.json b/blockchain-configs/mainnet-prod/node_params.json index d5d40942d..9fc358cf1 100644 --- a/blockchain-configs/mainnet-prod/node_params.json +++ b/blockchain-configs/mainnet-prod/node_params.json @@ -27,8 +27,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/sim-shard/node_params.json b/blockchain-configs/sim-shard/node_params.json index 6b5130014..813346a50 100644 --- a/blockchain-configs/sim-shard/node_params.json +++ b/blockchain-configs/sim-shard/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/testnet-dev/node_params.json b/blockchain-configs/testnet-dev/node_params.json index d2490a369..a7d52b65c 100644 --- a/blockchain-configs/testnet-dev/node_params.json +++ b/blockchain-configs/testnet-dev/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/testnet-exp/node_params.json b/blockchain-configs/testnet-exp/node_params.json index 1c3e53dea..043347288 100644 --- a/blockchain-configs/testnet-exp/node_params.json +++ b/blockchain-configs/testnet-exp/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/testnet-prod/node_params.json b/blockchain-configs/testnet-prod/node_params.json index d187863ad..010a369c9 100644 --- a/blockchain-configs/testnet-prod/node_params.json +++ b/blockchain-configs/testnet-prod/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/testnet-sandbox/node_params.json b/blockchain-configs/testnet-sandbox/node_params.json index b6594a885..f6598400f 100644 --- a/blockchain-configs/testnet-sandbox/node_params.json +++ b/blockchain-configs/testnet-sandbox/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain-configs/testnet-staging/node_params.json b/blockchain-configs/testnet-staging/node_params.json index 8cac326a2..da3ec77d2 100644 --- a/blockchain-configs/testnet-staging/node_params.json +++ b/blockchain-configs/testnet-staging/node_params.json @@ -28,8 +28,9 @@ "ENABLE_REST_FUNCTION_CALL": false, "ENABLE_STATUS_REPORT_TO_TRACKER": true, "ENABLE_TX_SIG_VERIF_WORKAROUND": false, - "EVENT_HANDLER_PORT": 6000, + "EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000, "EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000, + "EVENT_HANDLER_PORT": 6000, "EXPRESS_RATE_LIMIT_WINDOW_SECS": 60, "FREE_TX_POOL_SIZE_LIMIT_RATIO": 0.1, "FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT": 0.1, diff --git a/blockchain/index.js b/blockchain/index.js index cb2a39596..210a9af55 100644 --- a/blockchain/index.js +++ b/blockchain/index.js @@ -46,7 +46,7 @@ class Blockchain { // NOTE(platfowner): This write is not awaited. FileUtil.writeSnapshotFile(snapshotDir, blockNumber, snapshot, snapshotChunkSize); // Write the block from the snapshot to the blockchain dir. - this.writeBlock(snapshot[BlockchainSnapshotProperties.BLOCK]); + this.writeBlock(snapshot[BlockchainSnapshotProperties.BLOCK], 'initBlockchain:snapshot'); } } const wasBlockDirEmpty = FileUtil.createBlockchainDir(this.blockchainPath); @@ -58,7 +58,7 @@ class Blockchain { logger.info('############################################################'); logger.info('\n'); // Copy the genesis block from the genesis configs dir to the blockchain dir. - this.writeBlock(this.genesisBlock); + this.writeBlock(this.genesisBlock, 'initBlockchain:genesis'); } else { logger.info('\n'); logger.info('#############################################################'); @@ -212,7 +212,7 @@ class Blockchain { this.addBlockToChain(block); this.updateNumberToBlockInfo(block); if (writeToDisk) { - this.writeBlock(block); + this.writeBlock(block, 'addBlockToChainAndWriteToDisk'); } return true; } @@ -243,8 +243,11 @@ class Blockchain { return true; } - writeBlock(block) { + writeBlock(block, from = '') { const LOG_HEADER = 'writeBlock'; + if (!block) { + logger.error(`[${LOG_HEADER}] Writing an empty block ${block} from ${from}`); + } if (FileUtil.hasBlockFile(this.blockchainPath, block)) { logger.error( diff --git a/client/index.js b/client/index.js index ad3ea84f7..29bb701c4 100755 --- a/client/index.js +++ b/client/index.js @@ -8,7 +8,6 @@ const express = require('express'); const jayson = require('jayson/promise'); const BlockchainNode = require('../node'); const P2pClient = require('../p2p'); -const EventHandler = require('../event-handler'); const CommonUtil = require('../common/common-util'); const VersionUtil = require('../common/version-util'); const { sendGetRequest } = require('../common/network-util'); @@ -33,8 +32,7 @@ app.use(middleware.expressUrlencdedRequestBodySizeLimiter()); app.use(middleware.corsLimiter()); app.use(middleware.blockchainApiRateLimiter); -const eventHandler = NodeConfigs.ENABLE_EVENT_HANDLER === true ? new EventHandler() : null; -const node = new BlockchainNode(null, eventHandler); +const node = new BlockchainNode(null); // NOTE(platfowner): This is very useful when the server dies without any logs. process.on('uncaughtException', function(err) { logger.error(err); @@ -54,7 +52,7 @@ const p2pClient = new P2pClient(node, minProtocolVersion, maxProtocolVersion); const p2pServer = p2pClient.server; const jsonRpcApis = require('../json_rpc')( - node, p2pServer, eventHandler, minProtocolVersion, maxProtocolVersion); + node, p2pServer, minProtocolVersion, maxProtocolVersion); app.post( '/json-rpc', @@ -823,10 +821,10 @@ if (NodeConfigs.ENABLE_DEV_CLIENT_SET_API) { }); } -if (eventHandler) { +if (node.eh) { // NOTE(cshcomcom): For event handler load balancer! It doesn't mean healthy. app.get('/eh_load_balancer_health_check', (req, res, next) => { - const result = eventHandler.getEventHandlerHealth(); + const result = node.eh.getEventHandlerHealth(); res.status(200) .set('Content-Type', 'text/plain') .send(result) diff --git a/client/protocol_versions.json b/client/protocol_versions.json index de267a5c9..c226f3b81 100644 --- a/client/protocol_versions.json +++ b/client/protocol_versions.json @@ -101,5 +101,8 @@ }, "1.0.8": { "min": "1.0.0" + }, + "1.0.9": { + "min": "1.0.0" } } \ No newline at end of file diff --git a/common/constants.js b/common/constants.js index 85e4d70d4..38e8f175a 100644 --- a/common/constants.js +++ b/common/constants.js @@ -572,6 +572,13 @@ function isTxInBlock(state) { return state === TransactionStates.FINALIZED || state === TransactionStates.REVERTED; } +function isEndState(state) { + return state === TransactionStates.FINALIZED || + state === TransactionStates.REVERTED || + state === TransactionStates.FAILED || + state === TransactionStates.TIMED_OUT; +} + /** * State versions. * @@ -650,6 +657,8 @@ const TrafficEventTypes = { const BlockchainEventTypes = { BLOCK_FINALIZED: 'BLOCK_FINALIZED', VALUE_CHANGED: 'VALUE_CHANGED', + TX_STATE_CHANGED: 'TX_STATE_CHANGED', + FILTER_DELETED: 'FILTER_DELETED', }; const BlockchainEventMessageTypes = { @@ -664,6 +673,11 @@ const ValueChangedEventSources = { USER: 'USER', }; +const FilterDeletionReasons = { + FILTER_TIMEOUT: 'FILTER_TIMEOUT', + END_STATE_REACHED: 'END_STATE_REACHED', +} + // ** Lists & Sets ** /** @@ -825,6 +839,7 @@ module.exports = { WriteDbOperations, TransactionStates, isTxInBlock, + isEndState, StateVersions, getBlockchainConfig, SyncModeOptions, @@ -832,6 +847,7 @@ module.exports = { BlockchainEventTypes, BlockchainEventMessageTypes, ValueChangedEventSources, + FilterDeletionReasons, isServiceType, isServiceAccountServiceType, isReservedServiceName, diff --git a/common/result-code.js b/common/result-code.js index 3a1d2428a..3d7bbf775 100644 --- a/common/result-code.js +++ b/common/result-code.js @@ -233,6 +233,10 @@ const EventHandlerErrorCode = { MISSING_FILTER_ID_IN_FILTER_ID_TO_PARSED_PATH: 70250, MISSING_FILTER_ID_SET: 70251, MISSING_FILTER_ID_IN_FILTER_ID_SET: 70252, + // TX_STATE_CHANGED (703XX) + MISSING_TX_HASH_IN_CONFIG: 70300, + INVALID_TX_HASH: 70301, + INVALID_TIMEOUT: 70302, }; /** diff --git a/deploy_blockchain_genesis_gcp.sh b/deploy_blockchain_genesis_gcp.sh index d2360562c..5178c8bfa 100644 --- a/deploy_blockchain_genesis_gcp.sh +++ b/deploy_blockchain_genesis_gcp.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [[ $# -lt 3 ]] || [[ $# -gt 8 ]]; then - printf "Usage: bash deploy_blockchain_genesis_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] <# of Shards> [--setup] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--kill-only|--skip-kill]\n" - printf "Example: bash deploy_blockchain_genesis_gcp.sh dev my_username 0 --setup --keystore --no-keep-code\n" +if [[ $# -lt 3 ]] || [[ $# -gt 9 ]]; then + printf "Usage: bash deploy_blockchain_genesis_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] <# of Shards> [--setup] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--chown-data|--no-chown-data] [--kill-only|--skip-kill]\n" + printf "Example: bash deploy_blockchain_genesis_gcp.sh dev gcp_user 0 --setup --keystore --no-keep-code --no-chown-data\n" printf "\n" exit fi @@ -59,6 +59,10 @@ function parse_options() { SYNC_MODE_OPTION="$option" elif [[ $option = '--fast-sync' ]]; then SYNC_MODE_OPTION="$option" + elif [[ $option = '--chown-data' ]]; then + CHOWN_DATA_OPTION="$option" + elif [[ $option = '--no-chown-data' ]]; then + CHOWN_DATA_OPTION="$option" elif [[ $option = '--kill-only' ]]; then if [[ "$KILL_OPTION" ]]; then printf "You cannot use both --skip-kill and --kill-only\n" @@ -83,6 +87,7 @@ ACCOUNT_INJECTION_OPTION="--private-key" KEEP_CODE_OPTION="--keep-code" KEEP_DATA_OPTION="--keep-data" SYNC_MODE_OPTION="--fast-sync" +CHOWN_DATA_OPTION="--chown-data" KILL_OPTION="" ARG_INDEX=4 @@ -95,6 +100,7 @@ printf "ACCOUNT_INJECTION_OPTION=$ACCOUNT_INJECTION_OPTION\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" printf "SYNC_MODE_OPTION=$SYNC_MODE_OPTION\n" +printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "KILL_OPTION=$KILL_OPTION\n" if [[ "$ACCOUNT_INJECTION_OPTION" = "" ]]; then @@ -242,14 +248,14 @@ fi # install node modules on GCP instances if [[ $KEEP_CODE_OPTION = "--no-keep-code" ]]; then printf "\n* >> Installing node modules for parent tracker (${TRACKER_TARGET_ADDR}) *********************************************************\n\n" - gcloud compute ssh $TRACKER_TARGET_ADDR --command "cd ./ain-blockchain; sudo yarn install --ignore-engines" --project $PROJECT_ID --zone $TRACKER_ZONE + gcloud compute ssh $TRACKER_TARGET_ADDR --command "cd ./ain-blockchain; yarn install --ignore-engines" --project $PROJECT_ID --zone $TRACKER_ZONE for node_index in `seq 0 $(( $NUM_NODES - 1 ))`; do NODE_TARGET_ADDR=NODE_${node_index}_TARGET_ADDR NODE_ZONE=NODE_${node_index}_ZONE printf "\n* >> Installing node modules for parent node $node_index (${!NODE_TARGET_ADDR}) *********************************************************\n\n" - gcloud compute ssh ${!NODE_TARGET_ADDR} --command "cd ./ain-blockchain; sudo yarn install --ignore-engines" --project $PROJECT_ID --zone ${!NODE_ZONE} + gcloud compute ssh ${!NODE_TARGET_ADDR} --command "cd ./ain-blockchain; yarn install --ignore-engines" --project $PROJECT_ID --zone ${!NODE_ZONE} done fi @@ -319,7 +325,7 @@ printf "\n* >> Starting parent tracker (${TRACKER_TARGET_ADDR}) **************** printf "\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" -START_TRACKER_CMD="gcloud compute ssh $TRACKER_TARGET_ADDR --command '$START_TRACKER_CMD_BASE $KEEP_CODE_OPTION' --project $PROJECT_ID --zone $TRACKER_ZONE" +START_TRACKER_CMD="gcloud compute ssh $TRACKER_TARGET_ADDR --command '$START_TRACKER_CMD_BASE $GCP_USER $KEEP_CODE_OPTION' --project $PROJECT_ID --zone $TRACKER_ZONE" printf "START_TRACKER_CMD=$START_TRACKER_CMD\n" eval $START_TRACKER_CMD @@ -350,15 +356,17 @@ for node_index in `seq 0 $(( $NUM_NODES - 1 ))`; do printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" printf "SYNC_MODE_OPTION=$SYNC_MODE_OPTION\n" + printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "JSON_RPC_OPTION=$JSON_RPC_OPTION\n" printf "UPDATE_FRONT_DB_OPTION=$UPDATE_FRONT_DB_OPTION\n" printf "REST_FUNC_OPTION=$REST_FUNC_OPTION\n" printf "EVENT_HANDLER_OPTION=$EVENT_HANDLER_OPTION\n" printf "\n" - START_NODE_CMD="gcloud compute ssh ${!NODE_TARGET_ADDR} --command '$START_NODE_CMD_BASE $SEASON 0 $node_index $KEEP_CODE_OPTION $KEEP_DATA_OPTION $SYNC_MODE_OPTION $ACCOUNT_INJECTION_OPTION $JSON_RPC_OPTION $UPDATE_FRONT_DB_OPTION $REST_FUNC_OPTION $EVENT_HANDLER_OPTION' --project $PROJECT_ID --zone ${!NODE_ZONE}" + START_NODE_CMD="gcloud compute ssh ${!NODE_TARGET_ADDR} --command '$START_NODE_CMD_BASE $SEASON $GCP_USER 0 $node_index $KEEP_CODE_OPTION $KEEP_DATA_OPTION $SYNC_MODE_OPTION $CHOWN_DATA_OPTION $ACCOUNT_INJECTION_OPTION $JSON_RPC_OPTION $UPDATE_FRONT_DB_OPTION $REST_FUNC_OPTION $EVENT_HANDLER_OPTION' --project $PROJECT_ID --zone ${!NODE_ZONE}" printf "START_NODE_CMD=$START_NODE_CMD\n" eval $START_NODE_CMD + sleep 5 inject_account "$node_index" done @@ -411,30 +419,30 @@ if [[ $NUM_SHARDS -gt 0 ]]; then # install node modules on GCP instances if [[ $KEEP_CODE_OPTION = "--no-keep-code" ]]; then printf "\n* >> Installing node modules for shard_$i tracker (${SHARD_TRACKER_TARGET_ADDR}) *********************************************************\n\n" - gcloud compute ssh $SHARD_TRACKER_TARGET_ADDR --command "cd ./ain-blockchain; sudo yarn install --ignore-engines" --project $PROJECT_ID --zone $TRACKER_ZONE + gcloud compute ssh $SHARD_TRACKER_TARGET_ADDR --command "cd ./ain-blockchain; yarn install --ignore-engines" --project $PROJECT_ID --zone $TRACKER_ZONE printf "\n* >> Installing node modules for shard_$i node 0 (${SHARD_NODE_0_TARGET_ADDR}) *********************************************************\n\n" - gcloud compute ssh $SHARD_NODE_0_TARGET_ADDR --command "cd ./ain-blockchain; sudo yarn install --ignore-engines" --project $PROJECT_ID --zone $NODE_0_ZONE + gcloud compute ssh $SHARD_NODE_0_TARGET_ADDR --command "cd ./ain-blockchain; yarn install --ignore-engines" --project $PROJECT_ID --zone $NODE_0_ZONE printf "\n* >> Installing node modules for shard_$i node 1 (${SHARD_NODE_1_TARGET_ADDR}) *********************************************************\n\n" - gcloud compute ssh $SHARD_NODE_1_TARGET_ADDR --command "cd ./ain-blockchain; sudo yarn install --ignore-engines" --project $PROJECT_ID --zone $NODE_1_ZONE + gcloud compute ssh $SHARD_NODE_1_TARGET_ADDR --command "cd ./ain-blockchain; yarn install --ignore-engines" --project $PROJECT_ID --zone $NODE_1_ZONE printf "\n* >> Installing node modules for shard_$i node 2 (${SHARD_NODE_2_TARGET_ADDR}) *********************************************************\n\n" - gcloud compute ssh $SHARD_NODE_2_TARGET_ADDR --command "cd ./ain-blockchain; sudo yarn install --ignore-engines" --project $PROJECT_ID --zone $NODE_2_ZONE + gcloud compute ssh $SHARD_NODE_2_TARGET_ADDR --command "cd ./ain-blockchain; yarn install --ignore-engines" --project $PROJECT_ID --zone $NODE_2_ZONE fi # ssh into each instance, install packages and start up the server printf "\n* >> Starting shard_$i tracker (${SHARD_TRACKER_TARGET_ADDR}) *********************************************************\n\n" - START_TRACKER_CMD="gcloud compute ssh $SHARD_TRACKER_TARGET_ADDR --command '$START_TRACKER_CMD_BASE $KEEP_CODE_OPTION' --project $PROJECT_ID --zone $TRACKER_ZONE" + START_TRACKER_CMD="gcloud compute ssh $SHARD_TRACKER_TARGET_ADDR --command '$START_TRACKER_CMD_BASE $GCP_USER $KEEP_CODE_OPTION' --project $PROJECT_ID --zone $TRACKER_ZONE" printf "START_TRACKER_CMD=$START_TRACKER_CMD\n" eval $START_TRACKER_CMD printf "\n* >> Starting shard_$i node 0 (${SHARD_NODE_0_TARGET_ADDR}) *********************************************************\n\n" - START_NODE_CMD="gcloud compute ssh $SHARD_NODE_0_TARGET_ADDR --command '$START_NODE_CMD_BASE $SEASON $SEASON $i 0 $KEEP_CODE_OPTION $KEEP_DATA_OPTION' --project $PROJECT_ID --zone $NODE_0_ZONE" + START_NODE_CMD="gcloud compute ssh $SHARD_NODE_0_TARGET_ADDR --command '$START_NODE_CMD_BASE $SEASON $GCP_USER $i 0 $KEEP_CODE_OPTION $KEEP_DATA_OPTION $CHOWN_DATA_OPTION' --project $PROJECT_ID --zone $NODE_0_ZONE" printf "START_NODE_CMD=$START_NODE_CMD\n" eval $START_NODE_CMD printf "\n* >> Starting shard_$i node 1 (${SHARD_NODE_1_TARGET_ADDR}) *********************************************************\n\n" - START_NODE_CMD="gcloud compute ssh $SHARD_NODE_1_TARGET_ADDR --command '$START_NODE_CMD_BASE $SEASON $SEASON $i 0 $KEEP_CODE_OPTION $KEEP_DATA_OPTION' --project $PROJECT_ID --zone $NODE_1_ZONE" + START_NODE_CMD="gcloud compute ssh $SHARD_NODE_1_TARGET_ADDR --command '$START_NODE_CMD_BASE $SEASON $GCP_USER $i 0 $KEEP_CODE_OPTION $KEEP_DATA_OPTION $CHOWN_DATA_OPTION' --project $PROJECT_ID --zone $NODE_1_ZONE" printf "START_NODE_CMD=$START_NODE_CMD\n" eval $START_NODE_CMD printf "\n* >> Starting shard_$i node 2 (${SHARD_NODE_2_TARGET_ADDR}) *********************************************************\n\n" - START_NODE_CMD="gcloud compute ssh $SHARD_NODE_2_TARGET_ADDR --command '$START_NODE_CMD_BASE $SEASON $SEASON $i 0 $KEEP_CODE_OPTION $KEEP_DATA_OPTION' --project $PROJECT_ID --zone $NODE_2_ZONE" + START_NODE_CMD="gcloud compute ssh $SHARD_NODE_2_TARGET_ADDR --command '$START_NODE_CMD_BASE $SEASON $GCP_USER $i 0 $KEEP_CODE_OPTION $KEEP_DATA_OPTION $CHOWN_DATA_OPTION' --project $PROJECT_ID --zone $NODE_2_ZONE" printf "START_NODE_CMD=$START_NODE_CMD\n" eval $START_NODE_CMD done diff --git a/deploy_blockchain_incremental_gcp.sh b/deploy_blockchain_incremental_gcp.sh index 9c7385b24..8b5197380 100644 --- a/deploy_blockchain_incremental_gcp.sh +++ b/deploy_blockchain_incremental_gcp.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [[ $# -lt 5 ]] || [[ $# -gt 11 ]]; then - printf "Usage: bash deploy_blockchain_incremental_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] <# of Shards> [--setup] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync]\n" - printf "Example: bash deploy_blockchain_incremental_gcp.sh dev my_username 0 -1 1 --setup --keystore --no-keep-code --full-sync\n" +if [[ $# -lt 5 ]] || [[ $# -gt 12 ]]; then + printf "Usage: bash deploy_blockchain_incremental_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] <# of Shards> [--setup] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--chown-data|--no-chown-data]\n" + printf "Example: bash deploy_blockchain_incremental_gcp.sh dev gcp_user 0 -1 1 --setup --keystore --no-keep-code --full-sync --no-chown-data\n" printf "Note: = -1 is for tracker\n" printf "Note: is inclusive\n" printf "\n" @@ -64,6 +64,10 @@ function parse_options() { SYNC_MODE_OPTION="$option" elif [[ $option = '--fast-sync' ]]; then SYNC_MODE_OPTION="$option" + elif [[ $option = '--chown-data' ]]; then + CHOWN_DATA_OPTION="$option" + elif [[ $option = '--no-chown-data' ]]; then + CHOWN_DATA_OPTION="$option" else printf "Invalid option: $option\n" exit @@ -76,6 +80,7 @@ ACCOUNT_INJECTION_OPTION="--private-key" KEEP_CODE_OPTION="--keep-code" KEEP_DATA_OPTION="--keep-data" SYNC_MODE_OPTION="--fast-sync" +CHOWN_DATA_OPTION="--chown-data" ARG_INDEX=6 while [ $ARG_INDEX -le $# ]; do @@ -88,6 +93,7 @@ printf "ACCOUNT_INJECTION_OPTION=$ACCOUNT_INJECTION_OPTION\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" printf "SYNC_MODE_OPTION=$SYNC_MODE_OPTION\n" +printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" if [[ "$ACCOUNT_INJECTION_OPTION" = "" ]]; then printf "Must provide an ACCOUNT_INJECTION_OPTION\n" @@ -180,7 +186,7 @@ function deploy_tracker() { printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "\n" - START_TRACKER_CMD="gcloud compute ssh $TRACKER_TARGET_ADDR --command '$START_TRACKER_CMD_BASE $KEEP_CODE_OPTION' --project $PROJECT_ID --zone $TRACKER_ZONE" + START_TRACKER_CMD="gcloud compute ssh $TRACKER_TARGET_ADDR --command '$START_TRACKER_CMD_BASE $GCP_USER $KEEP_CODE_OPTION' --project $PROJECT_ID --zone $TRACKER_ZONE" printf "START_TRACKER_CMD=$START_TRACKER_CMD\n\n" eval $START_TRACKER_CMD } @@ -237,17 +243,19 @@ function deploy_node() { printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" printf "SYNC_MODE_OPTION=$SYNC_MODE_OPTION\n" + printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "JSON_RPC_OPTION=$JSON_RPC_OPTION\n" printf "UPDATE_FRONT_DB_OPTION=$UPDATE_FRONT_DB_OPTION\n" printf "REST_FUNC_OPTION=$REST_FUNC_OPTION\n" printf "EVENT_HANDLER_OPTION=$EVENT_HANDLER_OPTION\n" printf "\n" - START_NODE_CMD="gcloud compute ssh $node_target_addr --command '$START_NODE_CMD_BASE $SEASON 0 $node_index $KEEP_CODE_OPTION $KEEP_DATA_OPTION $SYNC_MODE_OPTION $ACCOUNT_INJECTION_OPTION $JSON_RPC_OPTION $UPDATE_FRONT_DB_OPTION $REST_FUNC_OPTION $EVENT_HANDLER_OPTION' --project $PROJECT_ID --zone $node_zone" + START_NODE_CMD="gcloud compute ssh $node_target_addr --command '$START_NODE_CMD_BASE $SEASON $GCP_USER 0 $node_index $KEEP_CODE_OPTION $KEEP_DATA_OPTION $SYNC_MODE_OPTION $CHOWN_DATA_OPTION $ACCOUNT_INJECTION_OPTION $JSON_RPC_OPTION $UPDATE_FRONT_DB_OPTION $REST_FUNC_OPTION $EVENT_HANDLER_OPTION' --project $PROJECT_ID --zone $node_zone" printf "START_NODE_CMD=$START_NODE_CMD\n\n" eval $START_NODE_CMD # 4. Inject node account + sleep 5 if [[ $ACCOUNT_INJECTION_OPTION = "--keystore" ]]; then local node_ip_addr=${IP_ADDR_LIST[${node_index}]} printf "\n* >> Initializing account for node $node_index ($node_target_addr) ********************\n\n" diff --git a/deploy_blockchain_sandbox_gcp.sh b/deploy_blockchain_sandbox_gcp.sh index e232deda5..085e893f8 100644 --- a/deploy_blockchain_sandbox_gcp.sh +++ b/deploy_blockchain_sandbox_gcp.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [[ $# -lt 3 ]] || [[ $# -gt 7 ]]; then - printf "Usage: bash deploy_blockchain_sandbox_gcp.sh <# start node> <# end node> [--setup] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--kill-only|--skip-kill]\n" - printf "Example: bash deploy_blockchain_sandbox_gcp.sh my_username 10 99 --setup\n" +if [[ $# -lt 3 ]] || [[ $# -gt 8 ]]; then + printf "Usage: bash deploy_blockchain_sandbox_gcp.sh <# start node> <# end node> [--setup] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--chown-data|--no-chown-data] [--kill-only|--skip-kill]\n" + printf "Example: bash deploy_blockchain_sandbox_gcp.sh gcp_user 10 99 --setup --no-chown-data\n" printf "\n" exit fi @@ -17,6 +17,7 @@ GCP_USER="$1" START_NODE_IDX="$2" END_NODE_IDX="$3" ACCOUNT_INJECTION_OPTION="--private-key" # always use the private keys + printf "GCP_USER=$GCP_USER\n" printf "START_NODE_IDX=$START_NODE_IDX\n" printf "END_NODE_IDX=$END_NODE_IDX\n" @@ -39,6 +40,10 @@ function parse_options() { KEEP_DATA_OPTION="$option" elif [[ $option = '--no-keep-data' ]]; then KEEP_DATA_OPTION="$option" + elif [[ $option = '--chown-data' ]]; then + CHOWN_DATA_OPTION="$option" + elif [[ $option = '--no-chown-data' ]]; then + CHOWN_DATA_OPTION="$option" elif [[ $option = '--kill-only' ]]; then if [[ "$KILL_OPTION" ]]; then printf "You cannot use both --skip-kill and --kill-only\n" @@ -61,6 +66,7 @@ function parse_options() { SETUP_OPTION="" KEEP_CODE_OPTION="--no-keep-code" KEEP_DATA_OPTION="--no-keep-data" +CHOWN_DATA_OPTION="--chown-data" KILL_OPTION="" ARG_INDEX=4 @@ -71,6 +77,7 @@ done printf "SETUP_OPTION=$SETUP_OPTION\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" +printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "KILL_OPTION=$KILL_OPTION\n" @@ -407,6 +414,7 @@ printf "\n" printf "START_NODE_CMD_BASE=$START_NODE_CMD_BASE\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" +printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" node_index=$START_NODE_IDX while [ $node_index -le $END_NODE_IDX ]; do @@ -429,15 +437,17 @@ while [ $node_index -le $END_NODE_IDX ]; do printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" + printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "JSON_RPC_OPTION=$JSON_RPC_OPTION\n" printf "UPDATE_FRONT_DB_OPTION=$UPDATE_FRONT_DB_OPTION\n" printf "REST_FUNC_OPTION=$REST_FUNC_OPTION\n" printf "\n" - START_NODE_CMD="gcloud compute ssh ${!NODE_TARGET_ADDR} --command '$START_NODE_CMD_BASE $SEASON 0 $node_index $KEEP_CODE_OPTION $KEEP_DATA_OPTION $JSON_RPC_OPTION $UPDATE_FRONT_DB_OPTION $REST_FUNC_OPTION $ACCOUNT_INJECTION_OPTION' --project $PROJECT_ID --zone ${!NODE_ZONE}" + START_NODE_CMD="gcloud compute ssh ${!NODE_TARGET_ADDR} --command '$START_NODE_CMD_BASE $SEASON $GCP_USER 0 $node_index $KEEP_CODE_OPTION $KEEP_DATA_OPTION $CHOWN_DATA_OPTION $JSON_RPC_OPTION $UPDATE_FRONT_DB_OPTION $REST_FUNC_OPTION $ACCOUNT_INJECTION_OPTION' --project $PROJECT_ID --zone ${!NODE_ZONE}" # NOTE(minsulee2): Keep printf for extensibility experiment debugging purpose # printf "START_NODE_CMD=$START_NODE_CMD\n" eval $START_NODE_CMD + sleep 5 inject_account "$node_index" ((node_index++)) sleep 1 diff --git a/deploy_monitoring_gcp.sh b/deploy_monitoring_gcp.sh index 624bbb62e..c35599857 100644 --- a/deploy_monitoring_gcp.sh +++ b/deploy_monitoring_gcp.sh @@ -2,7 +2,7 @@ if [[ "$#" -lt 2 ]]; then printf "Usage: bash deploy_monitoring_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] [--setup]\n" - printf "Example: bash deploy_monitoring_gcp.sh dev my_username\n" + printf "Example: bash deploy_monitoring_gcp.sh dev gcp_user \n" printf "\n" exit fi @@ -61,4 +61,4 @@ fi # ssh into each instance, install packages and start up the server printf "\n\n############################\n# Running monitoring #\n############################\n\n" -gcloud compute ssh $MONITORING_TARGET_ADDR --command "cd ./ain-blockchain; . setup_monitoring_gcp.sh ${SEASON} && . start_monitoring_gcp.sh" --project $PROJECT_ID --zone $MONITORING_ZONE +gcloud compute ssh $MONITORING_TARGET_ADDR --command "cd ./ain-blockchain; . setup_monitoring_gcp.sh $SEASON $GCP_USER && . start_monitoring_gcp.sh" --project $PROJECT_ID --zone $MONITORING_ZONE diff --git a/event-handler/event-channel-manager.js b/event-handler/event-channel-manager.js index 22f1de447..4f43392da 100644 --- a/event-handler/event-channel-manager.js +++ b/event-handler/event-channel-manager.js @@ -5,13 +5,15 @@ const { getIpAddress } = require('../common/network-util'); const { BlockchainEventMessageTypes, NodeConfigs, + BlockchainEventTypes, } = require('../common/constants'); const EventHandlerError = require('./event-handler-error'); const { EventHandlerErrorCode } = require('../common/result-code'); +const BlockchainEvent = require('./blockchain-event'); class EventChannelManager { - constructor(eventHandler) { - this.eventHandler = eventHandler; + constructor(node) { + this.node = node; this.wsServer = null; // TODO(cshcomcom): Use Map data structure. this.channels = {}; // [channelId]: Channel @@ -28,7 +30,7 @@ class EventChannelManager { maxNumEventChannels: NodeConfigs.MAX_NUM_EVENT_CHANNELS, numEventChannels: this.getNumEventChannels(), maxNumEventFilters: NodeConfigs.MAX_NUM_EVENT_FILTERS, - numEventFilters: this.eventHandler.getNumEventFilters(), + numEventFilters: this.node.eh.getNumEventFilters(), } } @@ -44,6 +46,12 @@ class EventChannelManager { return channelInfo; } + getChannelByEventFilterId(eventFilterId) { + const channelId = this.filterIdToChannelId[eventFilterId]; + const channel = this.channels[channelId]; + return channel; + } + startListening() { this.wsServer = new ws.Server({ port: NodeConfigs.EVENT_HANDLER_PORT, @@ -92,7 +100,7 @@ class EventChannelManager { handleRegisterFilterMessage(channel, messageData) { const clientFilterId = messageData.id; - if (this.eventHandler.getNumEventFilters() >= NodeConfigs.MAX_NUM_EVENT_FILTERS) { + if (this.node.eh.getNumEventFilters() >= NodeConfigs.MAX_NUM_EVENT_FILTERS) { throw new EventHandlerError(EventHandlerErrorCode.EVENT_FILTER_EXCEEDS_SIZE_LIMIT, `The number of event filters exceeds its limit (${NodeConfigs.MAX_NUM_EVENT_FILTERS})`, null, clientFilterId); @@ -114,20 +122,50 @@ class EventChannelManager { `Can't find config from message.data (${JSON.stringify(messageData)})`, null, clientFilterId); } + this.registerFilter(channel, clientFilterId, eventType, config); + } + registerFilter(channel, clientFilterId, eventType, config) { const filter = - this.eventHandler.createAndRegisterEventFilter(clientFilterId, channel.id, + this.node.eh.createAndRegisterEventFilter(clientFilterId, channel.id, eventType, config); channel.addEventFilterId(filter.id); this.filterIdToChannelId[filter.id] = channel.id; + if (eventType === BlockchainEventTypes.TX_STATE_CHANGED) { + const transactionInfo = this.node.getTransactionByHash(config.tx_hash); + if (!transactionInfo) { + this.node.eh.setFilterDeletionTimeout(filter.id); + } else { + const blockchainEvent = new BlockchainEvent(BlockchainEventTypes.TX_STATE_CHANGED, { + transaction: transactionInfo.transaction, + tx_state: { + before: null, + after: transactionInfo.state, + }, + }); + this.transmitEventByEventFilterId(filter.id, blockchainEvent); + } + } } deregisterFilter(channel, clientFilterId) { - const filter = this.eventHandler.deregisterEventFilter(clientFilterId, channel.id); + const filter = this.node.eh.deregisterEventFilter(clientFilterId, channel.id); channel.deleteEventFilterId(filter.id); delete this.filterIdToChannelId[filter.id]; } + deregisterFilterByEventFilterId(eventFilterId) { + const LOG_HEADER = 'deregisterFilterByEventFilterId'; + const channel = this.getChannelByEventFilterId(eventFilterId); + if (!channel) { + logger.error(`[${LOG_HEADER}] Can't find channel by event filter id ` + + `(eventFilterId: ${eventFilterId})`); + return; + } + const clientFilterId = this.node.eh.getClientFilterIdFromGlobalFilterId(eventFilterId); + this.deregisterFilter(channel, clientFilterId); + } + handleDeregisterFilterMessage(channel, messageData) { const clientFilterId = messageData.id; this.deregisterFilter(channel, clientFilterId); @@ -138,7 +176,7 @@ class EventChannelManager { try { const globalFilterId = eventErr.globalFilterId; const clientFilterId = eventErr.clientFilterId || - this.eventHandler.getClientFilterIdFromGlobalFilterId(globalFilterId); + this.node.eh.getClientFilterIdFromGlobalFilterId(globalFilterId); if (!clientFilterId) { logger.error(`[${LOG_HEADER}] Can't find client filter ID (${eventErr.message})`); return; @@ -198,15 +236,14 @@ class EventChannelManager { transmitEventByEventFilterId(eventFilterId, event) { const LOG_HEADER = 'transmitEventByEventFilterId'; - const channelId = this.filterIdToChannelId[eventFilterId]; - const channel = this.channels[channelId]; + const channel = this.getChannelByEventFilterId(eventFilterId); if (!channel) { logger.error(`[${LOG_HEADER}] Can't find channel by event filter id ` + `(eventFilterId: ${eventFilterId})`); return; } const eventObj = event.toObject(); - const clientFilterId = this.eventHandler.getClientFilterIdFromGlobalFilterId(eventFilterId); + const clientFilterId = this.node.eh.getClientFilterIdFromGlobalFilterId(eventFilterId); Object.assign(eventObj, { filter_id: clientFilterId }); this.transmitEventObj(channel, eventObj); } @@ -237,7 +274,7 @@ class EventChannelManager { channel.webSocket.terminate(); const filterIds = channel.getAllFilterIds(); for (const filterId of filterIds) { - const clientFilterId = this.eventHandler.getClientFilterIdFromGlobalFilterId(filterId); + const clientFilterId = this.node.eh.getClientFilterIdFromGlobalFilterId(filterId); this.deregisterFilter(channel, clientFilterId); } delete this.channels[channel.id]; diff --git a/event-handler/index.js b/event-handler/index.js index 75538aa93..3e471effc 100644 --- a/event-handler/index.js +++ b/event-handler/index.js @@ -2,7 +2,8 @@ const logger = new (require('../logger'))('EVENT_HANDLER'); const _ = require('lodash'); const EventChannelManager = require('./event-channel-manager'); const StateEventTreeManager = require('./state-event-tree-manager'); -const { BlockchainEventTypes } = require('../common/constants'); +const { BlockchainEventTypes, isEndState, FilterDeletionReasons } + = require('../common/constants'); const CommonUtil = require('../common/common-util'); const EventFilter = require('./event-filter'); const BlockchainEvent = require('./blockchain-event'); @@ -13,11 +14,14 @@ const { } = require('../common/constants'); class EventHandler { - constructor() { - this.eventChannelManager = null; + constructor(node) { + this.node = node; + this.eventChannelManager = new EventChannelManager(node); this.stateEventTreeManager = new StateEventTreeManager(); this.eventFilters = {}; this.eventTypeToEventFilterIds = {}; + this.eventFilterIdToTimeoutCallback = new Map(); + this.txHashToEventFilterIdSet = new Map(); for (const eventType of Object.keys(BlockchainEventTypes)) { this.eventTypeToEventFilterIds[eventType] = new Set(); } @@ -26,7 +30,6 @@ class EventHandler { run() { const LOG_HEADER = 'run'; - this.eventChannelManager = new EventChannelManager(this); this.eventChannelManager.startListening(); logger.info(`[${LOG_HEADER}] Event handler started!`); } @@ -120,6 +123,74 @@ class EventHandler { } } + emitTxStateChanged(transaction, beforeState, afterState) { + const LOG_HEADER = 'emitTxStateChanged'; + if (!_.get(transaction, 'hash', null)) { + logger.error(`[${LOG_HEADER}] Invalid Tx(${JSON.stringify(transaction)})`); + return; + } + const eventFilterIdSet = this.txHashToEventFilterIdSet.get(transaction.hash); + if (!eventFilterIdSet) { + return; + } + for (const eventFilterId of eventFilterIdSet) { + const timeoutCallback = this.eventFilterIdToTimeoutCallback.get(eventFilterId); + if (timeoutCallback) { + clearTimeout(timeoutCallback); + } + + const blockchainEvent = new BlockchainEvent(BlockchainEventTypes.TX_STATE_CHANGED, { + transaction: transaction, + tx_state: { + before: beforeState, + after: afterState, + }, + }); + this.eventChannelManager.transmitEventByEventFilterId(eventFilterId, blockchainEvent); + + // NOTE(ehgmsdk20): When the state no longer changes, the event filter is removed. + if (isEndState(afterState)) { + // NOTE(ehgmsdk20): Send message that the filter has been deleted before deleting, + // because once the filter is deleted, the message cannot be sent. + this.emitFilterDeleted(eventFilterId, FilterDeletionReasons.END_STATE_REACHED); + this.eventChannelManager.deregisterFilterByEventFilterId(eventFilterId); + } + } + } + + emitFilterDeleted(eventFilterId, filterDeletionReason) { + const LOG_HEADER = 'emitFilterDeleted'; + + if (!eventFilterId) { + logger.error(`[${LOG_HEADER}] EventFilterId is empty.`); + return; + } + const clientFilterId = this.getClientFilterIdFromGlobalFilterId(eventFilterId); + const blockchainEvent = new BlockchainEvent(BlockchainEventTypes.FILTER_DELETED, { + filter_id: clientFilterId, // Client needs client filter ID. + reason: filterDeletionReason, + }); + this.eventChannelManager.transmitEventByEventFilterId(eventFilterId, blockchainEvent); + } + + setFilterDeletionTimeout(eventFilterId) { + const LOG_HEADER = 'setFilterDeletionTimeout'; + + if (!eventFilterId) { + logger.error(`[${LOG_HEADER}] EventFilterId is empty.`); + return; + } + const timeoutId = this.eventFilterIdToTimeoutCallback.get(eventFilterId); + if (timeoutId) { + clearTimeout(timeoutId); + } + + this.eventFilterIdToTimeoutCallback.set(eventFilterId, setTimeout(() => { + this.emitFilterDeleted(eventFilterId, FilterDeletionReasons.FILTER_TIMEOUT); + this.eventChannelManager.deregisterFilterByEventFilterId(eventFilterId); + }, NodeConfigs.EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS)); + } + getClientFilterIdFromGlobalFilterId(globalFilterId) { const [channelId, clientFilterId] = globalFilterId.split(':'); if (!clientFilterId) { @@ -158,6 +229,17 @@ class EventHandler { `Invalid format path (${path})`); } break; + case BlockchainEventTypes.TX_STATE_CHANGED: + const txHash = _.get(config, 'tx_hash', null); + if (!txHash) { + throw new EventHandlerError(EventHandlerErrorCode.MISSING_TX_HASH_IN_CONFIG, + `config.tx_hash is missing (${JSON.stringify(config)})`); + } + if (!CommonUtil.isValidHash(txHash)) { + throw new EventHandlerError(EventHandlerErrorCode.INVALID_TX_HASH, + `Invalid tx hash (${txHash})`); + } + break; default: throw new EventHandlerError(EventHandlerErrorCode.INVALID_EVENT_TYPE_IN_VALIDATE_FUNC, `Invalid event type (${eventType})`); @@ -178,6 +260,13 @@ class EventHandler { this.eventTypeToEventFilterIds[eventType].add(eventFilterId); if (eventType === BlockchainEventTypes.VALUE_CHANGED) { this.stateEventTreeManager.registerEventFilterId(config.path, eventFilterId); + } else if (eventType === BlockchainEventTypes.TX_STATE_CHANGED) { + const eventFilterIdSet = this.txHashToEventFilterIdSet.get(config.tx_hash); + if (eventFilterIdSet) { + eventFilterIdSet.add(eventFilterId); + } else { + this.txHashToEventFilterIdSet.set(config.tx_hash, new Set([eventFilterId])); + } } logger.info(`[${LOG_HEADER}] New filter is registered. (eventFilterId: ${eventFilterId}, ` + `eventType: ${eventType}, config: ${JSON.stringify(config)})`); @@ -208,6 +297,19 @@ class EventHandler { } if (eventFilter.type === BlockchainEventTypes.VALUE_CHANGED) { this.stateEventTreeManager.deregisterEventFilterId(eventFilterId); + } else if (eventFilter.type === BlockchainEventTypes.TX_STATE_CHANGED) { + const timeoutCallback = this.eventFilterIdToTimeoutCallback.get(eventFilterId); + if (timeoutCallback) { + clearTimeout(timeoutCallback); + } + this.eventFilterIdToTimeoutCallback.delete(eventFilterId); + + const txHash = _.get(eventFilter.config, 'tx_hash', null); + const eventFilterIdSet = this.txHashToEventFilterIdSet.get(txHash); + eventFilterIdSet.delete(eventFilterId); + if (eventFilterIdSet.size === 0) { + this.txHashToEventFilterIdSet.delete(txHash); + } } logger.info(`[${LOG_HEADER}] Filter is deregistered. (eventFilterId: ${eventFilterId})`); return eventFilter; diff --git a/json_rpc/index.js b/json_rpc/index.js index c7dd0f60b..cbd6b2c99 100644 --- a/json_rpc/index.js +++ b/json_rpc/index.js @@ -25,7 +25,7 @@ const { JSON_RPC_METHODS } = require('./constants'); * @return {dict} A closure of functions compatible with the jayson library for * servicing JSON-RPC requests. */ -module.exports = function getApis(node, p2pServer, eventHandler, minProtocolVersion, maxProtocolVersion) { +module.exports = function getApis(node, p2pServer, minProtocolVersion, maxProtocolVersion) { // Minimally required APIs const apis = { ...getAdminApis(node), @@ -45,8 +45,8 @@ module.exports = function getApis(node, p2pServer, eventHandler, minProtocolVers Object.assign(apis, { [JSON_RPC_METHODS.P2P_GET_PEER_CANDIDATE_INFO]: getNetworkApis(node, p2pServer).p2p_getPeerCandidateInfo }); } - if (eventHandler !== null) { - Object.assign(apis, getEventHandlerApis(eventHandler)); + if (node.eh !== null) { + Object.assign(apis, getEventHandlerApis(node.eh)); } return apis; diff --git a/node/index.js b/node/index.js index bf9c29aac..5312d24c1 100644 --- a/node/index.js +++ b/node/index.js @@ -37,9 +37,10 @@ const Consensus = require('../consensus'); const BlockPool = require('../block-pool'); const ConsensusUtil = require('../consensus/consensus-util'); const PathUtil = require('../common/path-util'); +const EventHandler = require('../event-handler'); class BlockchainNode { - constructor(account = null, eventHandler = null) { + constructor(account = null) { this.keysDir = path.resolve(NodeConfigs.KEYS_ROOT_DIR, `${NodeConfigs.PORT}`); FileUtil.createDir(this.keysDir); this.snapshotDir = path.resolve(NodeConfigs.SNAPSHOTS_ROOT_DIR, `${NodeConfigs.PORT}`); @@ -52,7 +53,7 @@ class BlockchainNode { this.urlInternal = null; this.urlExternal = null; - this.eh = eventHandler; + this.eh = NodeConfigs.ENABLE_EVENT_HANDLER === true ? new EventHandler(this) : null; this.bc = new Blockchain(String(NodeConfigs.PORT)); this.tp = new TransactionPool(this); this.bp = new BlockPool(this); @@ -61,7 +62,7 @@ class BlockchainNode { // Node's front db this.db = DB.create( StateVersions.EMPTY, initialVersion, this.bc, false, this.bc.lastBlockNumber(), - this.stateManager, eventHandler); + this.stateManager, this.eh); this.bootstrapSnapshotSource = null; this.bootstrapSnapshot = null; this.bootstrapSnapshotBlockNumber = -1; diff --git a/p2p/index.js b/p2p/index.js index d03e85e19..c56126dc8 100644 --- a/p2p/index.js +++ b/p2p/index.js @@ -1149,7 +1149,7 @@ class P2pClient { writeOldChainSegment(oldChainSegment) { for (const block of oldChainSegment) { - this.server.node.bc.writeBlock(block); + this.server.node.bc.writeBlock(block, 'writeOldChainSegment'); } } diff --git a/package.json b/package.json index da0233e39..3bd637f9e 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ain-blockchain", "description": "AI Network Blockchain", - "version": "1.0.8", + "version": "1.0.9", "private": true, "license": "MIT", "author": "dev@ainetwork.ai", diff --git a/setup_monitoring_gcp.sh b/setup_monitoring_gcp.sh index 1208c5180..039445369 100644 --- a/setup_monitoring_gcp.sh +++ b/setup_monitoring_gcp.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [[ "$#" -lt 1 ]]; then - printf "Usage: bash setup_monitoring_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet]\n" - printf "Example: bash setup_monitoring_gcp.sh dev\n" +if [[ "$#" -lt 2 ]]; then + printf "Usage: bash setup_monitoring_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] \n" + printf "Example: bash setup_monitoring_gcp.sh dev gcp_user\n" printf "\n" exit fi @@ -14,7 +14,11 @@ if [[ "$1" != 'dev' ]] && [[ "$1" != 'staging' ]] && [[ "$1" != 'sandbox' ]] && fi SEASON="$1" +GCP_USER="$2" +printf "SEASON=$SEASON\n" +printf "GCP_USER=$GCP_USER\n" +printf "\n" printf 'Killing old jobs..\n' killall prometheus @@ -26,7 +30,7 @@ sudo rm -rf /home/ain-blockchain cd ~ sudo mv ain-blockchain /home sudo chmod -R 777 /home/ain-blockchain -sudo chown -R root:root /home/ain-blockchain +sudo chown -R $GCP_USER:$GCP_USER /home/ain-blockchain cd /home/ain-blockchain printf 'Installing Prometheus..\n' diff --git a/start_local_blockchain.sh b/start_local_blockchain.sh index 839a26239..586bd4c40 100755 --- a/start_local_blockchain.sh +++ b/start_local_blockchain.sh @@ -27,6 +27,7 @@ UNSAFE_PRIVATE_KEY=b22c95ffc4a5c096f7d7d0487ba963ce6ac945bdc91c79b64ce209de289be ENABLE_TX_SIG_VERIF_WORKAROUND=true \ ENABLE_GAS_FEE_WORKAROUND=true \ ENABLE_EXPRESS_RATE_LIMIT=false \ + TX_POOL_SIZE_LIMIT_PER_ACCOUNT=1000 \ node ./client/index.js & printf "\nDone\n\n" sleep 10 @@ -41,6 +42,7 @@ UNSAFE_PRIVATE_KEY=921cc48e48c876fc6ed1eb02a76ad520e8d16a91487f9c7e03441da8e35a0 ENABLE_TX_SIG_VERIF_WORKAROUND=true \ ENABLE_GAS_FEE_WORKAROUND=true \ ENABLE_EXPRESS_RATE_LIMIT=false \ + TX_POOL_SIZE_LIMIT_PER_ACCOUNT=1000 \ node ./client/index.js & printf "\nDone\n\n" sleep 10 @@ -55,6 +57,7 @@ UNSAFE_PRIVATE_KEY=41e6e5718188ce9afd25e4b386482ac2c5272c49a622d8d217887bce21dce ENABLE_TX_SIG_VERIF_WORKAROUND=true \ ENABLE_GAS_FEE_WORKAROUND=true \ ENABLE_EXPRESS_RATE_LIMIT=false \ + TX_POOL_SIZE_LIMIT_PER_ACCOUNT=1000 \ ENABLE_EVENT_HANDLER=true \ node ./client/index.js & printf "\nDone\n\n" diff --git a/start_node_genesis_gcp.sh b/start_node_genesis_gcp.sh index 444836735..ffe4a73dd 100644 --- a/start_node_genesis_gcp.sh +++ b/start_node_genesis_gcp.sh @@ -1,9 +1,9 @@ #!/bin/bash # NOTE(minsulee2): Since exit really exits terminals, those are replaced to return 1. -if [[ $# -lt 3 ]] || [[ $# -gt 10 ]]; then - printf "Usage: bash start_node_genesis_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--json-rpc] [--update-front-db] [--rest-func]\n" - printf "Example: bash start_node_genesis_gcp.sh spring 0 0 --keystore --no-keep-code --full-sync\n" +if [[ $# -lt 4 ]] || [[ $# -gt 12 ]]; then + printf "Usage: bash start_node_genesis_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--chown-data|--no-chown-data] [--json-rpc] [--update-front-db] [--rest-func]\n" + printf "Example: bash start_node_genesis_gcp.sh spring gcp_user 0 0 --keystore --no-keep-code --full-sync --no-chown-data\n" printf "\n" return 1 fi @@ -29,6 +29,10 @@ function parse_options() { SYNC_MODE_OPTION="$option" elif [[ $option = '--fast-sync' ]]; then SYNC_MODE_OPTION="$option" + elif [[ $option = '--chown-data' ]]; then + CHOWN_DATA_OPTION="$option" + elif [[ $option = '--no-chown-data' ]]; then + CHOWN_DATA_OPTION="$option" elif [[ $option = '--json-rpc' ]]; then JSON_RPC_OPTION="$option" elif [[ $option = '--update-front-db' ]]; then @@ -45,39 +49,44 @@ function parse_options() { # Parse options. SEASON="$1" +GCP_USER="$2" + number_re='^[0-9]+$' -if ! [[ $2 =~ $number_re ]] ; then - printf "Invalid argument: $2\n" +if ! [[ $3 =~ $number_re ]] ; then + printf "Invalid argument: $3\n" return 1 fi -SHARD_INDEX="$2" -if ! [[ $3 =~ $number_re ]] ; then - printf "Invalid argument: $3\n" +SHARD_INDEX="$3" + +if ! [[ $4 =~ $number_re ]] ; then + printf "Invalid argument: $4\n" return 1 fi # NOTE(minsulee2): Sandbox has 100 nodes. -if [[ "$3" -lt 0 ]] || [[ "$3" -gt 100 ]]; then - printf "Invalid argument: $3\n" +if [[ "$4" -lt 0 ]] || [[ "$4" -gt 100 ]]; then + printf "Invalid argument: $4\n" return 1 fi -NODE_INDEX="$3" +NODE_INDEX="$4" ACCOUNT_INJECTION_OPTION="--private-key" KEEP_CODE_OPTION="--keep-code" KEEP_DATA_OPTION="--keep-data" SYNC_MODE_OPTION="--fast-sync" +CHOWN_DATA_OPTION="--chown-data" JSON_RPC_OPTION="" UPDATE_FRONT_DB_OPTION="" REST_FUNC_OPTION="" EVENT_HANDLER_OPTION="" -ARG_INDEX=4 +ARG_INDEX=5 while [ $ARG_INDEX -le $# ]; do parse_options "${!ARG_INDEX}" ((ARG_INDEX++)) done printf "SEASON=$SEASON\n" +printf "GCP_USER=$GCP_USER\n" printf "SHARD_INDEX=$SHARD_INDEX\n" printf "NODE_INDEX=$NODE_INDEX\n" printf "\n" @@ -86,6 +95,7 @@ printf "ACCOUNT_INJECTION_OPTION=$ACCOUNT_INJECTION_OPTION\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" printf "SYNC_MODE_OPTION=$SYNC_MODE_OPTION\n" +printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "JSON_RPC_OPTION=$JSON_RPC_OPTION\n" printf "UPDATE_FRONT_DB_OPTION=$UPDATE_FRONT_DB_OPTION\n" printf "REST_FUNC_OPTION=$REST_FUNC_OPTION\n" @@ -149,7 +159,7 @@ if [[ $KEEP_CODE_OPTION = "--no-keep-code" ]]; then printf '\n' printf 'Setting up new working directory..\n' sudo rm -rf /home/ain-blockchain* - CODE_CMD="cd ~; sudo mv ain-blockchain /home; sudo chmod -R 777 /home/ain-blockchain; sudo chown -R root:root /home/ain-blockchain; cd /home/ain-blockchain" + CODE_CMD="cd ~; sudo mv ain-blockchain /home; sudo chmod -R 777 /home/ain-blockchain; sudo chown -R $GCP_USER:$GCP_USER /home/ain-blockchain; cd /home/ain-blockchain" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD else @@ -157,7 +167,7 @@ else printf 'Reusing existing working directory..\n' OLD_DIR_PATH=$(find /home/ain-blockchain* -maxdepth 0 -type d) printf "OLD_DIR_PATH=$OLD_DIR_PATH\n" - CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R root:root $OLD_DIR_PATH" + CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R $GCP_USER:$GCP_USER $OLD_DIR_PATH" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD fi @@ -167,12 +177,16 @@ if [[ $KEEP_DATA_OPTION = "--no-keep-data" ]]; then sudo rm -rf /home/ain_blockchain_data/chains sudo rm -rf /home/ain_blockchain_data/snapshots sudo rm -rf /home/ain_blockchain_data/logs - DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R root:root /home/ain_blockchain_data" + DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R $GCP_USER:$GCP_USER /home/ain_blockchain_data" printf "\nDATA_CMD=$DATA_CMD\n" eval $DATA_CMD else printf 'Reusing existing data directory..\n' - DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R root:root /home/ain_blockchain_data" + if [[ $CHOWN_DATA_OPTION = "--no-chown-data" ]]; then + DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod 777 /home/ain_blockchain_data; sudo chown $GCP_USER:$GCP_USER /home/ain_blockchain_data" + else + DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R $GCP_USER:$GCP_USER /home/ain_blockchain_data" + fi printf "\nDATA_CMD=$DATA_CMD\n" eval $DATA_CMD fi diff --git a/start_node_incremental_gcp.sh b/start_node_incremental_gcp.sh index 7d2a423ee..509296fd6 100644 --- a/start_node_incremental_gcp.sh +++ b/start_node_incremental_gcp.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [[ $# -lt 3 ]] || [[ $# -gt 10 ]]; then - printf "Usage: bash start_node_incremental_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--json-rpc] [--update-front-db] [--rest-func]\n" - printf "Example: bash start_node_incremental_gcp.sh spring 0 0 --keystore --no-keep-code --full-sync\n" +if [[ $# -lt 4 ]] || [[ $# -gt 12 ]]; then + printf "Usage: bash start_node_incremental_gcp.sh [dev|staging|sandbox|exp|spring|summer|mainnet] [--keystore|--mnemonic|--private-key] [--keep-code|--no-keep-code] [--keep-data|--no-keep-data] [--full-sync|--fast-sync] [--chown-data|--no-chown-data] [--json-rpc] [--update-front-db] [--rest-func]\n" + printf "Example: bash start_node_incremental_gcp.sh spring gcp_user 0 0 --keystore --no-keep-code --full-sync --no-chown-data\n" printf "\n" exit fi @@ -28,6 +28,10 @@ function parse_options() { SYNC_MODE_OPTION="$option" elif [[ $option = '--fast-sync' ]]; then SYNC_MODE_OPTION="$option" + elif [[ $option = '--chown-data' ]]; then + CHOWN_DATA_OPTION="$option" + elif [[ $option = '--no-chown-data' ]]; then + CHOWN_DATA_OPTION="$option" elif [[ $option = '--json-rpc' ]]; then JSON_RPC_OPTION="$option" elif [[ $option = '--update-front-db' ]]; then @@ -44,38 +48,43 @@ function parse_options() { # Parse options. SEASON="$1" +GCP_USER="$2" + number_re='^[0-9]+$' -if ! [[ $2 =~ $number_re ]] ; then - printf "Invalid argument: $2\n" +if ! [[ $3 =~ $number_re ]] ; then + printf "Invalid argument: $3\n" exit fi -SHARD_INDEX="$2" -if ! [[ $3 =~ $number_re ]] ; then - printf "Invalid argument: $3\n" +SHARD_INDEX="$3" + +if ! [[ $4 =~ $number_re ]] ; then + printf "Invalid argument: $4\n" exit fi -if [[ "$3" -lt 0 ]] || [[ "$3" -gt 9 ]]; then - printf "Invalid argument: $3\n" +if [[ "$4" -lt 0 ]] || [[ "$4" -gt 9 ]]; then + printf "Invalid argument: $4\n" exit fi -NODE_INDEX="$3" +NODE_INDEX="$4" ACCOUNT_INJECTION_OPTION="--private-key" KEEP_CODE_OPTION="--keep-code" KEEP_DATA_OPTION="--keep-data" SYNC_MODE_OPTION="--fast-sync" +CHOWN_DATA_OPTION="--chown-data" JSON_RPC_OPTION="" UPDATE_FRONT_DB_OPTION="" REST_FUNC_OPTION="" EVENT_HANDLER_OPTION="" -ARG_INDEX=4 +ARG_INDEX=5 while [ $ARG_INDEX -le $# ]; do parse_options "${!ARG_INDEX}" ((ARG_INDEX++)) done printf "SEASON=$SEASON\n" +printf "GCP_USER=$GCP_USER\n" printf "SHARD_INDEX=$SHARD_INDEX\n" printf "NODE_INDEX=$NODE_INDEX\n" printf "\n" @@ -84,6 +93,7 @@ printf "ACCOUNT_INJECTION_OPTION=$ACCOUNT_INJECTION_OPTION\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf "KEEP_DATA_OPTION=$KEEP_DATA_OPTION\n" printf "SYNC_MODE_OPTION=$SYNC_MODE_OPTION\n" +printf "CHOWN_DATA_OPTION=$CHOWN_DATA_OPTION\n" printf "JSON_RPC_OPTION=$JSON_RPC_OPTION\n" printf "UPDATE_FRONT_DB_OPTION=$UPDATE_FRONT_DB_OPTION\n" printf "REST_FUNC_OPTION=$REST_FUNC_OPTION\n" @@ -299,20 +309,20 @@ printf "\n#### [Step 3] Set up working directory & install modules ####\n\n" if [[ $KEEP_CODE_OPTION = "--no-keep-code" ]]; then printf '\n' printf 'Setting up new working directory..\n' - CODE_CMD="cd ~; sudo mv ain-blockchain $NEW_DIR_NAME; sudo mv $NEW_DIR_NAME /home; sudo chmod -R 777 $NEW_DIR_PATH; sudo chown -R root:root $NEW_DIR_PATH" + CODE_CMD="cd ~; sudo mv ain-blockchain $NEW_DIR_NAME; sudo mv $NEW_DIR_NAME /home; sudo chmod -R 777 $NEW_DIR_PATH; sudo chown -R $GCP_USER:$GCP_USER $NEW_DIR_PATH" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD printf '\n' printf 'Installing node modules..\n' cd $NEW_DIR_PATH - INSTALL_CMD="sudo yarn install --ignore-engines" + INSTALL_CMD="yarn install --ignore-engines" printf "\nINSTALL_CMD=$INSTALL_CMD\n" eval $INSTALL_CMD else printf '\n' printf 'Reusing existing working directory..\n' - CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R root:root $OLD_DIR_PATH" + CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R $GCP_USER:$GCP_USER $OLD_DIR_PATH" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD fi @@ -333,13 +343,17 @@ if [[ $KEEP_DATA_OPTION = "--no-keep-data" ]]; then sudo rm -rf /home/ain_blockchain_data/chains sudo rm -rf /home/ain_blockchain_data/snapshots sudo rm -rf /home/ain_blockchain_data/logs - DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R root:root /home/ain_blockchain_data" + DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R $GCP_USER:$GCP_USER /home/ain_blockchain_data" printf "\nDATA_CMD=$DATA_CMD\n" eval $DATA_CMD else printf '\n' printf 'Reusing existing data directory..\n' - DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R root:root /home/ain_blockchain_data" + if [[ $CHOWN_DATA_OPTION = "--no-chown-data" ]]; then + DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod 777 /home/ain_blockchain_data; sudo chown $GCP_USER:$GCP_USER /home/ain_blockchain_data" + else + DATA_CMD="sudo mkdir -p /home/ain_blockchain_data; sudo chmod -R 777 /home/ain_blockchain_data; sudo chown -R $GCP_USER:$GCP_USER /home/ain_blockchain_data" + fi printf "\nDATA_CMD=$DATA_CMD\n" eval $DATA_CMD fi diff --git a/start_tracker_genesis_gcp.sh b/start_tracker_genesis_gcp.sh index 1fa162b08..1f1c24609 100644 --- a/start_tracker_genesis_gcp.sh +++ b/start_tracker_genesis_gcp.sh @@ -1,26 +1,28 @@ #!/bin/bash -if [[ $# -gt 1 ]]; then - printf "Usage: bash start_tracker_genesis_gcp.sh [--keep-code|--no-keep-code]\n" - printf "Example: bash start_tracker_genesis_gcp.sh --keep-code\n" +if [[ $# -gt 2 ]]; then + printf "Usage: bash start_tracker_genesis_gcp.sh [--keep-code|--no-keep-code]\n" + printf "Example: bash start_tracker_genesis_gcp.sh gcp_user --keep-code\n" printf "\n" exit fi printf "\n[[[[[ start_tracker_genesis_gcp.sh ]]]]]\n\n" -KEEP_CODE_OPTION="--keep-code" +GCP_USER="$1" -if [[ $# = 1 ]]; then - if [[ $1 = '--keep-code' ]]; then - KEEP_CODE_OPTION=$1 - elif [[ $1 = '--no-keep-code' ]]; then - KEEP_CODE_OPTION=$1 +KEEP_CODE_OPTION="--keep-code" +if [[ $# = 2 ]]; then + if [[ $2 = '--keep-code' ]]; then + KEEP_CODE_OPTION=$2 + elif [[ $2 = '--no-keep-code' ]]; then + KEEP_CODE_OPTION=$2 else - printf "Invalid option: $1\n" + printf "Invalid option: $2\n" exit fi fi +printf "GCP_USER=$GCP_USER\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" printf '\n' @@ -32,13 +34,13 @@ if [[ $KEEP_CODE_OPTION = "--no-keep-code" ]]; then printf '\n' printf 'Creating new working directory..\n' sudo rm -rf /home/ain-blockchain* - CODE_CMD="cd ~; sudo mv ain-blockchain /home; sudo chmod -R 777 /home/ain-blockchain; sudo chown -R root:root /home/ain-blockchain; cd /home/ain-blockchain" + CODE_CMD="cd ~; sudo mv ain-blockchain /home; sudo chmod -R 777 /home/ain-blockchain; sudo chown -R $GCP_USER:$GCP_USER /home/ain-blockchain; cd /home/ain-blockchain" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD printf '\n' printf 'Installing node modules..\n' - INSTALL_CMD="sudo yarn install --ignore-engines" + INSTALL_CMD="yarn install --ignore-engines" printf "\nINSTALL_CMD=$INSTALL_CMD\n" eval $INSTALL_CMD else @@ -46,7 +48,7 @@ else printf 'Using old directory..\n' OLD_DIR_PATH=$(find /home/ain-blockchain* -maxdepth 0 -type d) printf "OLD_DIR_PATH=$OLD_DIR_PATH\n" - CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R root:root $OLD_DIR_PATH" + CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R $GCP_USER:$GCP_USER $OLD_DIR_PATH" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD fi diff --git a/start_tracker_incremental_gcp.sh b/start_tracker_incremental_gcp.sh index aef2f715e..197dc4b1f 100644 --- a/start_tracker_incremental_gcp.sh +++ b/start_tracker_incremental_gcp.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [[ "$#" -gt 1 ]]; then - printf "Usage: bash start_tracker_incremental_gcp.sh [--keep-code|--no-keep-code]\n" - printf "Example: bash start_tracker_incremental_gcp.sh --keep-code\n" +if [[ "$#" -gt 2 ]]; then + printf "Usage: bash start_tracker_incremental_gcp.sh [--keep-code|--no-keep-code]\n" + printf "Example: bash start_tracker_incremental_gcp.sh gcp_user --keep-code\n" printf "\n" exit fi @@ -11,19 +11,22 @@ printf "\n[[[[[ start_tracker_incremental_gcp.sh ]]]]]\n\n" # 1. Configure env vars printf "\n#### [Step 1] Configure env vars ####\n\n" +GCP_USER="$1" + KEEP_CODE_OPTION="--keep-code" -if [[ $# = 1 ]]; then - if [[ $1 = '--keep-code' ]]; then - KEEP_CODE_OPTION=$1 - elif [[ $1 = '--no-keep-code' ]]; then - KEEP_CODE_OPTION=$1 +if [[ $# = 2 ]]; then + if [[ $2 = '--keep-code' ]]; then + KEEP_CODE_OPTION=$2 + elif [[ $2 = '--no-keep-code' ]]; then + KEEP_CODE_OPTION=$2 else - printf "Invalid option: $1\n" + printf "Invalid option: $2\n" exit fi fi +printf "GCP_USER=$GCP_USER\n" printf "KEEP_CODE_OPTION=$KEEP_CODE_OPTION\n" # 2. Get currently used directory & new directory @@ -44,20 +47,20 @@ printf "\n#### [Step 3] Set up working directory & install modules ####\n\n" if [[ $KEEP_CODE_OPTION = "--no-keep-code" ]]; then printf '\n' printf 'Setting up new data directory..\n' - CODE_CMD="cd ~; sudo mv ain-blockchain $NEW_DIR_NAME; sudo mv $NEW_DIR_NAME /home; sudo chmod -R 777 $NEW_DIR_PATH; sudo chown -R root:root $NEW_DIR_PATH" + CODE_CMD="cd ~; sudo mv ain-blockchain $NEW_DIR_NAME; sudo mv $NEW_DIR_NAME /home; sudo chmod -R 777 $NEW_DIR_PATH; sudo chown -R $GCP_USER:$GCP_USER $NEW_DIR_PATH" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD printf '\n' printf 'Installing node modules..\n' cd $NEW_DIR_PATH - INSTALL_CMD="sudo yarn install --ignore-engines" + INSTALL_CMD="yarn install --ignore-engines" printf "\nINSTALL_CMD=$INSTALL_CMD\n" eval $INSTALL_CMD else printf '\n' printf 'Reusing existing working directory..\n' - CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R root:root $OLD_DIR_PATH" + CODE_CMD="sudo chmod -R 777 $OLD_DIR_PATH; sudo chown -R $GCP_USER:$GCP_USER $OLD_DIR_PATH" printf "\nCODE_CMD=$CODE_CMD\n" eval $CODE_CMD fi diff --git a/test/integration/event_handler.test.js b/test/integration/event_handler.test.js index 6443dc271..649283133 100644 --- a/test/integration/event_handler.test.js +++ b/test/integration/event_handler.test.js @@ -11,6 +11,8 @@ const { NodeConfigs, BlockchainEventMessageTypes, BlockchainEventTypes, + TransactionStates, + FilterDeletionReasons, } = require('../../common/constants'); const CommonUtil = require('../../common/common-util'); const { @@ -40,6 +42,9 @@ const ENV_VARIABLES = [ BLOCKCHAIN_CONFIGS_DIR: 'blockchain-configs/3-nodes', PORT: 8083, P2P_PORT: 5003, ENABLE_GAS_FEE_WORKAROUND: true, ENABLE_EXPRESS_RATE_LIMIT: false, ENABLE_EVENT_HANDLER: true, EVENT_HANDLER_PORT: 6000, + // NOTE(ehgmsdk20): Epoch time for test nodes is 1000ms, so set + // EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS = 5 * epoch_ms = 5000 + EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS: 5000, }, ]; @@ -50,6 +55,7 @@ const serverList = [ ]; const testAppName = 'test'; const epochMs = _.get(BlockchainParams, 'genesis.epoch_ms', 30000); +const dummyTxHash = '0x9ac44b45853c2244715528f89072a337540c909c36bab4c9ed2fd7b7dbab47b2'; function startServer(application, serverName, envVars, stdioInherit = false) { const options = { @@ -131,7 +137,8 @@ function setValue(nodeEndpoint, ref, value) { // TODO(cshcomcom): Add to deploy_test_gcp.sh describe('Event Handler Test', function() { - let tracker_proc, server_proc_list = []; + const server_proc_list = []; + let tracker_proc; before(async function() { rimraf.sync(NodeConfigs.CHAINS_DIR); @@ -169,7 +176,8 @@ describe('Event Handler Test', function() { }); describe('Full flow', () => { - let wsClient = null, filterId = null; + let wsClient = null; + let filterId = null; it('Connect to event handler & check number of channels === 1', async function() { // Connect to event handler @@ -280,5 +288,170 @@ describe('Event Handler Test', function() { }); setValue(serverList[EVENT_HANDLER_NODE_INDEX], targetPath, 'dummy'); }); + + describe('TX_STATE_CHANGED', () => { + it('send valid transaction', function(done) { + this.timeout(10 * epochMs); + const filterId = Date.now(); + const targetPath = `/apps/${testAppName}`; + let eventTriggeredCnt = 0; + wsClient.on('message', (message) => { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + const eventType = _.get(parsedMessage, 'data.type'); + const txState = _.get(parsedMessage, 'data.payload.tx_state'); + if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && + eventType === BlockchainEventTypes.TX_STATE_CHANGED) { + if (eventTriggeredCnt === 0) { + expect(txState.before).to.equal(null); + expect(txState.after).to.equal(TransactionStates.EXECUTED); + eventTriggeredCnt++; + } else { + expect(txState.before).to.equal(TransactionStates.EXECUTED); + expect(txState.after).to.equal(TransactionStates.FINALIZED); + done(); + } + } + }); + const txResult = setValue(serverList[EVENT_HANDLER_NODE_INDEX], targetPath, 'change') + .result; + const config = { + tx_hash: txResult.tx_hash, + }; + registerFilter(wsClient, filterId, BlockchainEventTypes.TX_STATE_CHANGED, config); + }); + + it('send valid transaction with two filters', function(done) { + this.timeout(10 * epochMs); + let filterId = Date.now(); + let eventTriggeredCnt = 0; + const targetPath = `/apps/${testAppName}`; + wsClient.on('message', (message) => { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + const eventType = _.get(parsedMessage, 'data.type'); + const txState = _.get(parsedMessage, 'data.payload.tx_state'); + if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && + eventType === BlockchainEventTypes.TX_STATE_CHANGED) { + if (eventTriggeredCnt < 2) { + expect(txState.before).to.equal(null); + expect(txState.after).to.equal(TransactionStates.EXECUTED); + eventTriggeredCnt++; + } else { + expect(txState.before).to.equal(TransactionStates.EXECUTED); + expect(txState.after).to.equal(TransactionStates.FINALIZED); + eventTriggeredCnt++; + if (eventTriggeredCnt === 4) { + done(); + } + } + } + }); + const txResult = setValue(serverList[EVENT_HANDLER_NODE_INDEX], targetPath, 'change') + .result; + const config = { + tx_hash: txResult.tx_hash, + }; + registerFilter(wsClient, filterId, BlockchainEventTypes.TX_STATE_CHANGED, config); + filterId += 1; + registerFilter(wsClient, filterId, BlockchainEventTypes.TX_STATE_CHANGED, config); + }); + + it('send invalid transaction', function(done) { + this.timeout(10 * epochMs); + const filterId = Date.now(); + const invalidTargetPath = `/apps/dummy`; + const txResult = setValue(serverList[EVENT_HANDLER_NODE_INDEX], invalidTargetPath, 'change') + .result; + const config = { + tx_hash: txResult.tx_hash, + }; + let eventTriggeredCnt = 0; + wsClient.on('message', (message) => { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + const eventType = _.get(parsedMessage, 'data.type'); + const txState = _.get(parsedMessage, 'data.payload.tx_state'); + if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && + eventType === BlockchainEventTypes.TX_STATE_CHANGED) { + if (eventTriggeredCnt === 0) { + expect(txState.before).to.equal(null); + expect(txState.after).to.equal(TransactionStates.PENDING); + eventTriggeredCnt++; + } else { + expect(txState.before).to.equal(TransactionStates.PENDING); + expect(txState.after).to.equal(TransactionStates.REVERTED); + done(); + } + } + }); + registerFilter(wsClient, filterId, BlockchainEventTypes.TX_STATE_CHANGED, config); + }); + }); + + describe('FILTER_DELETED', () => { + it('deleted because of timeout', function(done) { + this.timeout(10 * epochMs); + const filterId = Date.now(); + const config = { + tx_hash: dummyTxHash, + }; + wsClient.on('message', (message) => { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + const eventType = _.get(parsedMessage, 'data.type'); + const payload = _.get(parsedMessage, 'data.payload'); + if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && + eventType === BlockchainEventTypes.FILTER_DELETED) { + expect(payload.reason).to.equal(FilterDeletionReasons.FILTER_TIMEOUT); + expect(payload.filter_id).to.equal(filterId.toString()); + // NOTE(ehgmsdk20): Wait until filter is deleted after event is emitted + setTimeout(()=>{ + const eventHandlerChannelInfo = getEventHandlerChannelInfo(); + expect(Object.keys(eventHandlerChannelInfo).length).to.equal(1); + expect(Object.values(eventHandlerChannelInfo)[0].eventFilterIds.length).to.equal(0); + done(); + }, 1000) + } + }); + registerFilter(wsClient, filterId, BlockchainEventTypes.TX_STATE_CHANGED, config); + const eventHandlerChannelInfo = getEventHandlerChannelInfo(); + expect(Object.keys(eventHandlerChannelInfo).length).to.equal(1); + expect(Object.values(eventHandlerChannelInfo)[0].eventFilterIds.length).to.equal(1); + }); + + it('deleted because end state reached', function(done) { + this.timeout(10 * epochMs); + const filterId = Date.now(); + const targetPath = `/apps/${testAppName}`; + const txResult = setValue(serverList[EVENT_HANDLER_NODE_INDEX], targetPath, 'change') + .result; + const config = { + tx_hash: txResult.tx_hash, + }; + wsClient.on('message', (message) => { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + const eventType = _.get(parsedMessage, 'data.type'); + const payload = _.get(parsedMessage, 'data.payload'); + if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && + eventType === BlockchainEventTypes.FILTER_DELETED) { + expect(payload.reason).to.equal(FilterDeletionReasons.END_STATE_REACHED); + expect(payload.filter_id).to.equal(filterId.toString()); + // NOTE(ehgmsdk20): Wait until filter deleted after event emited + setTimeout(()=>{ + const eventHandlerChannelInfo = getEventHandlerChannelInfo(); + expect(Object.keys(eventHandlerChannelInfo).length).to.equal(1); + expect(Object.values(eventHandlerChannelInfo)[0].eventFilterIds.length).to.equal(0); + done(); + }, 1000) + } + }); + registerFilter(wsClient, filterId, BlockchainEventTypes.TX_STATE_CHANGED, config); + const eventHandlerChannelInfo = getEventHandlerChannelInfo(); + expect(Object.keys(eventHandlerChannelInfo).length).to.equal(1); + expect(Object.values(eventHandlerChannelInfo)[0].eventFilterIds.length).to.equal(1); + }); + }); }); }); diff --git a/test/unit/event-handler.test.js b/test/unit/event-handler.test.js index 765d19cf2..605e8fae4 100644 --- a/test/unit/event-handler.test.js +++ b/test/unit/event-handler.test.js @@ -1,17 +1,45 @@ const EventHandler = require('../../event-handler'); +const EventChannel = require('../../event-handler/event-channel'); const chai = require('chai'); const { expect, assert } = chai; const { getIpAddress } = require('../../common/network-util'); -const { NodeConfigs, BlockchainEventTypes } = require('../../common/constants'); +const { NodeConfigs, BlockchainEventTypes, TransactionStates } + = require('../../common/constants'); +const CommonUtil = require('../../common/common-util'); +const Transaction = require('../../tx-pool/transaction'); +const BlockchainNode = require('../../node'); + +const validTxHash = '0x9ac44b45853c2244715528f89072a337540c909c36bab4c9ed2fd7b7dbab47b2' +const dummyTx = new Transaction({}, 'signature', validTxHash, 'address', true, Date.now()); + +class MockWebSockect { + close() { + return; + } + terminate() { + return; + } + send() { + return; + } +} describe('EventHandler Test', () => { - let eventHandler; + let eventHandler = null; + let node = null; + const origianlfilterDeletionTimeout = NodeConfigs.EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS; - before(() => { - eventHandler = new EventHandler(); + before(async () => { + NodeConfigs.ENABLE_EVENT_HANDLER = true; + // NOTE(ehgmsdk20): Reduce EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS for faster test + NodeConfigs.EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS = 10000; + node = new BlockchainNode(); + eventHandler = node.eh; }); after(() => { + NodeConfigs.ENABLE_EVENT_HANDLER = false; + NodeConfigs.EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS = origianlfilterDeletionTimeout // TODO(cshcomcom): stop & cleanup logic }); @@ -46,7 +74,7 @@ describe('EventHandler Test', () => { })).to.throw('Invalid block_number type. (string)'); }); it('validate VALUE_CHANGED config with right config', () => { - expect(EventHandler.validateEventFilterConfig(BlockchainEventTypes.BLOCK_FINALIZED, { + expect(EventHandler.validateEventFilterConfig(BlockchainEventTypes.VALUE_CHANGED, { path: '/apps/test', })).to.equal(undefined); }); @@ -58,35 +86,263 @@ describe('EventHandler Test', () => { })).to.throw(`Invalid format path (${wrongPath})`); } }); + it('validate TX_STATE_CHANGED config with right config', () => { + expect(EventHandler.validateEventFilterConfig(BlockchainEventTypes.TX_STATE_CHANGED, { + tx_hash: validTxHash, + })).to.equal(undefined); + }); + it('validate TX_STATE_CHANGED config with wrong config', () => { + expect(() => EventHandler.validateEventFilterConfig(BlockchainEventTypes.TX_STATE_CHANGED, + {})).to.throw('config.tx_hash is missing ({})'); + expect(() => EventHandler.validateEventFilterConfig(BlockchainEventTypes.TX_STATE_CHANGED, { + tx_hash: 123, + })).to.throw('Invalid tx hash (123)'); + }); }); describe('createAndRegisterFilter', () => { + let channel; + beforeEach( async () => { // NOTE(cshcomcom): To avoid id collisions. await new Promise((resolve) => setTimeout(resolve, 1000)); + channel = new EventChannel(Date.now(), new MockWebSockect()); + eventHandler.eventChannelManager.channels[channel.id] = channel; + }); + + afterEach(() => { + eventHandler.eventChannelManager.closeChannel(channel); }); it('create and register with right config', () => { const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; - const now = Date.now(); - eventHandler.createAndRegisterEventFilter(now, now, + const clientFilterId = Date.now(); + eventHandler.eventChannelManager.registerFilter(channel, clientFilterId, BlockchainEventTypes.BLOCK_FINALIZED, { block_number: 100, }); const numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); - eventHandler.deregisterEventFilter(now, now); + const numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id].getFilterIdsSize(); + expect(numberOfFiltersPerChannel).to.equal(1); + eventHandler.eventChannelManager.deregisterFilter(channel, clientFilterId); }); it('create and register with wrong config', () => { const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const clientFilterId = Date.now(); try { // NOTE(cshcomcom): createAndRegisterEventFilter throws error in this case. - eventHandler.createAndRegisterEventFilter(Date.now(), Date.now(), + eventHandler.eventChannelManager.registerFilter(channel, clientFilterId, BlockchainEventTypes.BLOCK_FINALIZED, { block_number: -1, }); } catch (err) {} const numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + const numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id].getFilterIdsSize(); + expect(numberOfFiltersPerChannel).to.equal(0); + }); + + it('create, register and wait until deregistered', async () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const clientFilterId = Date.now(); + eventHandler.eventChannelManager.registerFilter(channel, clientFilterId, + BlockchainEventTypes.TX_STATE_CHANGED, { + tx_hash: validTxHash, + }); + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + let numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id].getFilterIdsSize(); + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(1); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.deep.equal( + new Set([eventHandler.getGlobalFilterId(channel.id, clientFilterId)])); + await CommonUtil.sleep(NodeConfigs.EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id].getFilterIdsSize(); + // Filter is deleted due to filter timeout + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(0); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.equal(undefined); + }); + }); + + describe('emitTxStateChanged', () => { + let channel; + + beforeEach( async () => { // NOTE(cshcomcom): To avoid id collisions. + await new Promise((resolve) => setTimeout(resolve, 1000)); + channel = new EventChannel(Date.now(), new MockWebSockect()); + eventHandler.eventChannelManager.channels[channel.id] = channel; + }); + + afterEach(() => { + eventHandler.eventChannelManager.closeChannel(channel); + }); + + it('emit tx_state_changed event which is not an end state', async () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const clientFilterId = Date.now(); + eventHandler.eventChannelManager.registerFilter(channel, clientFilterId, + BlockchainEventTypes.TX_STATE_CHANGED, { + tx_hash: validTxHash, + }); + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + let numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(1); + + eventHandler.emitTxStateChanged(dummyTx, null, TransactionStates.EXECUTED); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(1); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.deep.equal( + new Set([eventHandler.getGlobalFilterId(channel.id, clientFilterId)])); + + // Check whether FilterDeletionTimeout is deleted + await CommonUtil.sleep(NodeConfigs.EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(1); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.deep.equal( + new Set([eventHandler.getGlobalFilterId(channel.id, clientFilterId)])); + + eventHandler.eventChannelManager.deregisterFilter(channel, clientFilterId); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(0); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.equal(undefined); + }); + + it('emit tx_state_changed event which is an end state', () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const clientFilterId = Date.now(); + eventHandler.eventChannelManager.registerFilter(channel, clientFilterId, + BlockchainEventTypes.TX_STATE_CHANGED, { + tx_hash: validTxHash, + }); + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + let numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(1); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.deep.equal( + new Set([eventHandler.getGlobalFilterId(channel.id, clientFilterId)])); + + eventHandler.emitTxStateChanged(dummyTx, null, TransactionStates.FINALIZED); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + // Filter is deleted due to end of state + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(0); + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.equal(undefined); + }); + }); + + describe('deregisterEventFilter', () => { + let channel; + + beforeEach( async () => { // NOTE(cshcomcom): To avoid id collisions. + await new Promise((resolve) => setTimeout(resolve, 1000)); + channel = new EventChannel(Date.now(), new MockWebSockect()); + eventHandler.eventChannelManager.channels[channel.id] = channel; + }); + + afterEach(() => { + eventHandler.eventChannelManager.closeChannel(channel); + }); + + describe('deregister filter registered', () => { + beforeEach( async () => { + await new Promise((resolve) => setTimeout(resolve, 1000)); + }); + + it('BLOCK_FINALIZED event', () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const now = Date.now(); + eventHandler.createAndRegisterEventFilter(now, now, + BlockchainEventTypes.BLOCK_FINALIZED, { + block_number: 100, + }); + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + eventHandler.deregisterEventFilter(now, now); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + }); + + it('VALUE_CHANGED event', () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const now = Date.now(); + const eventFilterId = eventHandler.createAndRegisterEventFilter(now, now, + BlockchainEventTypes.VALUE_CHANGED, { + path: '/apps/test', + }).id; + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(eventHandler.stateEventTreeManager.filterIdToParsedPath[eventFilterId]).to.exist; + eventHandler.deregisterEventFilter(now, now); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + expect(eventHandler.stateEventTreeManager.filterIdToParsedPath[eventFilterId]) + .to.be.undefined; + }); + + it('TX_STATE_CHANGED event', () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const clientFilterId = Date.now(); + const eventFilterId = eventHandler.getGlobalFilterId(channel.id, clientFilterId); + eventHandler.eventChannelManager.registerFilter(channel, clientFilterId, + BlockchainEventTypes.TX_STATE_CHANGED, { + tx_hash: validTxHash, + }); + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + let numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(1); + expect(eventHandler.eventFilterIdToTimeoutCallback.has(eventFilterId)).to.be.true; + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.deep.equal( + new Set([eventHandler.getGlobalFilterId(channel.id, clientFilterId)])); + + eventHandler.eventChannelManager.deregisterFilter(channel, clientFilterId); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + numberOfFiltersPerChannel = eventHandler.eventChannelManager.channels[channel.id] + .getFilterIdsSize(); + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + expect(numberOfFiltersPerChannel).to.equal(0); + expect(eventHandler.eventFilterIdToTimeoutCallback.has(eventFilterId)).to.be.false; + expect(eventHandler.txHashToEventFilterIdSet.get(validTxHash)).to.equal(undefined); + }); + }); + + it('deregister filter which does not exist', () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const now = Date.now(); + eventHandler.deregisterEventFilter(now, now); + const numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + }); + + it('deregister filter already deregistered', () => { + const numberOfFiltersBefore = Object.keys(eventHandler.eventFilters).length; + const now = Date.now(); + eventHandler.createAndRegisterEventFilter(now, now, + BlockchainEventTypes.BLOCK_FINALIZED, { + block_number: 100, + }); + let numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore + 1).to.equal(numberOfFiltersAfter); + eventHandler.deregisterEventFilter(now, now); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); + eventHandler.deregisterEventFilter(now, now); + numberOfFiltersAfter = Object.keys(eventHandler.eventFilters).length; + expect(numberOfFiltersBefore).to.equal(numberOfFiltersAfter); }); }); }); diff --git a/tx-pool/index.js b/tx-pool/index.js index df645a77a..9aef61a05 100644 --- a/tx-pool/index.js +++ b/tx-pool/index.js @@ -46,7 +46,7 @@ class TransactionPool { } } - updateFreeTxCountPerAccount(address, change){ + updateFreeTxCountPerAccount(address, change) { const freeTxCntBefore = this.freeTxCountPerAccount.get(address) || 0; const freeTxCntAfter = freeTxCntBefore + change; if (freeTxCntAfter === 0) { @@ -77,8 +77,9 @@ class TransactionPool { this.transactions.set(tx.address, []); } this.transactions.get(tx.address).push(tx); + const txState = isExecutedTx ? TransactionStates.EXECUTED : TransactionStates.PENDING; this.transactionTracker.set(tx.hash, { - state: isExecutedTx ? TransactionStates.EXECUTED : TransactionStates.PENDING, + state: txState, number: -1, index: this.transactions.get(tx.address).length - 1, address: tx.address, @@ -94,6 +95,9 @@ class TransactionPool { this.freeTxCountTotal++; this.updateFreeTxCountPerAccount(tx.address, 1); } + if (this.node.eh) { + this.node.eh.emitTxStateChanged(tx, null, txState); + } logger.debug(`ADDING(${this.getPoolSize()}): ${JSON.stringify(tx)}`); return true; } @@ -124,7 +128,7 @@ class TransactionPool { hasPerAccountRoom(address) { return this.getPerAccountPoolSize(address) < NodeConfigs.TX_POOL_SIZE_LIMIT_PER_ACCOUNT; } - + hasPerAccountFreeRoom(address) { return this.getPerAccountFreePoolSize(address) < Math.floor(NodeConfigs.TX_POOL_SIZE_LIMIT_PER_ACCOUNT * NodeConfigs.FREE_TX_POOL_SIZE_LIMIT_RATIO_PER_ACCOUNT); } @@ -417,7 +421,20 @@ class TransactionPool { // Remove timed-out transactions from the pool. const sizeBefore = this.txCountTotal; for (const [address, txList] of this.transactions.entries()) { - const filterFunc = (tx) => !this.isTimedOutFromPool(tx.extra.created_at, blockTimestamp); + const filterFunc = (tx) => { + const isTimedOut = this.isTimedOutFromPool(tx.extra.created_at, blockTimestamp); + if (isTimedOut) { + const tracked = this.transactionTracker.get(tx.hash); + const beforeState = _.get(tracked, 'state', null); + if (tracked && !isTxInBlock(beforeState)) { + tracked.state = TransactionStates.TIMED_OUT; + } + if (this.node.eh) { + this.node.eh.emitTxStateChanged(tx, beforeState, TransactionStates.TIMED_OUT); + } + } + return !isTimedOut; + }; this.updateTxListAndCounts(address, txList, filterFunc); } const sizeAfter = this.txCountTotal; @@ -445,9 +462,13 @@ class TransactionPool { } addrToInvalidTxSet.get(address).add(hash); const tracked = this.transactionTracker.get(hash); - if (tracked && !isTxInBlock(tracked.state)) { + const beforeState = _.get(tracked, 'state', null); + if (tracked && !isTxInBlock(beforeState)) { tracked.state = TransactionStates.FAILED; } + if (this.node.eh) { + this.node.eh.emitTxStateChanged(tx, beforeState, TransactionStates.FAILED); + } }); for (const [address, invalidTxSet] of addrToInvalidTxSet.entries()) { if (this.transactions.has(address)) { @@ -512,7 +533,9 @@ class TransactionPool { const addrToTimestamp = {}; for (const voteTx of block.last_votes) { const txTimestamp = voteTx.tx_body.timestamp; - const executedAt = _.get(this.transactionTracker.get(voteTx.hash), 'executed_at', -1); + const tracked = this.transactionTracker.get(voteTx.hash); + const executedAt = _.get(tracked, 'executed_at', -1); + const beforeState = _.get(tracked, 'state', null); // voting txs with ordered nonces. this.transactionTracker.set(voteTx.hash, { state: TransactionStates.FINALIZED, @@ -526,6 +549,10 @@ class TransactionPool { executed_at: executedAt, finalized_at: finalizedAt, }); + + if (this.node.eh) { + this.node.eh.emitTxStateChanged(voteTx, beforeState, TransactionStates.FINALIZED); + } inBlockTxs.add(voteTx.hash); } this.addEvidenceTxsToTxHashSet(inBlockTxs, block.evidence); @@ -533,10 +560,14 @@ class TransactionPool { const tx = block.transactions[i]; const txNonce = tx.tx_body.nonce; const txTimestamp = tx.tx_body.timestamp; - const executedAt = _.get(this.transactionTracker.get(tx.hash), 'executed_at', -1); + const tracked = this.transactionTracker.get(tx.hash); + const executedAt = _.get(tracked, 'executed_at', -1); + const beforeState = _.get(tracked, 'state', null); // Update transaction tracker. + const txState = isFailedTx(block.receipts[i]) ? TransactionStates.REVERTED : + TransactionStates.FINALIZED; this.transactionTracker.set(tx.hash, { - state: isFailedTx(block.receipts[i]) ? TransactionStates.REVERTED : TransactionStates.FINALIZED, + state: txState, number: block.number, index: i, address: tx.address, @@ -548,6 +579,10 @@ class TransactionPool { finalized_at: finalizedAt, }); inBlockTxs.add(tx.hash); + + if (this.node.eh) { + this.node.eh.emitTxStateChanged(tx, beforeState, txState); + } const lastNonce = addrToNonce[tx.address]; const lastTimestamp = addrToTimestamp[tx.address]; if (txNonce >= 0 && (lastNonce === undefined || txNonce > lastNonce)) {