Recipe: splitting objects
Frequently we need to split objects so that different properties go through different pipelines. Obviously, it could be done at the data-processing level, but that would complicate the code. It could also be implemented as separate programs that each pick different features, but then we would spend extra time parsing the same stream over and over.
The problem was stated in #99.
One way to improve the separation is to split at the stream level. For example, it can be done after parsing.
We have data in `data-99.json` (clearly fake, to demonstrate the principle):
```json
{
  "results": [
    {"data": "data #1", "metadata": "metadata #1", "otherdata": "otherdata #1"},
    {"data": "data #2", "metadata": "metadata #2", "otherdata": "otherdata #2"},
    {"data": "data #3", "metadata": "metadata #3", "otherdata": "otherdata #3"}
  ]
}
```
We want to process `data`, `metadata`, and `otherdata` differently, ignoring the rest. For the sake of argument, we want to save them in different files as JSONL for further processing.
It can be done with this simple program:
```js
const fs = require('fs');

const {chain} = require('stream-chain');

const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {streamValues} = require('stream-json/streamers/StreamValues');
const {stringer} = require('stream-json/jsonl/Stringer');

const processSection = (source, sectionName) =>
  chain([
    source,
    pick({filter: new RegExp('\\b' + sectionName + '\\b')}),
    streamValues(),
    data => (console.log(sectionName.toUpperCase() + ':', data), data.value)
    // uncomment the next two steps to write each section to its own JSONL file
    // , stringer(),
    // fs.createWriteStream(`data-99-${sectionName}-sample.jsonl`)
  ]);

const main = () => {
  // the main substream: read the file, parse it, narrow the stream down to "results"
  const parsed = chain([
    fs.createReadStream('./data-99.json', {encoding: 'utf8'}),
    parser(),
    pick({filter: 'results'})
  ]);

  // process different parts in parallel
  processSection(parsed, 'data');
  processSection(parsed, 'metadata');
  processSection(parsed, 'otherdata');
};

try {
  main();
} catch (error) {
  console.error('ERROR:', error);
}
```
It produces the following output:
```
DATA: { key: 0, value: 'data #1' }
METADATA: { key: 0, value: 'metadata #1' }
OTHERDATA: { key: 0, value: 'otherdata #1' }
DATA: { key: 1, value: 'data #2' }
METADATA: { key: 1, value: 'metadata #2' }
OTHERDATA: { key: 1, value: 'otherdata #2' }
DATA: { key: 2, value: 'data #3' }
METADATA: { key: 2, value: 'metadata #3' }
OTHERDATA: { key: 2, value: 'otherdata #3' }
```
Note that in some cases the order of `DATA`, `METADATA`, and `OTHERDATA` parts can be different. The only constant is that `data #1` will always go before `data #2`, and so on. In most cases, e.g., when writing to a file, it doesn't matter.
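If we want the JSONL files rather than console output, it should be enough to enable the two commented-out steps in `processSection`. A minimal sketch of that variant, keeping the file names from the commented-out code (untested, shown only to make the full pipeline explicit):

```js
// processSection with the commented-out steps enabled: each section is
// stringified as JSONL and written to its own file.
const processSection = (source, sectionName) =>
  chain([
    source,
    pick({filter: new RegExp('\\b' + sectionName + '\\b')}),
    streamValues(),
    data => data.value,                                          // keep only the value
    stringer(),                                                  // one JSON value per line
    fs.createWriteStream(`data-99-${sectionName}-sample.jsonl`)
  ]);
```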
The idea is to separate a stream into substreams and combine them accordingly.
In this particular example we have 4 substreams:
- The main one reads from a file, parses it, and picks the part we need.
  - Optionally it can clean up data more using `ignore()`, `replace()`, more `pick()`, and so on.
  - It may have more custom steps or streamers like `streamArray()` (see the sketch after this list).
  - The idea is that it produces a stream in a format that can be processed directly by other substreams.
- 3 processing substreams to continue processing different parts of the stream produced by the main substream.
  - In this particular case they pick a relevant subobject and process it.
  - The processing can do anything. In the example, we print data and save it in a separate file.
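For illustration, here is a sketch (not part of the recipe above) of an alternative main substream that does more cleanup and ends with `streamArray()`, so the processing substreams receive plain JavaScript objects instead of token streams. The `ignore()` step and the property access by section name are assumptions made for this sketch:

```js
const fs = require('fs');
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {ignore} = require('stream-json/filters/Ignore');
const {streamArray} = require('stream-json/streamers/StreamArray');

// the main substream: parse, narrow down to "results", drop "otherdata",
// and emit a {key, value} object for every array item
const parsed = chain([
  fs.createReadStream('./data-99.json', {encoding: 'utf8'}),
  parser(),
  pick({filter: 'results'}),
  ignore({filter: /\botherdata\b/}), // optional cleanup: a property we never use
  streamArray()
]);

// processing substreams now work with plain objects
const processSection = (source, sectionName) =>
  chain([
    source,
    ({key, value}) => {
      console.log(sectionName.toUpperCase() + ':', {key, value: value[sectionName]});
      return value[sectionName];
    }
  ]);

processSection(parsed, 'data');
processSection(parsed, 'metadata');
// no 'otherdata' substream: it was removed by ignore() upstream
```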
The important part here is that to split a stream we don't need to do anything special. We just connect our processing substreams in parallel to the main substream.
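That works because a Node readable stream (and a `chain()` is a Duplex) can be piped into several destinations at once: every destination receives every chunk, and backpressure is governed by the slowest consumer. A tiny self-contained sketch with plain `PassThrough` streams (the names here are illustrative only, not part of the program above):

```js
const {PassThrough} = require('stream');

const source  = new PassThrough({objectMode: true});
const branchA = new PassThrough({objectMode: true});
const branchB = new PassThrough({objectMode: true});

// the same source feeds both branches
source.pipe(branchA);
source.pipe(branchB);

branchA.on('data', item => console.log('A:', item));
branchB.on('data', item => console.log('B:', item));

source.write({n: 1});
source.write({n: 2});
source.end();
```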
While splitting a stream is trivial, joining the substreams back together can be more difficult. You may consider my other micro-library for that: stream-join.