Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

close connections properly #14

Open
wants to merge 15 commits into
base: google-bigquery
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/[FILE_NAME].json"
More information:
https://cloud.google.com/docs/authentication/getting-started

> Now modify `schema/CONSTANTS.js` to point to your own projectId, dataset and table names.
Set the following env variables:
- `PROJECT_ID`: the BigQuery project ID to use
- `DATASET_NAME`: the BigQuery dataset to use
- `TRANSACTION_TABLE_NAME`: the BigQuery table to write to for transactions
- `LEDGER_TABLE_NAME`: the BigQuery table to write to for ledgers

# Create schema

Expand All @@ -73,3 +77,11 @@ You can invoke the script from a node enabled environment by setting these envir

- `NODE`: the rippled node (`wss://...`) to connect to, default: **wss://s2.ripple.com**
- `LEDGER`: the ledger index to start fetching transactions from, default: **32570**
- `PROJECT_ID`: the BigQuery project ID to use
- `DATASET_NAME`: the BigQuery dataset to use
- `MODE`: `transactions` will insert transaction data, `ledgers` will insert ledger data
- `TABLE_NAME`: the BigQuery table to write to

## systemd integration

Use the `exampleService.servie` file as a template for writing a Linux `systemd` service.
83 changes: 43 additions & 40 deletions applySchema.js
Original file line number Diff line number Diff line change
@@ -1,55 +1,58 @@
const {
PROJECT_ID,
DATASET_NAME,
TRANSACTION_TABLE_NAME,
LEDGER_TABLE_NAME,
transactionSchema,
ledgerSchema,
} = require('./schema')

const BigQuery = require('@google-cloud/bigquery')
const bigquery = new BigQuery({ projectId: PROJECT_ID })

const createTable = async (name, schema) => {
return new Promise((resolve, reject) => {
bigquery.dataset(DATASET_NAME).createTable(name, { schema: schema })
.then(r => {
console.log(` -- BigQuery Table ${r[0].id} created`)
resolve()
})
.catch(e => {
reject(e)
})
})
}

const deleteTable = async (name) => {
return new Promise((resolve, reject) => {
bigquery.dataset(DATASET_NAME).table(name).delete().then(() => {
console.log(` -- BigQuery Table ${name} removed`)
resolve()
}).catch(e => {
if (e.errors[0].reason === 'notFound') {
resolve()
} else{
reject(e)
}
})
})
async function createTable(name, schema, bigquery, dataset) {
const result = await bigquery.dataset(dataset).createTable(name, { schema })
console.log(` -- BigQuery Table ${result[0].id} created`)
}

const recreateTable = async (name, schema) => {
console.log(`Dropping and creating table [ ${name} ] in dataset [ ${DATASET_NAME} ] @ Google BigQuery`)
async function deleteTable(name, bigquery, dataset) {
try {
await bigquery.dataset(dataset).table(name).delete()
console.log(` -- BigQuery Table ${name} removed`)
} catch(e) {
if (e.errors[0].reason === 'notFound') {
console.log(` Table ${name} doesn't yet exist - nothing to delete`)
} else {
throw e
}
}
}

await deleteTable(name)
await createTable(name, schema)
async function recreateTable(name, schema, bigquery, dataset) {
console.log(`Dropping and creating table [ ${name} ] in dataset [ ${dataset} ] @ Google BigQuery`)
await deleteTable(name, bigquery, dataset)
await createTable(name, schema, bigquery, dataset)
}

(async () => {
async function main() {
const dbDetails = {
projectID: process.env.PROJECT_ID?.trim(),
datasetName: process.env.DATASET_NAME?.trim(),
txTableName: process.env.TRANSACTION_TABLE_NAME?.trim(),
ledgerTableName: process.env.LEDGER_TABLE_NAME?.trim(),
}

const hasInvalidValue = Object.values(dbDetails).find((value) => {
return typeof value !== 'string' || value.length === 0
}) !== undefined
if (hasInvalidValue) {
console.error('Invalid db args')
process.exit(1)
}

const bigquery = new BigQuery({ projectId: dbDetails.projectID })

await Promise.all([
recreateTable(TRANSACTION_TABLE_NAME, transactionSchema),
recreateTable(LEDGER_TABLE_NAME, ledgerSchema),
recreateTable(dbDetails.txTableName, transactionSchema, bigquery, dbDetails.datasetName),
recreateTable(dbDetails.ledgerTableName, ledgerSchema, bigquery, dbDetails.datasetName),
])

console.log(`Done\n`)
})()
process.exit(0)
}

main().then()
13 changes: 13 additions & 0 deletions exampleService.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[Unit]
Description=Transaction ETL

[Service]
Environment=GOOGLE_APPLICATION_CREDENTIALS=/home/myuser/secret.json NODE=wss://s1.ripple.com:51233 LEDGER=75443456 PROJECT_ID=my-project DATASET_NAME=my_dataset TABLE_NAME=transactions MODE=transactions
ExecStart=/usr/bin/node /path/to/this/repo/index.js
TimeoutStopSec=60
KillSignal=SIGTERM
RestartSec=4320
Restart=on-failure

[Install]
WantedBy=multi-user.target
Loading