Skip to content

Commit

Permalink
Add stream block event
Browse files Browse the repository at this point in the history
This is useful for example to access how many values were de/encoded.
  • Loading branch information
mtth committed May 2, 2021
1 parent 69599b1 commit edb727e
Show file tree
Hide file tree
Showing 4 changed files with 2,736 additions and 180 deletions.
17 changes: 13 additions & 4 deletions lib/containers.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ BlockDecoder.prototype._writeChunk = function (chunk, encoding, cb) {
nBlocks++;
this._decompress(
block.data,
this._createBlockCallback(block.count, chunkCb)
this._createBlockCallback(block.data.length, block.count, chunkCb)
);
}
chunkCb();
Expand All @@ -255,7 +255,7 @@ BlockDecoder.prototype._writeChunk = function (chunk, encoding, cb) {
}
};

BlockDecoder.prototype._createBlockCallback = function (count, cb) {
BlockDecoder.prototype._createBlockCallback = function (size, count, cb) {
var self = this;
var index = this._index++;

Expand All @@ -266,6 +266,7 @@ BlockDecoder.prototype._createBlockCallback = function (count, cb) {
self.emit('error', err);
cb();
} else {
self.emit('block', new BlockInfo(count, data.length, size));
self._queue.push(new BlockData(index, data, cb, count));
if (self._needPush) {
self._read();
Expand Down Expand Up @@ -525,7 +526,7 @@ BlockEncoder.prototype._write = function (val, encoding, cb) {
BlockEncoder.prototype._flushChunk = function (pos, cb) {
var tap = this._tap;
pos = pos || tap.pos;
this._compress(tap.buf.slice(0, pos), this._createBlockCallback(cb));
this._compress(tap.buf.slice(0, pos), this._createBlockCallback(pos, cb));
this._blockCount = 0;
};

Expand All @@ -551,7 +552,7 @@ BlockEncoder.prototype._read = function () {
}
};

BlockEncoder.prototype._createBlockCallback = function (cb) {
BlockEncoder.prototype._createBlockCallback = function (size, cb) {
var self = this;
var index = this._index++;
var count = this._blockCount;
Expand All @@ -565,6 +566,7 @@ BlockEncoder.prototype._createBlockCallback = function (cb) {
return;
}
self._pending--;
self.emit('block', new BlockInfo(count, size, data.length));
self._queue.push(new BlockData(index, data, cb, count));
if (self._needPush) {
self._needPush = false;
Expand All @@ -576,6 +578,13 @@ BlockEncoder.prototype._createBlockCallback = function (cb) {

// Helpers.

/** Summary information about a block. */
function BlockInfo(count, raw, compressed) {
this.valueCount = count;
this.rawDataLength = raw;
this.compressedDataLength = compressed;
}

/**
* An indexed block.
*
Expand Down
Loading

0 comments on commit edb727e

Please sign in to comment.