Skip to content

Commit

Permalink
Merge branch 'transform-stream' of https://github.com/witq/parquetjs
Browse files Browse the repository at this point in the history
…into transform
  • Loading branch information
asmuth committed Dec 4, 2017
2 parents d517333 + f2bc0bf commit 8c897cc
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
99 changes: 99 additions & 0 deletions lib/writer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';
const fs = require('fs');
const stream = require('stream');
const thrift = require('thrift');
const parquet_thrift = require('../gen-nodejs/parquet_types')
const parquet_shredder = require('./shred')
Expand Down Expand Up @@ -236,6 +237,103 @@ class ParquetEnvelopeWriter {

}

/**
* Create a parquet transform stream that can be piped to any readable stream
*/

class ParquetTransformer extends stream.Transform {
constructor(schema, opts = {}) {
super({
objectMode: true,
highWaterMark: PARQUET_DEFAULT_ROW_GROUP_SIZE,
});
this.schema = schema;
this.rowBuffer = {};
this.rowGroupSize = PARQUET_DEFAULT_ROW_GROUP_SIZE;
this.offset = 0;
this.rowCount = 0;
this.rowGroups = [];
this.pageSize = PARQUET_DEFAULT_PAGE_SIZE;
this.useDataPageV2 = ('useDataPageV2' in opts) ? opts.useDataPageV2 : true;
this.writtenHeader = false;
this.userMetadata = {};
}

_transform(row, encoding, callback) {
if (!this.writtenHeader) {
this.writeHeader();
}
parquet_shredder.shredRecord(this.schema, row, this.rowBuffer);

if (this.rowBuffer.rowCount >= this.rowGroupSize) {
this.writeRowGroup(this.rowBuffer);
this.rowBuffer = {};
}

callback();
}

_flush(callback) {
if (this.rowBuffer.rowCount > 0) {
this.writeRowGroup(this.rowBuffer);
this.rowBuffer = {};
}
this.writeFooter();

callback();
}

writeHeader() {
this.writeSection(Buffer.from(PARQUET_MAGIC));

return this.writtenHeader = true;
}

writeRowGroup(records) {
let rgroup = encodeRowGroup(
this.schema,
records,
{
baseOffset: this.offset,
pageSize: this.pageSize,
useDataPageV2: this.useDataPageV2
});

this.rowCount += records.rowCount;
this.rowGroups.push(rgroup.metadata);

return this.writeSection(rgroup.body);
}

writeSection(buf) {
this.offset += buf.length;

return this.push(buf);
}

writeFooter(userMetadata = {}) {
if (this.rowCount === 0) {
throw 'cannot write parquet file with zero rows';
}

if (this.schema.fieldList.length === 0) {
throw 'cannot write parquet file with zero fieldList';
}

return this.writeSection(
encodeFooter(this.schema, this.rowCount, this.rowGroups, userMetadata)
);
};

setMetadata(key, value) {
this.userMetadata[key.toString()] = value.toString();
}

setPageSize(cnt) {
this.pageSize = cnt;
}
}

/**
* Encode a consecutive array of data using one of the parquet encodings
*/
Expand Down Expand Up @@ -520,5 +618,6 @@ function encodeFooter(schema, rowCount, rowGroups, userMetadata) {
module.exports = {
ParquetEnvelopeWriter,
ParquetWriter,
ParquetTransformer
};

1 change: 1 addition & 0 deletions parquet.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module.exports = {
ParquetReader: reader.ParquetReader,
ParquetEnvelopeWriter: writer.ParquetEnvelopeWriter,
ParquetWriter: writer.ParquetWriter,
ParquetTransformer: writer.ParquetTransformer,
ParquetSchema: schema.ParquetSchema,
ParquetShredder: shredder
};

0 comments on commit 8c897cc

Please sign in to comment.