Skip to content

Commit

Permalink
experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
ledhed2222 committed Jul 28, 2023
1 parent 4a24b15 commit 390db6d
Showing 1 changed file with 107 additions and 96 deletions.
203 changes: 107 additions & 96 deletions ledgerInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,113 +8,124 @@ const XrplClient = require('xrpl-client').XrplClient
const BigQuery = require('@google-cloud/bigquery')
const bigquery = new BigQuery({ projectId: PROJECT_ID })

const XRPLNodeUrl = typeof process.env.NODE === 'undefined' ? 'wss://s2.ripple.com' : process.env.NODE.trim()
const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER)
let Stopped = false

console.log('Fetch XRPL Ledger Info into Google BigQuery')

const Client = new XrplClient(XRPLNodeUrl)

async function safeHalt() {
async function fetchLedger(client, ledgerIndex) {
try {
await Client.close()
return await client.send({
command: 'ledger',
ledger_index: parseInt(ledgerIndex),
transactions: false,
expand: false,
})
} catch(e) {
console.error('ERROR closing connection:', e)
} finally {
process.exit(1)
console.error('Ledger fetching error', e)
throw e
}
}

Client.ready().then(() => {
let Stopped = false
let LastLedger = 0

console.log('Connected to the XRPL')
let retryTimeout = 60 * 60 * 12

const fetchLedger = (ledger_index) => {
return new Promise((resolve, reject) => {
return Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: false,
expand: false
}).then(Result => {
resolve(Result)
return
}).catch(reject)
async function getLastDBLedger() {
let result
try {
result = await bigquery.query({
query: `SELECT MAX(LedgerIndex) as MaxLedger
FROM ${PROJECT_ID}.${DATASET_NAME}.${LEDGER_TABLE_NAME}`,
useLegacySql: false,
})
} catch(e) {
console.error('Google BigQuery Error', e)
throw e
}

const run = (ledger_index) => {
return fetchLedger(ledger_index).then(Result => {
console.log(`${Result.ledger_index}`)
bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(Result.ledger.ledger_index),
hash: Result.ledger.hash,
CloseTime: bigquery.timestamp(new Date(Date.parse(Result.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
CloseTimeTimestamp: Result.ledger.close_time,
CloseTimeHuman: Result.ledger.close_time_human,
TotalCoins: parseInt(Result.ledger.totalCoins),
ParentHash: Result.ledger.parent_hash,
AccountHash: Result.ledger.account_hash,
TransactionHash: Result.ledger.transaction_hash,
_InsertedAt: bigquery.timestamp(new Date()),
}])
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
return safeHalt()
}
} else {
console.error('ERROR:', err)
return safeHalt()
}
})

if (Stopped) {
return Client.close()
return result[0][0].MaxLedger
}

async function insertDBLedger(ledgerResult) {
try {
await bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(ledgerResult.ledger.ledger_index),
hash: ledgerResult.ledger.hash,
CloseTime: bigquery.timestamp(new Date(Date.parse(ledgerResult.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
CloseTimeTimestamp: ledgerResult.ledger.close_time,
CloseTimeHuman: ledgerResult.ledger.close_time_human,
TotalCoins: parseInt(ledgerResult.ledger.totalCoins),
ParentHash: ledgerResult.ledger.parent_hash,
AccountHash: ledgerResult.ledger.account_hash,
TransactionHash: ledgerResult.ledger.transaction_hash,
_InsertedAt: bigquery.timestamp(new Date()),
}])
} catch(err) {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.error('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
throw err
}
} else {
console.error('ERROR:', err)
throw err
}
}
}

return run(ledger_index + 1)
}).catch(e => {
console.log(e)
return safeHalt()
})
async function processLedger(client, lastLedger) {
const ledgerResult = await fetchLedger(client, lastLedger + 1)
await insertDBLedger(ledgerResult)
lastLedger = ledgerResult.ledger_index
console.log(`${lastLedger} inserted`)
return lastLedger
}

async function main() {
console.log('Fetch XRPL Ledger Info into Google BigQuery')
const xrplNodeUrl = typeof process.env.NODE === 'undefined' ? 'wss://s2.ripple.com' : process.env.NODE.trim()
const client = new XrplClient(xrplNodeUrl)
await client.ready()
console.log('Connected to the XRPL')

let errored = false

// Determine start ledger. lastLedger represents the last ledger that we
// _have_ stored. So lastLedger + 1 is the next ledger we need. The
// commandline input is supposed to represent the _next_ ledger, so we need
// to subtract one.
let lastLedger = 0
const cmdLineStartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER)
const startLedgerDB = await getLastDBLedger()
if (startLedgerDB >= cmdLineStartLedger) {
console.log(`BigQuery History at ledger [ ${startLedgerDB} ], > StartLedger.\n Forcing StartLedger at:\n >>> ${startLedgerDB+1}\n\n`)
lastLedger = startLedgerDB
} else {
console.log(`Starting at ledger ${cmdLineStartLedger}`)
lastLedger = Math.max(cmdLineStartLedger - 1, 1)
}

console.log(`Starting at ledger [ ${StartLedger} ], \n Checking last ledger in BigQuery...`)

bigquery.query({
query: `SELECT
MAX(LedgerIndex) as MaxLedger
FROM
${PROJECT_ID}.${DATASET_NAME}.${LEDGER_TABLE_NAME}`,
useLegacySql: false, // Use standard SQL syntax for queries.
}).then(r => {
if (r[0][0].MaxLedger > StartLedger) {
console.log(`BigQuery History at ledger [ ${r[0][0].MaxLedger} ], > StartLedger.\n Forcing StartLedger at:\n >>> ${r[0][0].MaxLedger+1}\n\n`)
run(r[0][0].MaxLedger + 1)
} else{
run(StartLedger)
}
}).catch(e => {
console.log('Google BigQuery Error', e)
return safeHalt()
})

process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);

Stopped = true
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
while (!Stopped) {
try {
lastLedger = await processLedger(client, lastLedger)
} catch(e) {
console.error('Error', e)
errored = true
break
}
})
})
}

console.log('Disconnecting from ledger')
await client.close()
console.log('Disconnected')

if (lastLedger > 0) {
console.log(`\nLast ledger: [ ${lastLedger} ]\n\nRun your next job with ENV: "LEDGER=${lastLedger+1}"\n\n`)
}

process.exit(errored ? 1 : 0)
}

function onSigInt() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);
Stopped = true
}

process.on('SIGINT', onSigInt)

main().then()

0 comments on commit 390db6d

Please sign in to comment.