Skip to content

Commit

Permalink
feat: singleton (#35)
Browse files Browse the repository at this point in the history
* add single axios

* add apiRequester singletone

* remove unused code
  • Loading branch information
JSHan94 authored Jun 3, 2024
1 parent 64ccca1 commit 10b6803
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 82 deletions.
25 changes: 12 additions & 13 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { APIRequester, LCDClient as LCDClientL1 } from 'initia-l1'
import { LCDClient as LCDClientL1 } from 'initia-l1'
import { LCDClient as LCDClientL2 } from 'initia-l2'
import { validateCelestiaConfig } from './celestia/utils'
import * as dotenv from 'dotenv'
import http from 'http'
import https from 'https'
import APIRequesterSingleton from './lib/apiRequester'

const envFile =
process.env.NODE_ENV === 'test' || !process.env.WORKER_NAME
Expand Down Expand Up @@ -67,11 +66,6 @@ const supportedPublishBatchTargets = ['l1', 'celestia']

const getUri = (uri, defaultUri = 'http://127.0.0.1:1317') =>
uri ? uri.split(',')[0] : defaultUri
const createApiRequester = (uri) =>
new APIRequester(getUri(uri), {
httpAgent: new http.Agent({ keepAlive: true }),
httpsAgent: new https.Agent({ keepAlive: true })
})

export const config = {
EXECUTOR_PORT: EXECUTOR_PORT ? parseInt(EXECUTOR_PORT) : 5000,
Expand Down Expand Up @@ -155,7 +149,7 @@ export const config = {
gasAdjustment: '2',
chainId: L1_CHAIN_ID
},
createApiRequester(L1_LCD_URI)
APIRequesterSingleton.getInstance(getUri(L1_LCD_URI))
),
l2lcd: new LCDClientL2(
getUri(L2_LCD_URI),
Expand All @@ -164,7 +158,7 @@ export const config = {
gasAdjustment: '2',
chainId: L2_CHAIN_ID
},
createApiRequester(L2_LCD_URI)
APIRequesterSingleton.getInstance(getUri(L2_LCD_URI))
),
batchlcd: (() => {
const uri =
Expand All @@ -178,7 +172,7 @@ export const config = {
gasAdjustment: '2',
chainId: BATCH_CHAIN_ID ? BATCH_CHAIN_ID : L1_CHAIN_ID
},
createApiRequester(uri)
APIRequesterSingleton.getInstance(getUri(uri))
)
})(),
SLACK_WEB_HOOK: SLACK_WEB_HOOK ? SLACK_WEB_HOOK : '',
Expand Down Expand Up @@ -230,8 +224,13 @@ validateCelestiaConfig()
// - false for individual worker files
//
// NOTE: this needs to be a function instead of const, as it needs to be hoisted
export function isInvokedFromEntrypoint(module: NodeJS.Module | undefined): boolean {
return require.main === module && (module?.filename.includes("entrypoint") || false)
export function isInvokedFromEntrypoint(
module: NodeJS.Module | undefined
): boolean {
return (
require.main === module &&
(module?.filename.includes('entrypoint') || false)
)
}

export const INTERVAL_BATCH = 100_000
Expand Down
40 changes: 25 additions & 15 deletions src/entrypoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,33 @@ import { startOutput } from './worker/outputSubmitter'
import { isInvokedFromEntrypoint } from './config'

const modeToEntrypointMap: Record<string, () => Promise<void>> = {
executor: startExecutor,
batch: startBatch,
challenger: startChallenger,
output: startOutput,
executor: startExecutor,
batch: startBatch,
challenger: startChallenger,
output: startOutput
}

const entrypoint = (mode: string): Promise<void> => {
return Promise.resolve()
.then(() => console.log("Starting worker in mode:", mode))
.then(() => modeToEntrypointMap[mode] || Promise.reject(`unknown mode: ${mode}, available options = ${Object.keys(modeToEntrypointMap)}`))
.then(workerFn => workerFn())
return (
Promise.resolve()
.then(() => console.log('Starting worker in mode:', mode))
.then(
() =>
modeToEntrypointMap[mode] ||
Promise.reject(
`unknown mode: ${mode}, available options = ${Object.keys(modeToEntrypointMap)}`
)
)
.then((workerFn) => workerFn())

// sink any rejection to console.error, and exit with code 127 (command not found)
.catch(e => {
console.error(e)
process.exit(127)
})
}
// sink any rejection to console.error, and exit with code 127 (command not found)
.catch((e) => {
console.error(e)
process.exit(127)
})
)
};
// -------------------------------------
;(async() => isInvokedFromEntrypoint(module) && entrypoint(process.env.WORKER_NAME || process.argv[2]))()
(async () =>
isInvokedFromEntrypoint(module) &&
entrypoint(process.env.WORKER_NAME || process.argv[2]))()
21 changes: 21 additions & 0 deletions src/lib/apiRequester.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { APIRequester } from 'initia-l1'
import http from 'http'
import https from 'https'

