Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply idle time check to event handler channels #1293

Merged
merged 10 commits into from
Aug 9, 2024
3 changes: 3 additions & 0 deletions blockchain-configs/afan-shard/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 15000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/base/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/he-shard/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 15000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/mainnet-prod/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/sim-shard/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/testnet-dev/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/testnet-exp/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/testnet-prod/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/testnet-sandbox/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
3 changes: 3 additions & 0 deletions blockchain-configs/testnet-staging/node_params.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"ENABLE_REST_FUNCTION_CALL": false,
"ENABLE_STATUS_REPORT_TO_TRACKER": true,
"ENABLE_TX_SIG_VERIF_WORKAROUND": false,
"EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS": 10000,
"EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS": 600,
"EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS": 3600,
"EVENT_HANDLER_FILTER_DELETION_TIMEOUT_MS": 100000,
"EVENT_HANDLER_HEARTBEAT_INTERVAL_MS": 15000,
"EVENT_HANDLER_PORT": 5100,
Expand Down
10 changes: 9 additions & 1 deletion common/network-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,18 @@ function getIpAddress(internal = false) {
});
}

// NOTE(minsulee2): This builds the URL using a client socket in the server side.
function buildRemoteUrlFromSocket(socket) {
const remoteAddress = _.get(socket, '_socket.remoteAddress', '');
const remotePort = _.get(socket, '_socket.remotePort', '');
return `${remoteAddress}:${remotePort}`;
}

