simplified ParquetTransform
asmuth committed Dec 4, 2017
1 parent 8c897cc commit eb396f4
Showing 3 changed files with 78 additions and 100 deletions.
122 changes: 32 additions & 90 deletions lib/writer.js
@@ -60,13 +60,7 @@ class ParquetWriter {
outputStream,
opts);

-    try {
-      await envelopeWriter.writeHeader();
-      return new ParquetWriter(schema, envelopeWriter, opts);
-    } catch (err) {
-      envelopeWriter.close();
-      throw err;
-    }
+    return new ParquetWriter(schema, envelopeWriter, opts);
}

/**
@@ -79,6 +73,13 @@ class ParquetWriter {
this.rowGroupSize = opts.rowGroupSize || PARQUET_DEFAULT_ROW_GROUP_SIZE;
this.closed = false;
this.userMetadata = {};

+    try {
+      envelopeWriter.writeHeader();
+    } catch (err) {
+      envelopeWriter.close();
+      throw err;
+    }
}

/**
@@ -104,7 +105,7 @@
* method twice on the same object or add any rows after the close() method has
* been called
*/
-  async close() {
+  async close(callback) {
if (this.closed) {
throw 'writer was closed';
}
@@ -119,6 +120,10 @@
await this.envelopeWriter.writeFooter(this.userMetadata);
await this.envelopeWriter.close();
this.envelopeWriter = null;

+    if (callback) {
+      callback();
+    }
}

/**
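The new callback parameter is Node-style completion plumbing: as the hunk above shows, close() now invokes it after the footer has been written, which lets the transform stream below hand its _flush callback straight through. A minimal sketch of the two call styles (finishWriter and its writer argument are assumed, not part of this commit):

async function finishWriter(writer) {
  // close() still returns a promise; the callback is optional sugar so that
  // stream code like ParquetTransformer._flush(callback) can pass its
  // callback through unchanged.
  await writer.close(() => console.log('footer written, writer closed'));
}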
@@ -238,100 +243,37 @@ class ParquetEnvelopeWriter {
}

/**
- * Create a parquet transform stream that can be piped to any readable stream
+ * Create a parquet transform stream
 */

class ParquetTransformer extends stream.Transform {

  constructor(schema, opts = {}) {
-    super({
-      objectMode: true,
-      highWaterMark: PARQUET_DEFAULT_ROW_GROUP_SIZE,
-    });
-
-    this.schema = schema;
-    this.rowBuffer = {};
-    this.rowGroupSize = PARQUET_DEFAULT_ROW_GROUP_SIZE;
-    this.offset = 0;
-    this.rowCount = 0;
-    this.rowGroups = [];
-    this.pageSize = PARQUET_DEFAULT_PAGE_SIZE;
-    this.useDataPageV2 = ('useDataPageV2' in opts) ? opts.useDataPageV2 : true;
-    this.writtenHeader = false;
-    this.userMetadata = {};
-  }
-
-  _transform(row, encoding, callback) {
-    if (!this.writtenHeader) {
-      this.writeHeader();
-    }
-
-    parquet_shredder.shredRecord(this.schema, row, this.rowBuffer);
-
-    if (this.rowBuffer.rowCount >= this.rowGroupSize) {
-      this.writeRowGroup(this.rowBuffer);
-      this.rowBuffer = {};
-    }
-
-    callback();
-  }
-
-  _flush(callback) {
-    if (this.rowBuffer.rowCount > 0) {
-      this.writeRowGroup(this.rowBuffer);
-      this.rowBuffer = {};
-    }
-
-    this.writeFooter();
-
-    callback();
-  }
-
-  writeHeader() {
-    this.writeSection(Buffer.from(PARQUET_MAGIC));
-
-    return this.writtenHeader = true;
-  }
-
-  writeRowGroup(records) {
-    let rgroup = encodeRowGroup(
-        this.schema,
-        records,
-        {
-          baseOffset: this.offset,
-          pageSize: this.pageSize,
-          useDataPageV2: this.useDataPageV2
-        });
-
-    this.rowCount += records.rowCount;
-    this.rowGroups.push(rgroup.metadata);
-
-    return this.writeSection(rgroup.body);
-  }
-
-  writeSection(buf) {
-    this.offset += buf.length;
-
-    return this.push(buf);
-  }
-
-  writeFooter(userMetadata = {}) {
-    if (this.rowCount === 0) {
-      throw 'cannot write parquet file with zero rows';
-    }
-
-    if (this.schema.fieldList.length === 0) {
-      throw 'cannot write parquet file with zero fieldList';
-    }
-
-    return this.writeSection(
-        encodeFooter(this.schema, this.rowCount, this.rowGroups, userMetadata)
-    );
-  };
-
-  setMetadata(key, value) {
-    this.userMetadata[key.toString()] = value.toString();
-  }
-
-  setPageSize(cnt) {
-    this.pageSize = cnt;
-  }
+    super({ objectMode: true });
+
+    let writeProxy = (function(t) {
+      return function(b) {
+        t.push(b);
+      }
+    })(this);
+
+    this.writer = new ParquetWriter(
+        schema,
+        new ParquetEnvelopeWriter(schema, writeProxy, function() {}, 0, opts),
+        opts);
+  }
+
+  _transform(row, encoding, callback) {
+    if (row) {
+      this.writer.appendRow(row).then(callback);
+    } else {
+      callback();
+    }
+  }
+
+  _flush(callback) {
+    this.writer.close(callback);
+  }
}

/**
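Taken together, the writer.js changes reduce ParquetTransformer to a thin adapter: the envelope writer's write function becomes a proxy that push()es encoded buffers into the transform stream, and all buffering, row-group, and footer logic stays in ParquetWriter. A usage sketch under assumed names (the two-field schema, the output path, and installing the package as 'parquetjs' are illustrative, not part of this commit):

const fs = require('fs');
const parquet = require('parquetjs');

// Hypothetical schema; any valid ParquetSchema works here.
const schema = new parquet.ParquetSchema({
  name: { type: 'UTF8' },
  price: { type: 'DOUBLE' },
});

const transform = new parquet.ParquetTransformer(schema);

// Metadata now goes through the embedded writer, as the new test below does.
transform.writer.setMetadata('myuid', '420');

transform.pipe(fs.createWriteStream('fruits.parquet'));
transform.write({ name: 'apples', price: 2.6 });
transform.write({ name: 'oranges', price: 2.7 });
transform.end(); // triggers _flush, which calls writer.close(callback)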
1 change: 1 addition & 0 deletions package.json
@@ -19,6 +19,7 @@
"bson": "^1.0.4",
"int53": "^0.2.4",
"lzo": "^0.4.0",
"object-stream": "0.0.1",
"snappyjs": "^0.6.0",
"thrift": "^0.10.0",
"varint": "^5.0.0"
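The only dependency change is object-stream, which the new integration test uses to drive the transformer; its fromArray helper wraps an array in a readable object-mode stream. A sketch of the single call the test relies on (the sample rows are placeholders):

const objectStream = require('object-stream');

// fromArray returns a readable stream that emits each array element as one
// object-mode chunk and then ends.
const istream = objectStream.fromArray([{ name: 'apples' }, { name: 'oranges' }]);
istream.on('data', (row) => console.log(row.name));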
55 changes: 45 additions & 10 deletions test/integration.js
@@ -1,14 +1,16 @@
'use strict';
const chai = require('chai');
const fs = require('fs');
const os = require('os');
const assert = chai.assert;
const parquet = require('../parquet.js');
+const objectStream = require('object-stream');

const TEST_NUM_ROWS = 10000;
const TEST_VTIME = new Date();

-// write a new file 'fruits.parquet'
-async function writeTestFile(opts) {
-  let schema = new parquet.ParquetSchema({
+function mkTestSchema(opts) {
+  return new parquet.ParquetSchema({
name: { type: 'UTF8', compression: opts.compression },
//quantity: { type: 'INT64', encoding: 'RLE', typeLength: 6, optional: true, compression: opts.compression }, // parquet-mr actually doesnt support this
quantity: { type: 'INT64', optional: true, compression: opts.compression },
@@ -27,13 +29,13 @@ async function writeTestFile(opts) {
colour: { type: 'UTF8', repeated: true, compression: opts.compression },
meta_json: { type: 'BSON', optional: true, compression: opts.compression },
});
+};

-  let writer = await parquet.ParquetWriter.openFile(schema, 'fruits.parquet', opts);
-  writer.setMetadata("myuid", "420");
-  writer.setMetadata("fnord", "dronf");
+function mkTestRows(opts) {
+  let rows = [];

for (let i = 0; i < TEST_NUM_ROWS; ++i) {
-    await writer.appendRow({
+    rows.push({
name: 'apples',
quantity: 10,
price: 2.6,
@@ -48,7 +50,7 @@
colour: [ 'green', 'red' ]
});

-    await writer.appendRow({
+    rows.push({
name: 'oranges',
quantity: 20,
price: 2.7,
@@ -63,7 +65,7 @@
colour: [ 'orange' ]
});

-    await writer.appendRow({
+    rows.push({
name: 'kiwi',
price: 4.2,
day: new Date('2017-11-26'),
@@ -78,7 +80,7 @@
meta_json: { expected_ship_date: TEST_VTIME }
});

-    await writer.appendRow({
+    rows.push({
name: 'banana',
price: 3.2,
day: new Date('2017-11-26'),
@@ -90,6 +92,22 @@
});
}

+  return rows;
+}
+
+async function writeTestFile(opts) {
+  let schema = mkTestSchema(opts);
+
+  let writer = await parquet.ParquetWriter.openFile(schema, 'fruits.parquet', opts);
+  writer.setMetadata("myuid", "420");
+  writer.setMetadata("fnord", "dronf");
+
+  let rows = mkTestRows(opts);
+
+  for (let row of rows) {
+    await writer.appendRow(row);
+  }
+
await writer.close();
}

@@ -338,4 +356,21 @@ describe('Parquet', function() {

});

+  describe('using the Stream/Transform API', function() {
+
+    it('write a test file', async function() {
+      const opts = { useDataPageV2: true, compression: 'GZIP' };
+      let schema = mkTestSchema(opts);
+      let transform = new parquet.ParquetTransformer(schema, opts);
+      transform.writer.setMetadata("myuid", "420");
+      transform.writer.setMetadata("fnord", "dronf");
+
+      var ostream = fs.createWriteStream('fruits_stream.parquet');
+      let istream = objectStream.fromArray(mkTestRows());
+      istream.pipe(transform).pipe(ostream);
+    });
+
+  });
+
});
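One caveat in the new test: it kicks off the pipe chain but returns without waiting, so the it() block can finish before fruits_stream.parquet is fully written. A caller that needs to know when the file is complete could wrap the chain in a promise and wait for the destination stream's 'finish' event; a sketch, not part of this commit (writeRowsToFile is an assumed helper name):

// Resolves once the parquet footer has been flushed to the write stream.
function writeRowsToFile(rows, transform, path) {
  return new Promise((resolve, reject) => {
    const ostream = fs.createWriteStream(path);
    ostream.on('finish', resolve);
    ostream.on('error', reject);
    transform.on('error', reject);
    objectStream.fromArray(rows).pipe(transform).pipe(ostream);
  });
}

// e.g. await writeRowsToFile(mkTestRows(), new parquet.ParquetTransformer(schema, opts), 'fruits_stream.parquet');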
