StreamBase
All provided streamers use `StreamBase` as their foundation. It is a base class, meant to be extended, that provides common facilities used by streamers.
`StreamBase` is based on Node.js' Transform. It operates in object mode: it consumes a token stream produced by a parser or filters and transforms it into a stream of JavaScript objects, each assembled from a part of the input.
This document describes only the user-facing interface. If you want to build your own streamer, feel free to inspect the code to gain more insight.
Internally, `StreamBase` uses Assembler to assemble objects and to keep track of the current state.
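To make "assembling objects while keeping track of the current state" more concrete, here is a deliberately simplified, hypothetical model of an assembler. `ToyAssembler` and its `consume()` and `save()` methods are invented for illustration and are not stream-json's Assembler API. The token names (`startObject`, `keyValue`, `stringValue`, `numberValue`, and so on) follow stream-json's token stream, and the `current` and `key` fields mirror the ones used by the custom streamer later on this page. The model ignores chunked strings and keys, and its `depth` simply counts open containers.

```javascript
// Hypothetical, simplified model of an assembler (NOT stream-json's Assembler).
// It turns {name, value} tokens into JavaScript values, tracking:
//   current - the container (or top-level value) being assembled,
//   key     - the pending property name when `current` is an object,
//   depth   - how many containers are currently open.
class ToyAssembler {
  constructor() {
    this.stack = []; // [container, key] pairs of enclosing containers
    this.current = null;
    this.key = null;
    this.depth = 0;
  }

  consume(token) {
    switch (token.name) {
      case 'startObject':
      case 'startArray':
        // Remember the enclosing container before descending.
        if (this.depth > 0) this.stack.push([this.current, this.key]);
        this.current = token.name === 'startObject' ? {} : [];
        this.key = null;
        ++this.depth;
        break;
      case 'endObject':
      case 'endArray':
        --this.depth;
        if (this.depth > 0) {
          // Store the finished container in its parent.
          const finished = this.current;
          [this.current, this.key] = this.stack.pop();
          this.save(finished);
        }
        break;
      case 'keyValue':
        this.key = token.value;
        break;
      case 'stringValue':
        this.save(token.value);
        break;
      case 'numberValue':
        this.save(Number(token.value)); // numbers arrive as text
        break;
      case 'nullValue':
        this.save(null);
        break;
      case 'trueValue':
        this.save(true);
        break;
      case 'falseValue':
        this.save(false);
        break;
      // Other tokens (e.g. chunked strings) are ignored by this toy model.
    }
  }

  save(value) {
    if (this.depth === 0) {
      this.current = value; // a top-level primitive
    } else if (Array.isArray(this.current)) {
      this.current.push(value);
    } else {
      this.current[this.key] = value;
      this.key = null;
    }
  }
}

// Tokens for {"name":"Ann","tags":["a",1],"ok":true}
const tokens = [
  {name: 'startObject'},
  {name: 'keyValue', value: 'name'},
  {name: 'stringValue', value: 'Ann'},
  {name: 'keyValue', value: 'tags'},
  {name: 'startArray'},
  {name: 'stringValue', value: 'a'},
  {name: 'numberValue', value: '1'},
  {name: 'endArray'},
  {name: 'keyValue', value: 'ok'},
  {name: 'trueValue', value: true},
  {name: 'endObject'}
];

const asm = new ToyAssembler();
tokens.forEach((token, i) => {
  asm.consume(token);
  if (i === 2) console.log('partial:', JSON.stringify(asm.current)); // {"name":"Ann"}
});
console.log('final:', JSON.stringify(asm.current));
```

Inspecting `asm.current` after only a few tokens shows an incomplete object, which is exactly the kind of value an `objectFilter` has to reason about.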
`options`, the argument passed to the `StreamBase` constructor, is an optional object described in detail in Node.js' Stream documentation. The following optional custom properties are recognized:
- `objectFilter` is an optional function.
  - If specified, it is used to inspect incomplete objects as they are being assembled.
  - It is called in the context of the streamer with one argument: an internal Assembler instance.
  - A filter function can return three kinds of values:
    - Truthy. The object should be included in the output stream. It will be assembled from the input stream without further checks.
    - `false`. The object should not be output. The partially assembled object is discarded, and the rest of the object is skipped without further checks.
    - Anything else (usually `undefined`). The filter does not yet have enough information to make a decision: the next token is processed, and the filter is called again on the same object.
  - If not specified (the default), all objects are included in the output stream.
- `includeUndecided` is a flag that controls how to handle objects for which no decision was made.
  - If it is truthy, an undecided object (by then fully assembled) is included in the output.
  - Otherwise (the default), an undecided object is discarded.
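The decision rules above can be summarized as a small, hypothetical model. `decide()` is not part of stream-json: it feeds successive snapshots of one object to a filter instead of an Assembler instance (a real filter receives the Assembler and would read, for example, `asm.current`), stops at the first truthy or `false` answer, and falls back to `includeUndecided` when the object ends undecided.

```javascript
// Hypothetical model of the objectFilter / includeUndecided rules (not a stream-json API).
// `snapshots` stands in for the successive partial states of a single object;
// the real objectFilter receives an Assembler instance rather than a plain object.
function decide(objectFilter, snapshots, includeUndecided = false) {
  if (typeof objectFilter != 'function') {
    return {include: true, calls: 0}; // no filter: every object is included
  }
  let calls = 0;
  for (const partial of snapshots) {
    ++calls;
    const result = objectFilter(partial);
    if (result === false) return {include: false, calls}; // reject: skip the rest
    if (result) return {include: true, calls}; // accept: assemble the rest unchecked
    // Anything else (usually undefined): undecided, look at the next state.
  }
  // The object is complete, but the filter never made up its mind.
  return {include: Boolean(includeUndecided), calls};
}

const byDepartment = value =>
  value && typeof value.department == 'string'
    ? value.department === 'accounting'
    : undefined;

const bob = [{name: 'Bob'}, {name: 'Bob', department: 'sales'}];
const eve = [{id: 1}, {id: 1, name: 'Eve'}]; // never has a department

console.log(decide(byDepartment, bob)); // rejected after 2 calls
console.log(decide(byDepartment, eve)); // undecided, discarded by default
console.log(decide(byDepartment, eve, true)); // undecided, kept
```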
The same `options` object is passed to Assembler; see its documentation for the properties it recognizes.
`objectFilter` and token stream filters serve different purposes: filters edit the token stream, while `objectFilter` governs the assembly of individual values that have already been selected. Ultimately, `objectFilter` is an optimization feature.
For example, suppose we want to read all employee records and select only employees whose department is "accounting".
A simple way (preferred in some cases) is to use stream-chain facilities:
```javascript
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray(),
  data => {
    if (data.value.department === 'accounting') return data;
    // return undefined by default skipping this item
  }
]);
```
A potentially more efficient way to do the same, by filtering while objects are being assembled:
```javascript
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {streamArray} = require('stream-json/streamers/StreamArray');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = chain([
  fs.createReadStream('sample.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray({objectFilter: asm => {
    const value = asm.current; // the value we are working on
    // the value can be incomplete, check if we have necessary properties
    if (value && typeof value.department == 'string') {
      // we have 'department' value and can make the final decision now
      return value.department === 'accounting';
      // depending on the return value above we made a final decision:
      // we accepted or rejected an object,
      // now it will be speedily assembled or skipped.
    }
    // return undefined by default meaning "we are undecided yet"
  }})
]);
```
Now an object can be discarded before it is fully assembled, which can save some CPU time and memory. On the other hand, in the first case our filter is called once per object, while in the second case it can be called many times per object. This is a trade-off best decided case by case, possibly by benchmarking both approaches.
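The trade-off can be illustrated with a rough, hypothetical cost model; it is not how stream-json measures anything. Each record is simplified to a list of `[key, value]` pairs in the order they appear in the JSON text, "work" is counted as assembled properties and filter calls, and the early filter is consulted once per property (a real `objectFilter` may be called as tokens arrive). Skipped properties still have to be parsed; only their assembly is saved.

```javascript
// Rough, hypothetical cost model of the two approaches above (not a stream-json API).
// Each record is a list of [key, value] pairs in input order.

// Approach 1: assemble every object completely, then filter it once.
function postFilterCost(records, keep) {
  let assembled = 0;
  let calls = 0;
  let kept = 0;
  for (const entries of records) {
    const obj = {};
    for (const [key, value] of entries) {
      obj[key] = value;
      ++assembled;
    }
    ++calls;
    if (keep(obj)) ++kept;
  }
  return {kept, assembled, calls};
}

// Approach 2: consult an objectFilter-style function while assembling.
function objectFilterCost(records, objectFilter) {
  let assembled = 0;
  let calls = 0;
  let kept = 0;
  for (const entries of records) {
    const obj = {};
    let decision; // undefined means "undecided so far"
    for (const [key, value] of entries) {
      obj[key] = value;
      ++assembled;
      if (decision !== undefined) continue; // accepted: no more checks
      ++calls;
      const result = objectFilter(obj);
      if (result === false) {
        decision = false;
        break; // rejected: the rest of this object is not assembled
      }
      if (result) decision = true;
    }
    if (decision === true) ++kept; // undecided objects are dropped (the default)
  }
  return {kept, assembled, calls};
}

const records = [
  [['department', 'accounting'], ['name', 'Ann'], ['notes', 'long text']],
  [['department', 'sales'], ['name', 'Bob'], ['notes', 'long text']],
  [['name', 'Cid'], ['notes', 'long text'], ['department', 'sales']]
];
const keep = obj => obj.department === 'accounting';
const early = obj =>
  typeof obj.department == 'string' ? obj.department === 'accounting' : undefined;

console.log(postFilterCost(records, keep)); // { kept: 1, assembled: 9, calls: 3 }
console.log(objectFilterCost(records, early)); // { kept: 1, assembled: 7, calls: 5 }
```

With these three records the early filter assembles 7 properties instead of 9 but makes 5 calls instead of 3. The third record, whose `department` comes last, gains nothing from early filtering, so the order of properties in the input matters as much as the filter itself.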
Stefan needed to decide dynamically which kind of container to stream: an array or a dictionary object (see Issue #81). Arrays should be streamed element by element, while dictionary objects should be passed on as a whole. `stream-json` provides two separate streamers, StreamArray and StreamObject, but they do not provide exactly this functionality, and the user has to choose between them statically. He ended up writing his own custom streamer (see the comment), included here for convenience and slightly reworked to add exports:
```javascript
const StreamBase = require("stream-json/streamers/StreamBase")

/**
 * Streamer for stream-json that streams objects or arrays.
 * - If it's an object: Only a single value (the full assembled object) will be emitted.
 * - If it's an array: Values of the array will be emitted.
 *
 * The code is basically combining StreamArray and StreamObject and adjusted to our needs.
 */
class StreamArrayOrObject extends StreamBase {
  static make(options) {
    return new StreamArrayOrObject(options);
  }

  constructor(options) {
    super(options);
    this._level = 1;
  }

  _wait(chunk, _, callback) {
    if (chunk.name === "startObject") {
      this._lastKey = null
      // We're assembling the object in this._object
      this._object = {}
    } else if (chunk.name === "startArray") {
      this._counter = 0
    } else {
      return callback(new Error("Top-level object should be an array or object."))
    }
    this._transform = this._filter
    return this._transform(chunk, _, callback)
  }

  _push(discard) {
    if (this._object) {
      // Object in this._object
      if (this._lastKey === null) {
        this._lastKey = this._assembler.key
      } else {
        if (!discard) {
          // Assemble object from keys and values
          this._object[this._lastKey] = this._assembler.current[this._lastKey]
        }
        this._assembler.current = {}
        this._lastKey = null
      }
    } else {
      // Otherwise we're streaming an array
      if (this._assembler.current.length) {
        this._counter += 1
        if (discard) {
          this._assembler.current.pop()
        } else {
          // Push array values directly
          this.push(this._assembler.current.pop())
        }
      }
    }
  }

  _flush(callback) {
    // Push single object before end of stream
    this._object && this.push(this._object)
    callback()
  }
}

StreamArrayOrObject.streamArrayOrObject = StreamArrayOrObject.make;
StreamArrayOrObject.make.Constructor = StreamArrayOrObject;

module.exports = StreamArrayOrObject;
```
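For a quick mental model of what `StreamArrayOrObject` emits, here is a hypothetical, non-streaming equivalent that works on an already parsed value. `arrayOrObjectOutputs()` is invented for illustration and, unlike the real streamer, needs the whole input in memory. Note that the array branch yields bare element values, not the `{key, value}` pairs that StreamArray produces.

```javascript
// Hypothetical, non-streaming model of StreamArrayOrObject's output.
// - a top-level array yields each element separately (bare values, no keys);
// - a top-level object yields itself once, as a whole;
// - anything else is an error, as in the streamer's _wait().
function* arrayOrObjectOutputs(topLevel) {
  if (Array.isArray(topLevel)) {
    yield* topLevel;
  } else if (topLevel !== null && typeof topLevel == 'object') {
    yield topLevel;
  } else {
    throw new Error('Top-level object should be an array or object.');
  }
}

console.log([...arrayOrObjectOutputs(JSON.parse('[1, {"a": 2}, "x"]'))]); // 3 items
console.log([...arrayOrObjectOutputs(JSON.parse('{"a": 1, "b": [2, 3]}'))]); // 1 item
```

The object branch of the real streamer keeps the whole dictionary in `this._object` and emits it from `_flush()`, so its memory use grows with the size of that object; only arrays benefit from element-by-element streaming.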