From 00ce93e38538adf1a9bb669df699e40c50ee2a55 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Mon, 22 Jan 2018 17:51:57 -0500 Subject: [PATCH 1/3] Add the ability to fetch data remote data Adapters included for S3 files and files available over http(s) --- README.md | 33 ++++++++++++++++++++++ lib/reader.js | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/README.md b/README.md index 82a47a8c..72abf81d 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,39 @@ avoid leaking file descriptors. await reader.close(); ``` +### Reading data from a url + +Parquet files can be read from a url without having to download the whole file. +You will have to supply the request library as a first argument and the request parameters +as a second argument to the function `parquetReader.openUrl`. + +``` js +const request = require('request'); +let reader = await parquet.ParquetReader.openUrl(request,'https://domain/fruits.parquet'); +``` + +### Reading data from S3 + +Parquet files can be read from an S3 object without having to download the whole file. +You will have to supply the aws-sdk client as first argument and the bucket/key information +as second argument to the function `parquetReader.openS3`. + +``` js +const AWS = require('aws-sdk'); +const client = new AWS.S3({ + accessKeyId: 'xxxxxxxxxxx', + secretAccessKey: 'xxxxxxxxxxx' +}); + +const params = { + Bucket: 'xxxxxxxxxxx', + Key: 'xxxxxxxxxxx' +}; + +let reader = await parquet.ParquetReader.openS3(client,params); +``` + + Encodings --------- diff --git a/lib/reader.js b/lib/reader.js index 43e78d9d..b7bd315c 100644 --- a/lib/reader.js +++ b/lib/reader.js @@ -91,7 +91,31 @@ class ParquetReader { */ static async openFile(filePath) { let envelopeReader = await ParquetEnvelopeReader.openFile(filePath); + return this.openEnvelopeReader(envelopeReader); + } + + /** + * Open the parquet file from S3 using the supplied aws client and params + * The params have to include `Bucket` and `Key` to the file requested + * This function returns a new parquet reader + */ + static async openS3(client, params) { + let envelopeReader = await ParquetEnvelopeReader.openS3(client, params); + return this.openEnvelopeReader(envelopeReader); + } + + /** + * Open the parquet file from a url using the supplied request module + * params should either be a string (url) or an object that includes + * a `url` property. + * This function returns a new parquet reader + */ + static async openUrl(request, params) { + let envelopeReader = await ParquetEnvelopeReader.openUrl(request, params); + return this.openEnvelopeReader(envelopeReader); + } + static async openEnvelopeReader(envelopeReader) { try { await envelopeReader.readHeader(); let metadata = await envelopeReader.readFooter(); @@ -200,6 +224,59 @@ class ParquetEnvelopeReader { return new ParquetEnvelopeReader(readFn, closeFn, fileStat.size); } + static async openS3(client, params) { + let fileStat = await client.headObject(params).promise(); + + let readFn = async (offset, length) => { + let Range = `bytes=${offset}-${offset+length-1}`; + let res = await client.getObject(Object.assign({Range}, params)).promise(); + return res.Body; + }; + + let closeFn = () => ({}); + + return new ParquetEnvelopeReader(readFn, closeFn, fileStat.ContentLength); + } + + static async openUrl(request, params) { + if (typeof params === 'string') + params = {url: params}; + if (!params.url) + throw new Error('URL missing'); + + params.encoding = params.encoding || null; + + let defaultHeaders = params.headers || {}; + + let filesize = await new Promise( (resolve, reject) => { + let req = request(params); + req.on('response', res => { + req.abort(); + resolve(res.headers['content-length']); + }); + req.on('error', reject); + }); + + let readFn = (offset, length) => { + let range = `bytes=${offset}-${offset+length-1}`; + let headers = Object.assign({}, defaultHeaders, {range}); + let req = Object.assign({}, params, {headers}); + return new Promise( (resolve, reject) => { + request(req, (err, res) => { + if (err) { + reject(err); + } else { + resolve(res.body); + } + }); + }); + }; + + let closeFn = () => ({}); + + return new ParquetEnvelopeReader(readFn, closeFn, filesize); + } + constructor(readFn, closeFn, fileSize) { this.read = readFn; this.close = closeFn; From cc67213dd41d4891cce034e5c1027df5119789a6 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Wed, 31 Jan 2018 17:08:39 -0500 Subject: [PATCH 2/3] Add openBuffer --- README.md | 8 ++++++++ lib/reader.js | 11 +++++++++++ 2 files changed, 19 insertions(+) diff --git a/README.md b/README.md index 72abf81d..89de90cc 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,14 @@ const params = { let reader = await parquet.ParquetReader.openS3(client,params); ``` +### Reading data from a buffer + +If the complete parquet file is in buffer it can be read directly from memory without incurring any additional I/O. + +``` js +const file = fs.readFileSync('fruits.parquet'); +let reader = await parquet.ParquetReader.openBuffer(file); +``` Encodings --------- diff --git a/lib/reader.js b/lib/reader.js index b7bd315c..17e9d3b5 100644 --- a/lib/reader.js +++ b/lib/reader.js @@ -94,6 +94,11 @@ class ParquetReader { return this.openEnvelopeReader(envelopeReader); } + static async openBuffer(buffer) { + let envelopeReader = await ParquetEnvelopeReader.openBuffer(buffer); + return this.openEnvelopeReader(envelopeReader); + } + /** * Open the parquet file from S3 using the supplied aws client and params * The params have to include `Bucket` and `Key` to the file requested @@ -224,6 +229,12 @@ class ParquetEnvelopeReader { return new ParquetEnvelopeReader(readFn, closeFn, fileStat.size); } + static async openBuffer(buffer) { + let readFn = (offset, length) => buffer.slice(offset,offset+length); + let closeFn = () => ({}); + return new ParquetEnvelopeReader(readFn, closeFn, buffer.length); + } + static async openS3(client, params) { let fileStat = await client.headObject(params).promise(); From e3c70dfe3752b302e067e5e1b57a599b59cfd890 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Mon, 12 Feb 2018 16:38:09 -0500 Subject: [PATCH 3/3] Allow this.fileSize to be an async function as well Instantiating a new client blocks on retrieving filesize. But there are cases when we really don't need the filesize, for example when we have the metadata cached already. --- lib/reader.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/reader.js b/lib/reader.js index 17e9d3b5..9704281b 100644 --- a/lib/reader.js +++ b/lib/reader.js @@ -236,7 +236,7 @@ class ParquetEnvelopeReader { } static async openS3(client, params) { - let fileStat = await client.headObject(params).promise(); + let fileStat = async () => client.headObject(params).promise().then(d => d.ContentLength); let readFn = async (offset, length) => { let Range = `bytes=${offset}-${offset+length-1}`; @@ -246,7 +246,7 @@ class ParquetEnvelopeReader { let closeFn = () => ({}); - return new ParquetEnvelopeReader(readFn, closeFn, fileStat.ContentLength); + return new ParquetEnvelopeReader(readFn, closeFn, fileStat); } static async openUrl(request, params) { @@ -259,7 +259,7 @@ class ParquetEnvelopeReader { let defaultHeaders = params.headers || {}; - let filesize = await new Promise( (resolve, reject) => { + let filesize = async () => new Promise( (resolve, reject) => { let req = request(params); req.on('response', res => { req.abort(); @@ -350,6 +350,9 @@ class ParquetEnvelopeReader { } async readFooter() { + if (typeof this.fileSize === 'function') { + this.fileSize = await this.fileSize(); + } let trailerLen = PARQUET_MAGIC.length + 4; let trailerBuf = await this.read(this.fileSize - trailerLen, trailerLen);