diff --git a/lib/codec/rle.js b/lib/codec/rle.js index 947a14f1..04b4bf0e 100644 --- a/lib/codec/rle.js +++ b/lib/codec/rle.js @@ -1,8 +1,8 @@ const varint = require('varint') function encodeRunBitpacked(values, opts) { - if (values.length % 8 !== 0) { - throw 'must be a multiple of 8'; + for (let i = 0; i < values.length % 8; i++) { + values.push(0); } let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8))); @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) { } let buf = Buffer.alloc(0); - let runs = []; - for (let cur = 0; cur < values.length; cur += 8) { - let repeating = true; - for (let i = 1; i < 8; ++i) { - if (values[cur + i] !== values[cur]) { - repeating = false; + let run = []; + let repeats = 0; + + for (let i = 0; i < values.length; i++) { + // If we are at the beginning of a run and the next value is same we start + // collecting repeated values + if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) { + // If we have any data in runs we need to encode them + if (run.length) { + buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]); + run = []; } + repeats = 1; + } else if (repeats > 0 && values[i] === values[i-1]) { + repeats += 1; + } else { + // If values changes we need to post any previous repeated values + if (repeats) { + buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]); + repeats = 0; + } + run.push(values[i]); } - - const append = - runs.length > 0 && - (runs[runs.length - 1][1] !== null) === repeating && - (!repeating || runs[runs.length - 1][1] === values[cur]); - - if (!append) { - runs.push([cur, repeating ? values[cur] : null]); - } - } - - for (let i = values.length - (values.length % 8); i < values.length; ++i) { - runs.push([i, values[i]]); } - for (let i = 0; i < runs.length; ++i) { - const begin = runs[i][0]; - const end = i < runs.length - 1 ? runs[i + 1][0] : values.length; - const rep = runs[i][1]; - - if (rep === null) { - buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]); - } else { - buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]); - } + if (repeats) { + buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]); + } else if (run.length) { + buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]); } if (opts.disableEnvelope) { @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) { buf.copy(envelope, 4); return envelope; -} +}; function decodeRunBitpacked(cursor, count, opts) { if (count % 8 !== 0) { @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) { values.push(...decodeRunRepeated(cursor, count, opts)); } } + values = values.slice(0,count); if (values.length !== count) { throw "invalid RLE encoding"; } return values; -} - +}; \ No newline at end of file diff --git a/test/codec_rle.js b/test/codec_rle.js index 183ca8ab..fe0be6fe 100644 --- a/test/codec_rle.js +++ b/test/codec_rle.js @@ -33,6 +33,36 @@ describe('ParquetCodec::RLE', function() { assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7]); }); + describe('number of values not a multiple of 8', function() { + it('should encode bitpacked values', function() { + let buf = parquet_codec_rle.encodeValues( + 'INT32', + [0, 1, 2, 3, 4, 5, 6, 7, 6, 5], + { + disableEnvelope: true, + bitWidth: 3 + }); + + assert.deepEqual(buf, new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00])); + }); + + it('should decode bitpacked values', function() { + let vals = parquet_codec_rle.decodeValues( + 'INT32', + { + buffer: new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]), + offset: 0, + }, + 10, + { + disableEnvelope: true, + bitWidth: 3 + }); + + assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 6, 5]); + }); + }); + it('should encode repeated values', function() { let buf = parquet_codec_rle.encodeValues( 'INT32',