diff --git a/zp-relayer/utils/redisFields.ts b/zp-relayer/utils/redisFields.ts index abbbdd9d..896b627c 100644 --- a/zp-relayer/utils/redisFields.ts +++ b/zp-relayer/utils/redisFields.ts @@ -3,15 +3,12 @@ import { redis } from '@/services/redisClient' import { web3 } from '@/services/web3' import config from '@/config' import { getNonce } from './web3' -import { pool } from '@/pool' export enum RelayerKeys { - TRANSFER_NUM = 'relayer:transferNum', NONCE = `relayer:nonce`, } export const readNonce = readFieldBuilder(RelayerKeys.NONCE, () => getNonce(web3, config.relayerAddress)) -export const readTransferNum = readFieldBuilder(RelayerKeys.TRANSFER_NUM, () => pool.getContractIndex()) function readFieldBuilder(key: RelayerKeys, forceUpdateFunc?: Function) { return async (forceUpdate?: boolean) => { diff --git a/zp-relayer/workers/poolTxWorker.ts b/zp-relayer/workers/poolTxWorker.ts index 531a9b91..0553c1dd 100644 --- a/zp-relayer/workers/poolTxWorker.ts +++ b/zp-relayer/workers/poolTxWorker.ts @@ -3,8 +3,8 @@ import { Job, Worker } from 'bullmq' import { web3, web3Redundant } from '@/services/web3' import { logger } from '@/services/appLogger' import { poolTxQueue, BatchTx, PoolTxResult, TxPayload } from '@/queue/poolTxQueue' -import { TX_QUEUE_NAME, OUTPLUSONE } from '@/utils/constants' -import { readNonce, updateField, RelayerKeys, updateNonce } from '@/utils/redisFields' +import { TX_QUEUE_NAME } from '@/utils/constants' +import { readNonce, updateNonce } from '@/utils/redisFields' import { buildPrefixedMemo, truncateMemoTxPrefix, waitForFunds, withErrorLog, withMutex } from '@/utils/helpers' import { signTransaction, sendTransaction } from '@/tx/signAndSend' import { Pool, pool } from '@/pool' @@ -77,11 +77,11 @@ export async function createPoolTxWorker( }, config.relayerPrivateKey ) + jobLogger.info('Sending tx', { txHash }) try { await sendTransaction(web3Redundant, rawTransaction) } catch (e) { - const err = e as Error - if (isInsufficientBalanceError(err)) { + if (isInsufficientBalanceError(e as Error)) { const minimumBalance = toBN(gas).mul(toBN(getMaxRequiredGasPrice(gasPriceWithExtra))) jobLogger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) await Promise.all([poolTxQueue.pause(), sentTxQueue.pause()]) @@ -92,16 +92,13 @@ export async function createPoolTxWorker( minimumBalance, config.insufficientBalanceCheckTimeout ) + throw e } - throw e + jobLogger.error('Tx send failed; it will be re-sent later', { txHash, error: (e as Error).message }) } await updateNonce(++nonce) - jobLogger.info('Sent tx', { txHash }) - - await updateField(RelayerKeys.TRANSFER_NUM, commitIndex * OUTPLUSONE) - const nullifier = getTxProofField(txProof, 'nullifier') const outCommit = getTxProofField(txProof, 'out_commit') diff --git a/zp-relayer/workers/sentTxWorker.ts b/zp-relayer/workers/sentTxWorker.ts index f2f6f765..6d5439fd 100644 --- a/zp-relayer/workers/sentTxWorker.ts +++ b/zp-relayer/workers/sentTxWorker.ts @@ -1,16 +1,22 @@ import type Redis from 'ioredis' import type { Mutex } from 'async-mutex' import { toBN } from 'web3-utils' -import type { TransactionReceipt } from 'web3-core' +import type { TransactionReceipt, TransactionConfig } from 'web3-core' import { Job, Worker } from 'bullmq' import config from '@/config' import { pool } from '@/pool' import { web3, web3Redundant } from '@/services/web3' import { logger } from '@/services/appLogger' -import { GasPrice, EstimationType, chooseGasPriceOptions, addExtraGasPrice, getMaxRequiredGasPrice } from '@/services/gas-price' -import { buildPrefixedMemo, waitForFunds, withErrorLog, withLoop, withMutex } from '@/utils/helpers' +import { + GasPrice, + EstimationType, + chooseGasPriceOptions, + addExtraGasPrice, + getMaxRequiredGasPrice, +} from '@/services/gas-price' +import { buildPrefixedMemo, withErrorLog, withLoop, withMutex } from '@/utils/helpers' import { OUTPLUSONE, SENT_TX_QUEUE_NAME } from '@/utils/constants' -import { isGasPriceError, isInsufficientBalanceError, isSameTransactionError } from '@/utils/web3Errors' +import { isGasPriceError, isInsufficientBalanceError, isNonceError, isSameTransactionError } from '@/utils/web3Errors' import { SendAttempt, SentTxPayload, sentTxQueue, SentTxResult, SentTxState } from '@/queue/sentTxQueue' import { sendTransaction, signTransaction } from '@/tx/signAndSend' import { poolTxQueue } from '@/queue/poolTxQueue' @@ -40,6 +46,155 @@ async function clearOptimisticState() { logger.info(`Assert roots are equal: ${root1}, ${root2}, ${root1 === root2}`) } +async function handleMined( + { transactionHash, blockNumber }: TransactionReceipt, + { outCommit, commitIndex, nullifier, truncatedMemo, root }: SentTxPayload, + jobLogger = logger +): Promise { + // Successful + jobLogger.info('Transaction was successfully mined', { transactionHash, blockNumber }) + + const prefixedMemo = buildPrefixedMemo(outCommit, transactionHash, truncatedMemo) + pool.state.updateState(commitIndex, outCommit, prefixedMemo) + // Update tx hash in optimistic state tx db + pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) + + // Add nullifier to confirmed state and remove from optimistic one + jobLogger.info('Adding nullifier %s to PS', nullifier) + await pool.state.nullifiers.add([nullifier]) + jobLogger.info('Removing nullifier %s from OS', nullifier) + await pool.optimisticState.nullifiers.remove([nullifier]) + + const node1 = pool.state.getCommitment(commitIndex) + const node2 = pool.optimisticState.getCommitment(commitIndex) + jobLogger.info('Assert commitments are equal: %s, %s', node1, node2) + if (node1 !== node2) { + jobLogger.error('Commitments are not equal') + } + + const rootConfirmed = pool.state.getMerkleRoot() + jobLogger.info('Assert roots are equal') + if (rootConfirmed !== root) { + // TODO: Should be impossible but in such case + // we should recover from some checkpoint + jobLogger.error('Roots are not equal: %s should be %s', rootConfirmed, root) + } + + return [SentTxState.MINED, transactionHash, []] as SentTxResult +} + +async function handleReverted( + { transactionHash: txHash, blockNumber }: TransactionReceipt, + jobId: string, + redis: Redis, + jobLogger = logger +): Promise { + jobLogger.error('Transaction reverted', { txHash, blockNumber }) + + // Means that rollback was done previously, no need to do it now + if (await checkMarked(redis, jobId)) { + jobLogger.info('Job marked as failed, skipping') + return [SentTxState.REVERT, txHash, []] as SentTxResult + } + + await clearOptimisticState() + + // Send all jobs to re-process + // Validation of these jobs will be done in `poolTxWorker` + const waitingJobIds = [] + const reschedulePromises = [] + const newPoolJobIdMapping: Record = {} + const waitingJobs = await sentTxQueue.getJobs(['delayed', 'waiting']) + for (let wj of waitingJobs) { + // One of the jobs can be undefined, so we need to check it + // https://github.com/taskforcesh/bullmq/blob/master/src/commands/addJob-8.lua#L142-L143 + if (!wj?.id) continue + waitingJobIds.push(wj.id) + + const { txPayload, traceId } = wj.data + const transactions = [txPayload] + + // To not mess up traceId we add each transaction separately + const reschedulePromise = poolTxQueue.add(txHash, { transactions, traceId }).then(j => { + const newPoolJobId = j.id as string + newPoolJobIdMapping[wj.data.poolJobId] = newPoolJobId + return newPoolJobId + }) + reschedulePromises.push(reschedulePromise) + } + jobLogger.info('Marking ids %j as failed', waitingJobIds) + await markFailed(redis, waitingJobIds) + jobLogger.info('Rescheduling %d jobs to process...', waitingJobs.length) + const rescheduledIds = await Promise.all(reschedulePromises) + jobLogger.info('Update pool job id mapping %j ...', newPoolJobIdMapping) + await pool.state.jobIdsMapping.add(newPoolJobIdMapping) + + return [SentTxState.REVERT, txHash, rescheduledIds] as SentTxResult +} + +async function handleResend( + txConfig: TransactionConfig, + gasPrice: GasPrice, + job: Job, + jobLogger = logger +) { + const [lastHash, lastGasPrice] = job.data.prevAttempts.at(-1) as SendAttempt + + const fetchedGasPrice = await gasPrice.fetchOnce() + const oldWithExtra = addExtraGasPrice(lastGasPrice, config.minGasPriceBumpFactor, null) + const newWithExtra = addExtraGasPrice(fetchedGasPrice, config.gasPriceSurplus, null) + + const newGasPrice = chooseGasPriceOptions(oldWithExtra, newWithExtra) + + jobLogger.warn('Tx %s is not mined; updating gasPrice: %o -> %o', lastHash, lastGasPrice, newGasPrice) + + const newTxConfig = { + ...txConfig, + ...newGasPrice, + } + + const [newTxHash, rawTransaction] = await signTransaction(web3, newTxConfig, config.relayerPrivateKey) + job.data.prevAttempts.push([newTxHash, newGasPrice]) + jobLogger.info('Re-send tx', { txHash: newTxHash }) + try { + await sendTransaction(web3Redundant, rawTransaction) + } catch (e) { + const err = e as Error + jobLogger.warn('Tx resend failed', { error: err.message, txHash: newTxHash }) + if (isGasPriceError(err) || isSameTransactionError(err)) { + // Tx wasn't sent successfully, but still update last attempt's + // gasPrice to be accounted in the next iteration + await job.update({ + ...job.data, + }) + } else if (isInsufficientBalanceError(err)) { + // We don't want to take into account last gasPrice increase + job.data.prevAttempts.at(-1)![1] = lastGasPrice + + const minimumBalance = toBN(txConfig.gas!).mul(toBN(getMaxRequiredGasPrice(newGasPrice))) + jobLogger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) + } else if (isNonceError(err)) { + jobLogger.warn('Nonce error', { error: err.message, txHash: newTxHash }) + // Throw suppressed error to be treated as a warning + throw new Error(RECHECK_ERROR) + } + // Error should be caught by `withLoop` to re-run job + throw e + } + + // Overwrite old tx recorded in optimistic state db with new tx hash + const { truncatedMemo, outCommit, commitIndex } = job.data + const prefixedMemo = buildPrefixedMemo(outCommit, newTxHash, truncatedMemo) + pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) + + // Update job + await job.update({ + ...job.data, + txConfig: newTxConfig, + }) + await job.updateProgress({ txHash: newTxHash, gasPrice: newGasPrice }) +} + export async function createSentTxWorker(gasPrice: GasPrice, mutex: Mutex, redis: Redis) { const workerLogger = logger.child({ worker: 'sent-tx' }) const WORKER_OPTIONS = { @@ -82,12 +237,11 @@ export async function createSentTxWorker(gasPrice: Gas const jobLogger = workerLogger.child({ jobId: job.id, traceId: job.data.traceId }) jobLogger.info('Verifying job %s', job.data.poolJobId) - const { truncatedMemo, commitIndex, outCommit, nullifier, root, prevAttempts, txConfig } = job.data + const { prevAttempts, txConfig } = job.data // Any thrown web3 error will re-trigger re-send loop iteration const [tx, shouldReprocess] = await checkMined(prevAttempts, txConfig.nonce as number) // Should always be defined - const [lastHash, lastGasPrice] = prevAttempts.at(-1) as SendAttempt if (shouldReprocess) { // TODO: handle this case later @@ -95,141 +249,23 @@ export async function createSentTxWorker(gasPrice: Gas throw new Error('Ambiguity detected: nonce increased but no respond that transaction was mined') } - if (tx) { - const txHash = tx.transactionHash - // Tx mined - if (tx.status) { - // Successful - jobLogger.info('Transaction was successfully mined', { txHash, blockNumber: tx.blockNumber }) - - const prefixedMemo = buildPrefixedMemo(outCommit, txHash, truncatedMemo) - pool.state.updateState(commitIndex, outCommit, prefixedMemo) - // Update tx hash in optimistic state tx db - pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) - - // Add nullifier to confirmed state and remove from optimistic one - jobLogger.info('Adding nullifier %s to PS', nullifier) - await pool.state.nullifiers.add([nullifier]) - jobLogger.info('Removing nullifier %s from OS', nullifier) - await pool.optimisticState.nullifiers.remove([nullifier]) - - const node1 = pool.state.getCommitment(commitIndex) - const node2 = pool.optimisticState.getCommitment(commitIndex) - jobLogger.info('Assert commitments are equal: %s, %s', node1, node2) - if (node1 !== node2) { - jobLogger.error('Commitments are not equal') - } - - const rootConfirmed = pool.state.getMerkleRoot() - jobLogger.info('Assert roots are equal') - if (rootConfirmed !== root) { - // TODO: Should be impossible but in such case - // we should recover from some checkpoint - jobLogger.error('Roots are not equal: %s should be %s', rootConfirmed, root) - } - - return [SentTxState.MINED, txHash, []] as SentTxResult - } else { - // Revert - jobLogger.error('Transaction reverted', { txHash, blockNumber: tx.blockNumber }) - - // Means that rollback was done previously, no need to do it now - if (await checkMarked(redis, job.id as string)) { - jobLogger.info('Job marked as failed, skipping') - return [SentTxState.REVERT, txHash, []] as SentTxResult - } - - await clearOptimisticState() - - // Send all jobs to re-process - // Validation of these jobs will be done in `poolTxWorker` - const waitingJobIds = [] - const reschedulePromises = [] - const newPoolJobIdMapping: Record = {} - const waitingJobs = await sentTxQueue.getJobs(['delayed', 'waiting']) - for (let wj of waitingJobs) { - // One of the jobs can be undefined, so we need to check it - // https://github.com/taskforcesh/bullmq/blob/master/src/commands/addJob-8.lua#L142-L143 - if (!wj?.id) continue - waitingJobIds.push(wj.id) - - const { txPayload, traceId } = wj.data - const transactions = [txPayload] - - // To not mess up traceId we add each transaction separately - const reschedulePromise = poolTxQueue.add(txHash, { transactions, traceId }).then(j => { - const newPoolJobId = j.id as string - newPoolJobIdMapping[wj.data.poolJobId] = newPoolJobId - return newPoolJobId - }) - reschedulePromises.push(reschedulePromise) - } - jobLogger.info('Marking ids %j as failed', waitingJobIds) - await markFailed(redis, waitingJobIds) - jobLogger.info('Rescheduling %d jobs to process...', waitingJobs.length) - const rescheduledIds = await Promise.all(reschedulePromises) - jobLogger.info('Update pool job id mapping %j ...', newPoolJobIdMapping) - await pool.state.jobIdsMapping.add(newPoolJobIdMapping) - - return [SentTxState.REVERT, txHash, rescheduledIds] as SentTxResult - } - } else { + if (!tx) { // Resend with updated gas price - const fetchedGasPrice = await gasPrice.fetchOnce() - const oldWithExtra = addExtraGasPrice(lastGasPrice, config.minGasPriceBumpFactor, null) - const newWithExtra = addExtraGasPrice(fetchedGasPrice, config.gasPriceSurplus, null) - - const newGasPrice = chooseGasPriceOptions(oldWithExtra, newWithExtra) - - jobLogger.warn('Tx %s is not mined; updating gasPrice: %o -> %o', lastHash, lastGasPrice, newGasPrice) - - const newTxConfig = { - ...txConfig, - ...newGasPrice, - } - - const [newTxHash, rawTransaction] = await signTransaction(web3, newTxConfig, config.relayerPrivateKey) - job.data.prevAttempts.push([newTxHash, newGasPrice]) - try { - await sendTransaction(web3Redundant, rawTransaction) - jobLogger.info('Re-send tx', { txHash: newTxHash }) - } catch (e) { - const err = e as Error - jobLogger.warn('Tx resend failed: %s', err.message, { txHash: newTxHash }) - if (isGasPriceError(err) || isSameTransactionError(err)) { - // Tx wasn't sent successfully, but still update last attempt's - // gasPrice to be accounted in the next iteration - await job.update({ - ...job.data, - }) - } else if (isInsufficientBalanceError(err)) { - // We don't want to take into account last gasPrice increase - job.data.prevAttempts.at(-1)![1] = lastGasPrice - - const minimumBalance = toBN(txConfig.gas!).mul(toBN(getMaxRequiredGasPrice(newGasPrice))) - logger.error('Insufficient balance, waiting for funds', { minimumBalance: minimumBalance.toString(10) }) - } - // Error should be caught by `withLoop` to re-run job - throw e - } - - // Overwrite old tx recorded in optimistic state db with new tx hash - const prefixedMemo = buildPrefixedMemo(outCommit, newTxHash, truncatedMemo) - pool.optimisticState.addTx(commitIndex * OUTPLUSONE, Buffer.from(prefixedMemo, 'hex')) - - // Update job - await job.update({ - ...job.data, - txConfig: newTxConfig, - }) - await job.updateProgress({ txHash: newTxHash, gasPrice: newGasPrice }) + await handleResend(txConfig, gasPrice, job, jobLogger) // Tx re-send successful // Throw error to re-run job after delay and // check if tx was mined throw new Error(RECHECK_ERROR) } + + if (tx.status) { + return await handleMined(tx, job.data, jobLogger) + } else { + return await handleReverted(tx, job.id as string, redis, jobLogger) + } } + const sentTxWorker = new Worker( SENT_TX_QUEUE_NAME, job =>