From 7ea27d3938ee8c165a551435d2705efee351e18c Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Thu, 1 Feb 2018 21:13:27 -0500 Subject: [PATCH] If there is an error in transform, pass error to callback The error will in turn be emitted as an `error` event --- lib/writer.js | 4 +++- test/integration.js | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/lib/writer.js b/lib/writer.js index f1130e81..0f50aa3d 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -264,7 +264,9 @@ class ParquetTransformer extends stream.Transform { _transform(row, encoding, callback) { if (row) { - this.writer.appendRow(row).then(callback); + this.writer.appendRow(row) + .then(callback) + .catch(e => callback(e)); } else { callback(); } diff --git a/test/integration.js b/test/integration.js index 5cfb2089..152fa734 100644 --- a/test/integration.js +++ b/test/integration.js @@ -371,6 +371,32 @@ describe('Parquet', function() { istream.pipe(transform).pipe(ostream); }); + it('an error in transform is emitted in stream', async function() { + const opts = { useDataPageV2: true, compression: 'GZIP' }; + let schema = mkTestSchema(opts); + let transform = new parquet.ParquetTransformer(schema, opts); + transform.writer.setMetadata("myuid", "420"); + transform.writer.setMetadata("fnord", "dronf"); + + var ostream = fs.createWriteStream('fruits_stream.parquet'); + let testRows = mkTestRows(); + testRows[4].quantity = 'N/A'; + let istream = objectStream.fromArray(testRows); + return new Promise( (resolve, reject) => { + setTimeout(() => resolve('no_error'),1000); + istream + .pipe(transform) + .on('error', reject) + .pipe(ostream) + .on('finish',resolve); + }) + .then( + () => { throw new Error('Should emit error'); }, + () => undefined + ); + + }); + }); });