Skip to content

Commit

Permalink
close connections properly
Browse files Browse the repository at this point in the history
  • Loading branch information
ledhed2222 committed Jul 20, 2023
1 parent 9e94722 commit 4a24b15
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 42 deletions.
42 changes: 19 additions & 23 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt
console.log('Fetch XRPL transactions into Google BigQuery')

const Client = new XrplClient(XRPLNodeUrl)

async function safeHalt() {
try {
await Client.close()
} catch(e) {
console.error('ERROR closing connection:', e)
} finally {
process.exit(1)
}
}

Client.ready().then(Connection => {
Client.ready().then(() => {
let Stopped = false
let LastLedger = 0

Expand All @@ -26,7 +36,7 @@ Client.ready().then(Connection => {

const fetchLedgerTransactions = (ledger_index) => {
return new Promise((resolve, reject) => {
return Connection.send({
return Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: true,
Expand All @@ -41,7 +51,7 @@ Client.ready().then(Connection => {
// Lots of data. Per TX
console.log(`<<< MANY TXS at ledger ${ledger_index}: [[ ${Result.ledger.transactions.length} ]], processing per-tx...`)
let transactions = Result.ledger.transactions.map(Tx => {
return Connection.send({
return Client.send({
command: 'tx',
transaction: Tx
}, 10)
Expand All @@ -61,7 +71,7 @@ Client.ready().then(Connection => {
} else {
// Fetch at once.
resolve(new Promise((resolve, reject) => {
Connection.send({
Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: true,
Expand Down Expand Up @@ -176,47 +186,34 @@ Client.ready().then(Connection => {

return _Tx
})

// console.dir(Transactions[0], { depth: null })
// process.exit(1)

bigquery.dataset(DATASET_NAME).table(TRANSACTION_TABLE_NAME).insert(Transactions)
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
// process.exit(0)
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
process.exit(1)
return safeHalt()
}
} else {
console.error('ERROR:', err)
process.exit(1)
return safeHalt()
}
})
}

// retryTimeout = 0

if (Stopped) {
return
return Client.close()
}

return run(ledger_index + 1)
}).catch(e => {
console.log(e)
process.exit(1)

// retryTimeout += 500
// if (retryTimeout > 5000) retryTimeout = 5000
console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
setTimeout(() => {
return run(ledger_index)
}, retryTimeout * 1000)
return safeHalt()
})
}

Expand All @@ -240,14 +237,13 @@ Client.ready().then(Connection => {
}
}).catch(e => {
console.log('Google BigQuery Error', e)
process.exit(1)
return safeHalt()
})

process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);

Stopped = true
Connection.close()
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
}
Expand Down
36 changes: 17 additions & 19 deletions ledgerInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt
console.log('Fetch XRPL Ledger Info into Google BigQuery')

const Client = new XrplClient(XRPLNodeUrl)

async function safeHalt() {
try {
await Client.close()
} catch(e) {
console.error('ERROR closing connection:', e)
} finally {
process.exit(1)
}
}

Client.ready().then(Connection => {
Client.ready().then(() => {
let Stopped = false
let LastLedger = 0

Expand All @@ -24,7 +34,7 @@ Client.ready().then(Connection => {

const fetchLedger = (ledger_index) => {
return new Promise((resolve, reject) => {
return Connection.send({
return Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: false,
Expand All @@ -39,7 +49,6 @@ Client.ready().then(Connection => {
const run = (ledger_index) => {
return fetchLedger(ledger_index).then(Result => {
console.log(`${Result.ledger_index}`)
// console.log(Result)
bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(Result.ledger.ledger_index),
hash: Result.ledger.hash,
Expand All @@ -55,38 +64,28 @@ Client.ready().then(Connection => {
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
// process.exit(0)
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
process.exit(1)
return safeHalt()
}
} else {
console.error('ERROR:', err)
process.exit(1)
return safeHalt()
}
})

// retryTimeout = 0

if (Stopped) {
return
return Client.close()
}

return run(ledger_index + 1)
}).catch(e => {
console.log(e)
process.exit(1)

// retryTimeout += 500
// if (retryTimeout > 5000) retryTimeout = 5000
console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
setTimeout(() => {
return run(ledger_index)
}, retryTimeout * 1000)
return safeHalt()
})
}

Expand All @@ -107,14 +106,13 @@ Client.ready().then(Connection => {
}
}).catch(e => {
console.log('Google BigQuery Error', e)
process.exit(1)
return safeHalt()
})

process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);

Stopped = true
Connection.close()
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
}
Expand Down

0 comments on commit 4a24b15

Please sign in to comment.