Skip to content

Commit

Permalink
implement proposal synchronization with MongoDB and Elasticsearch sup…
Browse files Browse the repository at this point in the history
…port; add interfaces for proposals and transactions
  • Loading branch information
lealbrunocalhau committed Nov 14, 2024
1 parent b48f969 commit 2de952a
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 66 deletions.
176 changes: 110 additions & 66 deletions src/cli/sync-accounts/sync-proposals.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,45 @@
import { APIClient, Name, PackedTransaction, Serializer, UInt64 } from "@wharfkit/antelope";
import { get } from "lodash";
import { getProposalsHandler } from "../../api/routes/v2-state/get_proposals/get_proposals.js";
import { promises } from "dns";
import { abi } from "../../indexer/definitions/index-templates.js";
import { log } from "console";
import { Client } from "@elastic/elasticsearch";
import { MongoClient } from "mongodb";
import { readFileSync } from "fs";
import { join } from "path";
import { cargo } from "async";

const chain = process.argv[2];
if (!chain) {
console.log("Please select a chain to run on!");
process.exit();
}

const url = "http://localhost:8888";
console.log('Proposal & Approval Synchronizer');

const api = new APIClient({ url });
const configDir = join(import.meta.dirname, '../../../config');
const connections = JSON.parse(readFileSync(join(configDir, "connections.json")).toString());

const abiCacheMap = new Map<string, any>();
const endpoint = connections.chains[chain].http;
const api = new APIClient({ url: endpoint });

if (!endpoint) {
console.log("No HTTP Endpoint!");
process.exit();
}

const _es = connections.elasticsearch;
const elastic = new Client({
node: `${_es.protocol}://${_es.host}`,
auth: { username: _es.user, password: _es.pass },
tls: _es.protocol === 'https' ? { rejectUnauthorized: false } : undefined
});

const _mongo = connections.mongodb;
const mongoClient = new MongoClient(`mongodb://${_mongo.host}:${_mongo.port}`);
const proposalsCollection = mongoClient.db(`${_mongo.database_prefix}_${chain}`).collection("proposals");


const abiCacheMap = new Map<string, any>();
let accounts = 0;


async function getProposals(scope) {
let lb: UInt64 | undefined = undefined;
let more = false;
Expand All @@ -23,35 +50,28 @@ async function getProposals(scope) {
table: "proposal",
scope: scope,
limit: 10,
lower_bound: lb ? lb : undefined
lower_bound: lb
});
lb = result.next_key;
more = result.more;
// return result.rows;