module.exports = {
sendTxAndWaitForFinalization,
sendSignedTx,
signAndSendTx,
sendGetRequest,
getIpAddress
getIpAddress,
buildRemoteUrlFromSocket,
};
69 changes: 64 additions & 5 deletions event-handler/event-channel-manager.js
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const logger = new (require('../logger'))('EVENT_CHANNEL_MANAGER');
const EventChannel = require('./event-channel');
const ws = require('ws');
const { getIpAddress } = require('../common/network-util');
const {
BlockchainEventMessageTypes,
NodeConfigs,
Expand All @@ -21,10 +20,11 @@ class EventChannelManager {
this.channels = {}; // [channelId]: Channel
this.filterIdToChannelId = {}; // [globalFilterId]: channelId
this.heartbeatInterval = null;
this.idleCheckInterval = null;
}

getNetworkInfo() {
const ipAddr = (NodeConfigs.HOSTING_ENV === HostingEnvs.COMCOM || NodeConfigs.HOSTING_ENV === HostingEnvs.LOCAL) ? this.node.ipAddrInternal : this.node.ipAddrExternal;
const ipAddr = this.node.ipAddrExternal;
const eventHandlerUrl = new URL(`ws://${ipAddr}:${NodeConfigs.EVENT_HANDLER_PORT}`);
return {
url: eventHandlerUrl.toString(),
Expand All @@ -35,10 +35,42 @@ class EventChannelManager {
}
}

getChannelStatus() {
const channelStats = this.getChannelStats();
return {
maxNumEventChannels: NodeConfigs.MAX_NUM_EVENT_CHANNELS,
numEventChannels: this.getNumEventChannels(),
channelIdleTimeLimitSecs: NodeConfigs.EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS,
maxChannelLifeTimeMs: channelStats.maxLifeTimeMs,
maxChannelIdleTimeMs: channelStats.maxIdleTimeMs,
channelInfo: this.getChannelInfo(),
}
}

getNumEventChannels() {
return Object.keys(this.channels).length;
}

getChannelStats() {
let maxLifeTimeMs = 0;
let maxIdleTimeMs = 0;
for (const channel of Object.values(this.channels)) {
const lifeTimeMs = channel.getLifeTimeMs();
const idleTimeMs = channel.getIdleTimeMs();
if (lifeTimeMs > maxLifeTimeMs) {
maxLifeTimeMs = lifeTimeMs;
}
if (idleTimeMs > maxIdleTimeMs) {
maxIdleTimeMs = idleTimeMs;
}
}

return {
maxLifeTimeMs: maxLifeTimeMs,
maxIdleTimeMs: maxIdleTimeMs,
}
}

getChannelInfo() {
const channelInfo = {};
for (const [channelId, channel] of Object.entries(this.channels)) {
Expand All @@ -61,18 +93,21 @@ class EventChannelManager {
this.handleConnection(ws);
});
this.startHeartbeat(this.wsServer);
this.startIdleCheck();
}

handleConnection(webSocket) {
const LOG_HEADER = 'handleConnection';
try {
if (this.getNumEventChannels() >= NodeConfigs.MAX_NUM_EVENT_CHANNELS) {
webSocket.terminate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

throw new EventHandlerError(EventHandlerErrorCode.EVENT_CHANNEL_EXCEEDS_SIZE_LIMIT,
`The number of event channels exceeds its limit ` +
`(${NodeConfigs.MAX_NUM_EVENT_CHANNELS})`);
}
const channelId = Date.now(); // NOTE: Only used in blockchain
if (this.channels[channelId]) { // TODO(cshcomcom): Retry logic.
webSocket.terminate();
throw new EventHandlerError(EventHandlerErrorCode.DUPLICATED_CHANNEL_ID,
`Channel ID ${channelId} is already in use`);
}
Expand All @@ -99,6 +134,7 @@ class EventChannelManager {
if (messageType === BlockchainEventMessageTypes.PONG) {
this.handlePong(webSocket);
} else {
channel.setLastMessagingTimeMs(Date.now());
this.handleMessage(channel, messageType, messageData);
}
} catch (err) {
Expand Down Expand Up @@ -308,6 +344,7 @@ class EventChannelManager {
close() {
const LOG_HEADER = 'close';
this.stopHeartbeat();
this.stopIdleCheck();
this.wsServer.close(() => {
logger.info(`[${LOG_HEADER}] Closed event channel manager's socket`);
});
Expand All @@ -316,7 +353,7 @@ class EventChannelManager {
closeChannel(channel) {
const LOG_HEADER = 'closeChannel';
try {
logger.info(`[${LOG_HEADER}] Close channel ${channel.id}`);
logger.info(`[${LOG_HEADER}] Closing channel ${channel.id}`);
channel.webSocket.terminate();
const filterIds = channel.getAllFilterIds();
for (const filterId of filterIds) {
Expand All @@ -326,7 +363,7 @@ class EventChannelManager {
}
delete this.channels[channel.id];
} catch (err) {
logger.error(`[${LOG_HEADER}] Error while close channel (channelId: ${channel.id}, ` +
logger.error(`[${LOG_HEADER}] Error while closing channel (channelId: ${channel.id}, ` +
`message:${err.message})`);
}
}
Expand All @@ -340,7 +377,25 @@ class EventChannelManager {
ws.isAlive = false;
this.sendPing(ws);
});
}, NodeConfigs.EVENT_HANDLER_HEARTBEAT_INTERVAL_MS || 15000);
}, NodeConfigs.EVENT_HANDLER_HEARTBEAT_INTERVAL_MS);
}

startIdleCheck() {
const LOG_HEADER = 'startIdleCheck';
this.idleCheckInterval = setInterval(() => {
for (const channel of Object.values(this.channels)) {
const idleTimeMs = channel.getIdleTimeMs();
if (idleTimeMs > NodeConfigs.EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS * 1000) {
logger.info(`[${LOG_HEADER}] Closing long-idle channel: ${JSON.stringify(channel.toObject())}`);
this.closeChannel(channel);
}
const lifeTimeMs = channel.getLifeTimeMs();
if (lifeTimeMs > NodeConfigs.EVENT_HANDLER_CHANNEL_LIFE_TIME_LIMIT_SECS * 1000) {
logger.info(`[${LOG_HEADER}] Closing long-life channel: ${JSON.stringify(channel.toObject())}`);
this.closeChannel(channel);
}
}
}, NodeConfigs.EVENT_HANDLER_CHANNEL_IDLE_CHECK_INTERVAL_MS);
}

sendPing(webSocket) {
Expand All @@ -351,6 +406,10 @@ class EventChannelManager {
stopHeartbeat() {
clearInterval(this.heartbeatInterval);
}

stopIdleCheck() {
clearInterval(this.idleCheckInterval);
}
}

module.exports = EventChannelManager;
22 changes: 22 additions & 0 deletions event-handler/event-channel.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
const { buildRemoteUrlFromSocket } = require('../common/network-util');

class EventChannel {
constructor(id, webSocket) {
this.id = id;
this.webSocket = webSocket;
this.remoteUrl = buildRemoteUrlFromSocket(webSocket);
this.eventFilterIds = new Set();
const curTimeMs = Date.now();
this.creationTimeMs = curTimeMs;
this.lastMessagingTimeMs = curTimeMs;
}

setLastMessagingTimeMs(timeMs) {
this.lastMessagingTimeMs = timeMs;
}

getFilterIdsSize() {
Expand All @@ -21,10 +31,22 @@ class EventChannel {
return this.eventFilterIds.delete(filterId);
}

getLifeTimeMs() {
return Date.now() - this.creationTimeMs;
}

getIdleTimeMs() {
return Date.now() - this.lastMessagingTimeMs;
}

toObject() {
return {
id: this.id,
remoteUrl: this.remoteUrl,
eventFilterIds: [...this.eventFilterIds],
lastMessagingTimeMs: this.lastMessagingTimeMs,
lifeTimeMs: this.getLifeTimeMs(),
idleTimeMs: this.getIdleTimeMs(),
};
}
}
Expand Down
27 changes: 23 additions & 4 deletions event-handler/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class EventHandler {
return {
isEnabled: true,
networkInfo: this.eventChannelManager.getNetworkInfo(),
filterInfo: this.getFilterInfo(),
channelInfo: this.eventChannelManager.getChannelInfo(),
channelStatus: this.eventChannelManager.getChannelStatus(),
filterStatus: this.getFilterStatus(),
};
}

Expand All @@ -45,8 +45,19 @@ class EventHandler {
maxNumEventFilters: NodeConfigs.MAX_NUM_EVENT_FILTERS,
numEventFilters: 0,
},
filterInfo: {},
channelInfo: {},
channelStatus: {
maxNumEventChannels: NodeConfigs.MAX_NUM_EVENT_CHANNELS,
numEventChannels: 0,
channelIdleTimeLimitSecs: NodeConfigs.EVENT_HANDLER_CHANNEL_IDLE_TIME_LIMIT_SECS,
maxChannelLifeTimeMs: 0,
maxChannelIdleTimeMs: 0,
channelInfo: {},
},
filterStatus: {
maxNumEventFilters: NodeConfigs.MAX_NUM_EVENT_FILTERS,
numEventFilters: 0,
filterInfo: {},
},
};
}

Expand All @@ -66,6 +77,14 @@ class EventHandler {
return true;
}

getFilterStatus() {
return {
maxNumEventFilters: NodeConfigs.MAX_NUM_EVENT_FILTERS,
numEventFilters: this.getNumEventFilters(),
filterInfo: this.getFilterInfo(),
};
}

getNumEventFilters() {
return Object.keys(this.eventFilters).length;
}
Expand Down
Loading
Loading