Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

close connections properly #14

Open
wants to merge 15 commits into
base: google-bigquery
Choose a base branch
from
42 changes: 19 additions & 23 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt
console.log('Fetch XRPL transactions into Google BigQuery')

const Client = new XrplClient(XRPLNodeUrl)

async function safeHalt() {
try {
await Client.close()
} catch(e) {
console.error('ERROR closing connection:', e)
} finally {
process.exit(1)
}
}
ledhed2222 marked this conversation as resolved.
Show resolved Hide resolved

Client.ready().then(Connection => {
Client.ready().then(() => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this since I know understand that Client.ready returns Client anyway and we have that variable in global scope here. fewer variables is easier to read.

let Stopped = false
let LastLedger = 0

Expand All @@ -26,7 +36,7 @@ Client.ready().then(Connection => {

const fetchLedgerTransactions = (ledger_index) => {
return new Promise((resolve, reject) => {
return Connection.send({
return Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: true,
Expand All @@ -41,7 +51,7 @@ Client.ready().then(Connection => {
// Lots of data. Per TX
console.log(`<<< MANY TXS at ledger ${ledger_index}: [[ ${Result.ledger.transactions.length} ]], processing per-tx...`)
let transactions = Result.ledger.transactions.map(Tx => {
return Connection.send({
return Client.send({
command: 'tx',
transaction: Tx
}, 10)
Expand All @@ -61,7 +71,7 @@ Client.ready().then(Connection => {
} else {
// Fetch at once.
resolve(new Promise((resolve, reject) => {
Connection.send({
Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: true,
Expand Down Expand Up @@ -176,47 +186,34 @@ Client.ready().then(Connection => {

return _Tx
})

// console.dir(Transactions[0], { depth: null })
// process.exit(1)
ledhed2222 marked this conversation as resolved.
Show resolved Hide resolved

bigquery.dataset(DATASET_NAME).table(TRANSACTION_TABLE_NAME).insert(Transactions)
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
// process.exit(0)
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
process.exit(1)
return safeHalt()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI - I personally do not like .then and greatly prefer async/await. It took me forever to remember how to await a promise from within another promise, conditionally, without nesting. to me, the fact that returning a promise from then will await the returned promise is too magical. async/await is much clearer

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we considered using util.promisify to wrap this callback function bigquery.dataset(DATASET_NAME).table(TRANSACTION_TABLE_NAME).insert portion?

}
} else {
console.error('ERROR:', err)
process.exit(1)
return safeHalt()
}
})
}

// retryTimeout = 0

if (Stopped) {
return
return Client.close()
}

return run(ledger_index + 1)
}).catch(e => {
console.log(e)
process.exit(1)

// retryTimeout += 500
// if (retryTimeout > 5000) retryTimeout = 5000
console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
setTimeout(() => {
return run(ledger_index)
}, retryTimeout * 1000)
return safeHalt()
})
}

Expand All @@ -240,14 +237,13 @@ Client.ready().then(Connection => {
}
}).catch(e => {
console.log('Google BigQuery Error', e)
process.exit(1)
return safeHalt()
})

process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);

Stopped = true
Connection.close()
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
}
Expand Down
36 changes: 17 additions & 19 deletions ledgerInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ const StartLedger = typeof process.env.LEDGER === 'undefined' ? 32570 : parseInt
console.log('Fetch XRPL Ledger Info into Google BigQuery')

const Client = new XrplClient(XRPLNodeUrl)

async function safeHalt() {
try {
await Client.close()
} catch(e) {
console.error('ERROR closing connection:', e)
} finally {
process.exit(1)
}
}
ledhed2222 marked this conversation as resolved.
Show resolved Hide resolved

Client.ready().then(Connection => {
Client.ready().then(() => {
let Stopped = false
let LastLedger = 0

Expand All @@ -24,7 +34,7 @@ Client.ready().then(Connection => {

const fetchLedger = (ledger_index) => {
return new Promise((resolve, reject) => {
return Connection.send({
return Client.send({
command: 'ledger',
ledger_index: parseInt(ledger_index),
transactions: false,
Expand All @@ -39,7 +49,6 @@ Client.ready().then(Connection => {
const run = (ledger_index) => {
return fetchLedger(ledger_index).then(Result => {
console.log(`${Result.ledger_index}`)
// console.log(Result)
bigquery.dataset(DATASET_NAME).table(LEDGER_TABLE_NAME).insert([{
LedgerIndex: parseInt(Result.ledger.ledger_index),
hash: Result.ledger.hash,
Expand All @@ -55,38 +64,28 @@ Client.ready().then(Connection => {
.then(r => {
console.log(`Inserted rows`, r)
LastLedger = Result.ledger_index
// process.exit(0)
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:')
err.errors.forEach(err => console.dir(err, { depth: null }))
process.exit(1)
return safeHalt()
}
} else {
console.error('ERROR:', err)
process.exit(1)
return safeHalt()
}
})

// retryTimeout = 0

if (Stopped) {
return
return Client.close()
}

return run(ledger_index + 1)
}).catch(e => {
console.log(e)
process.exit(1)

// retryTimeout += 500
// if (retryTimeout > 5000) retryTimeout = 5000
console.log(`Oops... Retry in ${retryTimeout / 1000} sec.`)
setTimeout(() => {
return run(ledger_index)
}, retryTimeout * 1000)
return safeHalt()
})
}

Expand All @@ -107,14 +106,13 @@ Client.ready().then(Connection => {
}
}).catch(e => {
console.log('Google BigQuery Error', e)
process.exit(1)
return safeHalt()
})

process.on('SIGINT', function() {
console.log(`\nGracefully shutting down from SIGINT (Ctrl+C)\n -- Wait for remaining BigQuery inserts and XRPL Connection close...`);

Stopped = true
Connection.close()
if (LastLedger > 0) {
console.log(`\nLast ledger: [ ${LastLedger} ]\n\nRun your next job with ENV: "LEDGER=${LastLedger+1}"\n\n`)
}
Expand Down