class APIRequesterSingleton {
private static instances: { [key: string]: APIRequester } = {}

private constructor() {}

public static getInstance(uri: string): APIRequester {
if (!APIRequesterSingleton.instances[uri]) {
APIRequesterSingleton.instances[uri] = new APIRequester(uri, {
httpAgent: new http.Agent({ keepAlive: true }),
httpsAgent: new https.Agent({ keepAlive: true })
})
}
return APIRequesterSingleton.instances[uri]
}
}

export default APIRequesterSingleton
25 changes: 25 additions & 0 deletions src/lib/axios.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import axios, { AxiosInstance } from 'axios'
import https from 'https'
import http from 'http'

class AxiosSingleton {
private static instance: AxiosInstance

private constructor() {}

public static getInstance(): AxiosInstance {
if (!AxiosSingleton.instance) {
AxiosSingleton.instance = axios.create({
headers: {
'Content-Type': 'application/json',
'User-Agent': 'initia-rollup'
},
httpsAgent: new https.Agent({ keepAlive: true }),
httpAgent: new http.Agent({ keepAlive: true })
})
}
return AxiosSingleton.instance
}
}

export default AxiosSingleton
31 changes: 10 additions & 21 deletions src/lib/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
OutputResponse,
WithdrawalTxResponse
} from './types'
import axios from 'axios'
import AxiosSingleton from './axios'

/// LCD query

Expand Down Expand Up @@ -44,8 +44,8 @@ export async function getWithdrawalTxFromExecutor(
sequence: number
): Promise<WithdrawalTxResponse> {
const url = `${config.EXECUTOR_URI}/tx/withdrawal/${bridge_id}/${sequence}`

const res = await axios.get(url)
const axiosInstance = AxiosSingleton.getInstance()
const res = await axiosInstance.get(url)
return res.data
}

Expand All @@ -54,7 +54,8 @@ export async function getDepositTxFromExecutor(
sequence: number
): Promise<DepositTxResponse> {
const url = `${config.EXECUTOR_URI}/tx/deposit/${bridge_id}/${sequence}`
const res = await axios.get(url)
const axiosInstance = AxiosSingleton.getInstance()
const res = await axiosInstance.get(url)
return res.data
}

Expand All @@ -63,27 +64,15 @@ export async function getOutputFromExecutor(
outputIndex: number
): Promise<OutputResponse> {
const url = `${config.EXECUTOR_URI}/output/${outputIndex}`
const res = await axios.get(url)
const axiosInstance = AxiosSingleton.getInstance()
const res = await axiosInstance.get(url)
return res.data
}

// fetching the latest output from l2 chain
export async function getLatestOutputFromExecutor(): Promise<OutputResponse> {
const url = `${config.EXECUTOR_URI}/output/latest`
const res = await axios.get(url)
const axiosInstance = AxiosSingleton.getInstance()
const res = await axiosInstance.get(url)
return res.data
}

