Skip to content

Commit

Permalink
experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
ledhed2222 committed Jul 28, 2023
1 parent 4a24b15 commit 23fa88b
Showing 1 changed file with 108 additions and 93 deletions.
201 changes: 108 additions & 93 deletions ledgerInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,111 +10,126 @@ const bigquery = new BigQuery({ projectId: PROJECT_ID })

const XRPLNodeUrl = typeof process.env.NODE === 'undefined' ? 'wss://s2.ripple.com' : process.env.NODE.trim()
const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER)

console.log('Fetch XRPL Ledger Info into Google BigQuery')
let Stopped = false

const Client = new XrplClient(XRPLNodeUrl)

async function safeHalt() {
async function fetchLedger(ledgerIndex) {
try {
await Client.close()
return await Client.send({
command: 'ledger',
ledger_index: parseInt(ledgerIndex),
transactions: false,
expand: false,
})
} catch(e) {
console.error('ERROR closing connection:', e)
} finally {
process.exit(1)
console.log('Ledger fetching error', e)
throw e
}
}

Client.ready().then(() => {
let Stopped = false
let LastLedger = 0

console.log('Connected to the XRPL')
let retryTimeout = 60 * 60 * 12

const fetchLedger = (ledger_index) => {
return new Promise((resolve, reject) => {
return Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: false,
expand: false
}).then(Result => {
resolve(Result)
return
}).catch(reject)
async function getLastDBLedger() {
let result

try {
result = await bigquery.query({
query: `SELECT MAX(LedgerIndex) as MaxLedger
FROM ${PROJECT_ID}.${DATASET_NAME}.${LEDGER_TABLE_NAME}`,
useLegacySql: false,
})
} catch(e) {
console.log('Google BigQuery Error', e)
throw e
}

const run = (ledger_index) => {
return fetchLedger(ledger_index).then(Result => {
console.log(`${Result.ledger_index}`)
bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(Result.ledger.ledger_index),
hash: Result.ledger.hash,
CloseTime: bigquery.timestamp(new Date(Date.parse(Result.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
CloseTimeTimestamp: Result.ledger.close_time,
CloseTimeHuman: Result.ledger.close_time_human,
TotalCoins: parseInt(Result.ledger.totalCoins),
ParentHash: Result.ledger.parent_hash,
AccountHash: Result.ledger.account_hash,
TransactionHash: Result.ledger.transaction_hash,
_InsertedAt: bigquery.timestamp(new Date()),
}])
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
return safeHalt()
}
} else {
console.error('ERROR:', err)
return safeHalt()
}
})

if (Stopped) {
return Client.close()
return result[0][0].MaxLedger
}

async function insertDBLedger(ledger) {
try {
await bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(ledgerResult.ledger.ledger_index),
hash: ledgerResult.ledger.hash,
CloseTime: bigquery.timestamp(new Date(Date.parse(ledgerResult.ledger.close_time_human)).toISOString().replace('T', ' ').replace(/[^0-9]+$/, '')),
CloseTimeTimestamp: ledgerResult.ledger.close_time,
CloseTimeHuman: ledgerResult.ledger.close_time_human,
TotalCoins: parseInt(ledgerResult.ledger.totalCoins),
ParentHash: ledgerResult.ledger.parent_hash,
AccountHash: ledgerResult.ledger.account_hash,
TransactionHash: ledgerResult.ledger.transaction_hash,
_InsertedAt: bigquery.timestamp(new Date()),
}])
} catch(err) {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
throw err
}
} else {
console.error('ERROR:', err)
throw err
}
}
}

return run(ledger_index + 1)
}).catch(e => {
console.log(e)
return safeHalt()
})
async function doMain(startLedger) {
let thisLedger = startLedger

while (!Stopped) {
const ledgerResult = await fetchLedger(thisLedger)
console.log(ledgerResult.ledger_index)

const bqResult = await insertDBLedger(ledgerResult)
console.log(`Inserted rows`, bqResult)

thisLedger = ledgerResult.ledger_index
}

console.log(`Starting at ledger [ ${StartLedger} ], \n Checking last ledger in BigQuery...`)

bigquery.query({
query: `SELECT
MAX(LedgerIndex) as MaxLedger
FROM
${PROJECT_ID}.${DATASET_NAME}.${LEDGER_TABLE_NAME}`,
useLegacySql: false, // Use standard SQL syntax for queries.
}).then(r => {
if (r[0][0].MaxLedger > StartLedger) {
console.log(`BigQuery History at ledger [ ${r[0][0].MaxLedger} ], > StartLedger.\n Forcing StartLedger at:\n >>> ${r[0][0].MaxLedger+1}\n\n`)
run(r[0][0].MaxLedger + 1)
} else{
run(StartLedger)
}
}).catch(e => {
console.log('Google BigQuery Error', e)
return safeHalt()
})

process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);

Stopped = true
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
}
})
})
return thisLedger
}

async function main() {
console.log('Fetch XRPL Ledger Info into Google BigQuery')
await Client.ready()
console.log('Connected to the XRPL')

let errored = false

// Determine start ledger
let lastLedger = 0
let startLedger
const cmdStart = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt(process.env.LEDGER)
const startLedgerDB = await getLastDBLedger()
if (startLedgerDB >= cmdStart) {
console.log(`BigQuery History at ledger [ ${startLedgerDB} ], > StartLedger.\n Forcing StartLedger at:\n >>> ${startLedgerDB+1}\n\n`)
startLedger = startLedgerDB + 1
} else {
console.log(`Starting at ledger ${cmdLedger-1}`)
startLedger = cmdLedger
}

try {
lastLedger = await doMain(startLedger)
} catch(e) {
console.error('Error:', e)
errored = true
} finally {
await Client.close()
}

if (lastLedger > 0) {
console.log(`\nLast ledger: [ ${lastLedger} ]\n\nRun your next job with ENV: "LEDGER=${lastLedger+1}"\n\n`)
}

process.exit(errored ? 1 : 0)
}

function onSigInt() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);
Stopped = true
}

process.on('SIGINT', onSigInt)

main().then()

0 comments on commit 23fa88b

Please sign in to comment.