Skip to content

Commit

Permalink
Storing local cache by commit (instead of indexes)
Browse files Browse the repository at this point in the history
  • Loading branch information
EvgenKor committed Sep 2, 2024
1 parent 89a1da2 commit 994e2c0
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 30 deletions.
23 changes: 10 additions & 13 deletions zp-relayer/pool/RelayPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ export class RelayPool extends BasePool<Network> {
}

// cache transaction locally
await this.cacheTxLocally(commitIndex, outCommit, txHash, memo);
await this.cacheTxLocally(outCommit, txHash, memo);
// start monitoring local cache against the indexer to cleanup already indexed txs
this.startLocalCacheObserver(commitIndex);
}
Expand All @@ -278,7 +278,7 @@ export class RelayPool extends BasePool<Network> {
poolJob.data.transaction.txHash = txHash;
await poolJob.update(poolJob.data);

await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo);
await this.cacheTxLocally(res.outCommit, txHash, res.memo);
}
}
}
Expand All @@ -296,16 +296,16 @@ export class RelayPool extends BasePool<Network> {
}
}

protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) {
protected async cacheTxLocally(commit: string, txHash: string, memo: string) {
// store or updating local tx store
// (we should keep sent transaction until the indexer grab them)
const prefixedMemo = buildPrefixedMemo(
commit,
txHash,
memo
);
await this.txStore.add(index, prefixedMemo);
logger.info(`Tx @${index} with commit ${commit} has been CACHED locally`);
await this.txStore.add(commit, prefixedMemo);
logger.info(`Tx with commit ${commit} has been CACHED locally`);
}

private async getIndexerInfo() {
Expand Down Expand Up @@ -350,9 +350,7 @@ export class RelayPool extends BasePool<Network> {
const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks
const EXTEND_LIMIT_TO_FETCH = 10; // taking into account non-atomic nature of /info and /transactions/v2 requests
while (true) {
const localEntries = Object.entries(await this.txStore.getAll())
.map(([i, v]) => [parseInt(i), v] as [number, string])
.sort(([i1], [i2]) => i1 - i2)
const localEntries = Object.entries(await this.txStore.getAll());
let localEntriesCnt = localEntries.length;

if (localEntries.length == 0) {
Expand All @@ -367,11 +365,10 @@ export class RelayPool extends BasePool<Network> {
const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => tx.slice(65, 129));

// find cached commitments in the indexer's response
for (const [index, memo] of localEntries) {
const commitLocal = memo.slice(0, 64)
if (indexerCommitments.includes(commitLocal)) {
logger.info('Deleting index from optimistic state', { index, commitLocal })
await this.txStore.remove(index.toString())
for (const [commit, memo] of localEntries) {
if (indexerCommitments.includes(commit)) {
logger.info('Deleting cached entry', { commit })
await this.txStore.remove(commit)
localEntriesCnt--;
}
}
Expand Down
15 changes: 4 additions & 11 deletions zp-relayer/services/relayer/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,16 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje
}
const indexerTxs: string[] = await response.json()

const lastIndex = offset + indexerTxs.length * OUTPLUSONE
const txStore = (pool as RelayPool).txStore
const indices = await txStore.getAll().then(keys => {
return Object.entries(keys)
.map(([i, v]) => [parseInt(i), v] as [number, string])
.filter(([i]) => offset <= i && i <= lastIndex)
.sort(([i1], [i2]) => i1 - i2)
})
const localEntries = Object.entries(await txStore.getAll());

const indexerCommitments = indexerTxs.map(tx => tx.slice(65, 129));
const optimisticTxs: string[] = []
for (const [index, memo] of indices) {
const commitLocal = memo.slice(0, 64)
if (indexerCommitments.includes(commitLocal)) {
for (const [commit, memo] of localEntries) {
if (indexerCommitments.includes(commit)) {
// !!! we shouldn't modify local cache from here. Just filter entries to return correct response
//logger.info('Deleting index from optimistic state', { index })
//await txStore.remove(index.toString())
//await txStore.remove(commit)
} else {
optimisticTxs.push(txToV2Format('0', memo))
}
Expand Down
12 changes: 6 additions & 6 deletions zp-relayer/state/TxStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ import type { Redis } from 'ioredis'
export class TxStore {
constructor(public name: string, private redis: Redis) {}

async add(index: number, memo: string) {
await this.redis.hset(this.name, { [index]: memo })
async add(commitment: string, memo: string) {
await this.redis.hset(this.name, { [commitment]: memo })
}

async remove(index: string) {
await this.redis.hdel(this.name, index)
async remove(commitment: string) {
await this.redis.hdel(this.name, commitment)
}

async get(index: string) {
const memo = await this.redis.hget(this.name, index)
async get(commitment: string) {
const memo = await this.redis.hget(this.name, commitment)
return memo
}

Expand Down

0 comments on commit 994e2c0

Please sign in to comment.