export const checkHealth = async (url: string, timeout = 60_000) => {
const startTime = Date.now()

while (Date.now() - startTime < timeout) {
try {
const response = await axios.get(url)
if (response.status === 200) return
} catch {
continue
}
await new Promise((res) => setTimeout(res, 1_000))
}
}
}
13 changes: 3 additions & 10 deletions src/lib/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as winston from 'winston'
import axios, { AxiosRequestConfig } from 'axios'
import Websocket from 'ws'
import * as http from 'http'
import * as https from 'https'
import { delay } from 'bluebird'
import { SECOND } from '../config'
import AxiosSingleton from './axios'

const MAX_RETRY = 10

Expand Down Expand Up @@ -205,14 +205,7 @@ export class RPCClient {
path: string,
params?: Record<string, string>
): Promise<any> {
const options: AxiosRequestConfig = {
headers: {
'Content-Type': 'application/json',
'User-Agent': 'initia-rollup'
},
httpsAgent: new https.Agent({ keepAlive: true }),
httpAgent: new http.Agent({ keepAlive: true })
}
const axiosInsance = AxiosSingleton.getInstance()

let url = `${this.rpcUrl}${path}`
params &&
Expand All @@ -225,7 +218,7 @@ export class RPCClient {
}

try {
const response = await axios.get(url, options)
const response = await axiosInsance.get(url)
if (response.status !== 200) {
throw new Error(`Invalid status code: ${response.status}`)
}
Expand Down
15 changes: 4 additions & 11 deletions src/lib/slack.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
import axios from 'axios'
import BigNumber from 'bignumber.js'
import { config } from '../config'
import * as http from 'http'
import * as https from 'https'
import UnconfirmedTxEntity from '../orm/executor/UnconfirmedTxEntity'
import { ChallengedOutputEntity } from '../orm/index'
import AxiosSingleton from './axios'

const postedKeys = new Set<string>()

const ax = axios.create({
httpAgent: new http.Agent({ keepAlive: true }),
httpsAgent: new https.Agent({ keepAlive: true }),
timeout: 15000
})

export async function notifySlack(
key: string,
text: { text: string },
Expand All @@ -23,15 +15,16 @@ export async function notifySlack(
return

const keyExists = postedKeys.has(key)
const axiosInsance = AxiosSingleton.getInstance()

if (isError) {
if (!keyExists) {
await ax.post(config.SLACK_WEB_HOOK, text)
await axiosInsance.post(config.SLACK_WEB_HOOK, text)
postedKeys.add(key)
}
} else {
if (keyExists) {
await ax.post(config.SLACK_WEB_HOOK, text)
await axiosInsance.post(config.SLACK_WEB_HOOK, text)
postedKeys.delete(key)
}
}
Expand Down
25 changes: 13 additions & 12 deletions src/worker/batchSubmitter/batchSubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ export class BatchSubmitter {
}
}


async processBatch() {
await this.db.transaction(async (manager: EntityManager) => {
const latestBatch = await this.getStoredBatch(manager)
Expand All @@ -78,26 +77,28 @@ export class BatchSubmitter {
return
}

for (let i = output.startBlockNumber; i <= output.endBlockNumber; i += maxBulkSize) {
await this.processBatchRange(manager, i, Math.min(i + maxBulkSize - 1, output.endBlockNumber))
for (
let i = output.startBlockNumber;
i <= output.endBlockNumber;
i += maxBulkSize
) {
await this.processBatchRange(
manager,
i,
Math.min(i + maxBulkSize - 1, output.endBlockNumber)
)
}

logger.info(
`${this.batchIndex}th batch (${output.startBlockNumber}, ${output.endBlockNumber}) is successfully saved`
)
})
}

async processBatchRange(manager:EntityManager, start: number, end: number) {
async processBatchRange(manager: EntityManager, start: number, end: number) {
const batch = await this.getBatch(start, end)
const batchInfo: string[] = await this.publishBatch(batch)
await this.saveBatchToDB(
manager,
batchInfo,
this.batchIndex,
start,
end
)
await this.saveBatchToDB(manager, batchInfo, this.batchIndex, start, end)
}

// Get [start, end] batch from L2 and last commit info
Expand Down

0 comments on commit 10b6803

Please sign in to comment.