diff --git a/lib/writer.js b/lib/writer.js index bd23a805..9d27c398 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -1,5 +1,6 @@ 'use strict'; const fs = require('fs'); +const stream = require('stream'); const thrift = require('thrift'); const parquet_thrift = require('../gen-nodejs/parquet_types') const parquet_shredder = require('./shred') @@ -236,6 +237,103 @@ class ParquetEnvelopeWriter { } +/** + * Create a parquet transform stream that can be piped to any readable stream + */ + +class ParquetTransformer extends stream.Transform { + constructor(schema, opts = {}) { + super({ + objectMode: true, + highWaterMark: PARQUET_DEFAULT_ROW_GROUP_SIZE, + }); + this.schema = schema; + this.rowBuffer = {}; + this.rowGroupSize = PARQUET_DEFAULT_ROW_GROUP_SIZE; + this.offset = 0; + this.rowCount = 0; + this.rowGroups = []; + this.pageSize = PARQUET_DEFAULT_PAGE_SIZE; + this.useDataPageV2 = ('useDataPageV2' in opts) ? opts.useDataPageV2 : true; + this.writtenHeader = false; + this.userMetadata = {}; + } + + _transform(row, encoding, callback) { + if (!this.writtenHeader) { + this.writeHeader(); + } + parquet_shredder.shredRecord(this.schema, row, this.rowBuffer); + + if (this.rowBuffer.rowCount >= this.rowGroupSize) { + this.writeRowGroup(this.rowBuffer); + this.rowBuffer = {}; + } + + callback(); + } + + _flush(callback) { + if (this.rowBuffer.rowCount > 0) { + this.writeRowGroup(this.rowBuffer); + this.rowBuffer = {}; + } + this.writeFooter(); + + callback(); + } + + writeHeader() { + this.writeSection(Buffer.from(PARQUET_MAGIC)); + + return this.writtenHeader = true; + } + + writeRowGroup(records) { + let rgroup = encodeRowGroup( + this.schema, + records, + { + baseOffset: this.offset, + pageSize: this.pageSize, + useDataPageV2: this.useDataPageV2 + }); + + this.rowCount += records.rowCount; + this.rowGroups.push(rgroup.metadata); + + return this.writeSection(rgroup.body); + } + + writeSection(buf) { + this.offset += buf.length; + + return this.push(buf); + } + + writeFooter(userMetadata = {}) { + if (this.rowCount === 0) { + throw 'cannot write parquet file with zero rows'; + } + + if (this.schema.fieldList.length === 0) { + throw 'cannot write parquet file with zero fieldList'; + } + + return this.writeSection( + encodeFooter(this.schema, this.rowCount, this.rowGroups, userMetadata) + ); + }; + + setMetadata(key, value) { + this.userMetadata[key.toString()] = value.toString(); + } + + setPageSize(cnt) { + this.pageSize = cnt; + } +} + /** * Encode a consecutive array of data using one of the parquet encodings */ @@ -520,5 +618,6 @@ function encodeFooter(schema, rowCount, rowGroups, userMetadata) { module.exports = { ParquetEnvelopeWriter, ParquetWriter, + ParquetTransformer }; diff --git a/parquet.js b/parquet.js index 0b9ebd3c..9ac590f4 100644 --- a/parquet.js +++ b/parquet.js @@ -8,6 +8,7 @@ module.exports = { ParquetReader: reader.ParquetReader, ParquetEnvelopeWriter: writer.ParquetEnvelopeWriter, ParquetWriter: writer.ParquetWriter, + ParquetTransformer: writer.ParquetTransformer, ParquetSchema: schema.ParquetSchema, ParquetShredder: shredder };