for (const proposal of result.rows) {
const trx = PackedTransaction.from({ packed_trx: proposal.packed_transaction }).getTransaction();
delete proposal.packed_transaction;
const transaction = { ...Serializer.objectify(trx) };
transaction.actions = [];
if (trx.actions.length > 0) {
for (const action of trx.actions) {

try {
let abi = abiCacheMap.get(action.account.toString());
if (!abi) {
abi = await api.v1.chain.get_abi(action.account.toString());
abiCacheMap.set(action.account.toString(), abi);
}
const decodedData = Serializer.objectify(action.decodeData(abi.abi));
const mergedAction = Serializer.objectify(action);
mergedAction.data = decodedData;
transaction.actions.push(mergedAction);
} catch (error: any) {
console.log(`Failed to deserialize action ${action.account}::${action.name} - ${error.message}`);
transaction.actions.push(Serializer.objectify(action));
for (const action of trx.actions) {
try {
let abi = abiCacheMap.get(action.account.toString());
if (!abi) {
abi = await api.v1.chain.get_abi(action.account.toString());
abiCacheMap.set(action.account.toString(), abi);
}

const decodedData = Serializer.objectify(action.decodeData(abi.abi));
transaction.actions.push({ ...Serializer.objectify(action), data: decodedData });
} catch (error: any) {
console.log(`Failed to decode action ${action.account}::${action.name} - ${error.message}`);
transaction.actions.push(Serializer.objectify(action));
}
}
proposals.push({ ...proposal, trx: transaction });
Expand All @@ -64,28 +84,21 @@ async function getApprovals(scope) {
let lb: UInt64 | undefined = undefined;
let more = false;
const approvals: any[] = [];

do {
const result = await api.v1.chain.get_table_rows({
code: "eosio.msig",
table: "approvals2",
scope: scope,
limit: 10,
lower_bound: lb ? lb : undefined
lower_bound: lb
});
lb = result.next_key;
more = result.more;
// return result.rows;

for (const proposal of result.rows) {
approvals.push(proposal);
}
approvals.push(...result.rows);
} while (more);

return approvals;
}


async function* scan() {
let lb = '';
do {
Expand All @@ -96,46 +109,77 @@ async function* scan() {
lower_bound: Name.from(lb).value.toString()
});
for (const row of result.rows) {
try {
const scope = row.scope.toString();
const [proposals, approvals] = await Promise.all([getProposals(scope), getApprovals(scope)]);

const proposalMap = new Map();
for (const proposal of proposals) {
proposalMap.set(proposal.proposal_name, { ...proposal });
}
const scope = row.scope.toString();
const [proposals, approvals] = await Promise.all([getProposals(scope), getApprovals(scope)]);
const proposalMap = new Map();

for (const approval of approvals) {
const proposalMerge = proposalMap.get(approval.proposal_name);
if (proposalMerge) {
proposalMerge.proposer = scope;
proposalMerge.version = approval.version;
proposalMerge.requested_approvals = approval.requested_approvals;
proposalMerge.provided_approvals = approval.provided_approvals;
}
}

for (const entry of proposalMap.values()) {
yield entry;
for (const proposal of proposals) {
proposalMap.set(proposal.proposal_name, { ...proposal });
}
for (const approval of approvals) {
const proposalMerge = proposalMap.get(approval.proposal_name);
if (proposalMerge) {
proposalMerge.proposer = scope;
proposalMerge.version = approval.version;
proposalMerge.requested_approvals = approval.requested_approvals.map(el => {
return { actor: el.level.actor, permission: el.level.permission, time: el.time };
});
proposalMerge.provided_approvals = approval.provided_approvals.map(el => {
return { actor: el.level.actor, permission: el.level.permission, time: el.time };
});
}
} catch (error: any) {
console.log(row, error.message);
}
for (const entry of proposalMap.values()) {

yield entry;
}
}
lb = result.more;
// console.log(`NEXT >>> ${lb} | ${Name.from(lb).value.toString()}`);
} while (lb !== '');
}

async function main() {
for await (const data of scan()) {
accounts++;
if (data.proposer === 'onurbtesteee') {
console.log(data);
console.log(`action`, data.trx.actions)
const isMongoEnabled = _mongo?.enabled === true;
console.log(`MongoDB habilitado: ${isMongoEnabled}`);
try {
await mongoClient.connect();

if (isMongoEnabled && proposalsCollection) {
console.log(`Mongo Enable`)
const cargoQueue = cargo((docs: any[], cb) => {
proposalsCollection.bulkWrite(docs.map(doc => ({
updateOne: {
filter: { proposal_name: doc.proposal_name, proposer: doc.proposer },
update: { $set: doc },
upsert: true
}
}))).finally(() => cb());
}, 1000); // Processa até 1000 documentos por vez

for await (const data of scan()) {
accounts++;
cargoQueue.push(data).catch(console.log);
}

await cargoQueue.drain();
} else {
console.log(`Elastic Enable`)
const bulkResponse = await elastic.helpers.bulk({
datasource: scan(),
onDocument: (doc) => [{
index: {
_index: `${chain}-table-proposals-v1`,
_id: `${doc.proposer}-${doc.proposal_name}`
}
}, doc]
});
accounts = bulkResponse.total;
}

console.log(`Total Proposals Processed: ${accounts}`);
} finally {
await mongoClient.close();
}
console.log(`Total Accounts ${accounts}`);
}

main().catch(console.log);
main().catch(console.log);
46 changes: 46 additions & 0 deletions src/interfaces/table-proposal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
export interface ITransaction {
expiration: string;
ref_block_num: number;
ref_block_prefix: number;
max_net_usage_words: number;
max_cpu_usage_ms: number;
delay_sec: number;
context_free_actions: any[];
actions: Array<{
account: string;
name: string;
authorization: Array<{
actor: string;
permission: string;
}>;
data: {
payer: string;
receiver: string;
quant: string;
};
}>;
transaction_extensions: any[];
}

export interface IApproval {
actor: string;
permission: string;
time: string;
}


export interface IProposal {
proposal_name: string;
trx: ITransaction;
amount: number | string;
code: string;
scope: string;
proposer: string;
version: number;
requested_approvals: IApproval[];
provided_approvals: IApproval[];
}




0 comments on commit 2de952a

Please sign in to comment.