From 7ea27d3938ee8c165a551435d2705efee351e18c Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Thu, 1 Feb 2018 21:13:27 -0500 Subject: [PATCH 1/2] If there is an error in transform, pass error to callback The error will in turn be emitted as an `error` event --- lib/writer.js | 4 +++- test/integration.js | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/lib/writer.js b/lib/writer.js index f1130e81..0f50aa3d 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -264,7 +264,9 @@ class ParquetTransformer extends stream.Transform { _transform(row, encoding, callback) { if (row) { - this.writer.appendRow(row).then(callback); + this.writer.appendRow(row) + .then(callback) + .catch(e => callback(e)); } else { callback(); } diff --git a/test/integration.js b/test/integration.js index 5cfb2089..152fa734 100644 --- a/test/integration.js +++ b/test/integration.js @@ -371,6 +371,32 @@ describe('Parquet', function() { istream.pipe(transform).pipe(ostream); }); + it('an error in transform is emitted in stream', async function() { + const opts = { useDataPageV2: true, compression: 'GZIP' }; + let schema = mkTestSchema(opts); + let transform = new parquet.ParquetTransformer(schema, opts); + transform.writer.setMetadata("myuid", "420"); + transform.writer.setMetadata("fnord", "dronf"); + + var ostream = fs.createWriteStream('fruits_stream.parquet'); + let testRows = mkTestRows(); + testRows[4].quantity = 'N/A'; + let istream = objectStream.fromArray(testRows); + return new Promise( (resolve, reject) => { + setTimeout(() => resolve('no_error'),1000); + istream + .pipe(transform) + .on('error', reject) + .pipe(ostream) + .on('finish',resolve); + }) + .then( + () => { throw new Error('Should emit error'); }, + () => undefined + ); + + }); + }); }); From 3e75d9604e8fa88180cfef9e722c12639eb2fb6e Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Wed, 7 Feb 2018 16:43:03 -0500 Subject: [PATCH 2/2] Also ensure errors in `this.writer.close` that are not sent to the callback are captured subsequently and included in the callback. --- lib/writer.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/writer.js b/lib/writer.js index 0f50aa3d..0c2f2c83 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -265,15 +265,15 @@ class ParquetTransformer extends stream.Transform { _transform(row, encoding, callback) { if (row) { this.writer.appendRow(row) - .then(callback) - .catch(e => callback(e)); + .then(d => callback(null,d), callback); } else { callback(); } } _flush(callback) { - this.writer.close(callback); + this.writer.close(callback) + .then(d => callback(null, d), callback); } }