Skip to content

Commit

Permalink
Fix RLE encode/decode
Browse files Browse the repository at this point in the history
* bitpacking should work for any length of data, not just multiple of 8 (last packed is padded if less than 8)

* Improve runs estimation - only start a new run if we are at a mod 8 === 0, otherwise use bitpacking
  • Loading branch information
ZJONSSON committed Mar 1, 2018
1 parent 1fa58b5 commit 07fb2fd
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 34 deletions.
64 changes: 30 additions & 34 deletions lib/codec/rle.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const varint = require('varint')

function encodeRunBitpacked(values, opts) {
if (values.length % 8 !== 0) {
throw 'must be a multiple of 8';
for (let i = 0; i < values.length % 8; i++) {
values.push(0);
}

let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8)));
Expand Down Expand Up @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) {
}

let buf = Buffer.alloc(0);
let runs = [];
for (let cur = 0; cur < values.length; cur += 8) {
let repeating = true;
for (let i = 1; i < 8; ++i) {
if (values[cur + i] !== values[cur]) {
repeating = false;
let run = [];
let repeats = 0;

for (let i = 0; i < values.length; i++) {
// If we are at the beginning of a run and the next value is same we start
// collecting repeated values
if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) {
// If we have any data in runs we need to encode them
if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
run = [];
}
repeats = 1;
} else if (repeats > 0 && values[i] === values[i-1]) {
repeats += 1;
} else {
// If values changes we need to post any previous repeated values
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]);
repeats = 0;
}
run.push(values[i]);
}

const append =
runs.length > 0 &&
(runs[runs.length - 1][1] !== null) === repeating &&
(!repeating || runs[runs.length - 1][1] === values[cur]);

if (!append) {
runs.push([cur, repeating ? values[cur] : null]);
}
}

for (let i = values.length - (values.length % 8); i < values.length; ++i) {
runs.push([i, values[i]]);
}

for (let i = 0; i < runs.length; ++i) {
const begin = runs[i][0];
const end = i < runs.length - 1 ? runs[i + 1][0] : values.length;
const rep = runs[i][1];

if (rep === null) {
buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]);
} else {
buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]);
}
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]);
} else if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
}

if (opts.disableEnvelope) {
Expand All @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) {
buf.copy(envelope, 4);

return envelope;
}
};

function decodeRunBitpacked(cursor, count, opts) {
if (count % 8 !== 0) {
Expand Down Expand Up @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) {
values.push(...decodeRunRepeated(cursor, count, opts));
}
}
values = values.slice(0,count);

if (values.length !== count) {
throw "invalid RLE encoding";
}

return values;
}

};
30 changes: 30 additions & 0 deletions test/codec_rle.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ describe('ParquetCodec::RLE', function() {
assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7]);
});

describe('number of values not a multiple of 8', function() {
it('should encode bitpacked values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
[0, 1, 2, 3, 4, 5, 6, 7, 6, 5],
{
disableEnvelope: true,
bitWidth: 3
});

assert.deepEqual(buf, new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]));
});

it('should decode bitpacked values', function() {
let vals = parquet_codec_rle.decodeValues(
'INT32',
{
buffer: new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]),
offset: 0,
},
10,
{
disableEnvelope: true,
bitWidth: 3
});

assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 6, 5]);
});
});

it('should encode repeated values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
Expand Down

0 comments on commit 07fb2fd

Please sign in to comment.