-
-
Notifications
You must be signed in to change notification settings - Fork 11
Intro
Just examples. Think you would do them without stream-chain
.
const Chain = require('stream-chain');
const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');
// the chain will work on a stream of number objects
const chain = new Chain([
// transforms a value
x => x * x,
// returns several values
x => [x - 1, x, x + 1],
// waits for an asynchronous operation
async x => await getTotalFromDatabaseByKey(x),
// returns multiple values with a generator
function* (x) {
for (let i = x; i >= 0; --i) {
yield i;
}
},
// filters out even values
x => x % 2 ? x : null,
// uses an arbitrary transform stream
new Transform({
writableObjectMode: true,
transform(x, _, callback) {
// transform to text
callback(null, x.toString());
}
}),
// compress
zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));
const {chain} = require('stream-chain');
const family = chain([
async function*(person) {
yield person;
// asynchronously retrieve parents
if (person.father) {
yield await getPersonFromDB(person.father);
}
if (person.mother) {
yield await getPersonFromDB(person.mother);
}
// asynchronously retrieve children, if any
for (let i = 0; i < person.children; ++i) {
yield await getPersonFromDB(person.children[i]);
}
},
new Transform({
writableObjectMode: true,
transform(x, _, callback) {
// transform to text
callback(null, JSON.stringify(x));
}
}),
zlib.createGzip(),
fs.createWriteStream('families.json-stream.gz')
]);
people.pipe(family);
A block of regular functions can be separated and included in an array. Functions in such block will be combined without using streams to improve the performance.
const lessEfficient = chain([
x => x * x,
x => 2 * x
// ... more stages
]);
const moreEfficient = chain([
[
x => x * x,
x => 2 * x
]
// ... more stages
]);
Returning Chain.none
terminates the pipeline. In the example below, it is used to filter out all even values.
const {chain, none, final} = require('stream-chain');
const pipeline = chain([
[
x => x % 2 ? x : none,
x => x * x,
x => 2 * x
]
// ... more stages
]);
// input: 1, 2, 3, 4
// output: 2, 18
Wrapping value in final()
terminates a pipeline and uses the value as the final result. The example below does not double odd values.
const {chain, none, final} = require('stream-chain');
const pipeline = chain([
[
x => x * x,
x => x % 2 ? final(x) : x,
x => 2 * x
]
// ... more stages
]);
// input: 1, 2, 3, 4
// output: 1, 8, 9, 32
This example processes only 5 items from the beginning of a stream.
const take = require('stream-json/utils/take');
const pipeline = chain([
take(5)
// ... more stages
]);
This example skips 5 items from the beginning of a stream.
const skip = require('stream-json/utils/skip');
const pipeline = chain([
skip(5)
// ... more stages
]);
This example skips 5 items from the beginning of a stream and takes the next 5.
const lessEfficient = chain([
skip(5),
take(5)
// ... more stages
]);
const moreEfficient = chain([
take({n: 5, skip: 5})
// ... more stages
]);
Takes while a condition is true.
const takeWhile = require('stream-json/utils/takeWhile');
const pipeline = chain([
takeWhile(item => item !== 'separator')
// ... more stages
]);
Skips while a condition is true.
const skipWhile = require('stream-json/utils/skipWhile');
const pipeline = chain([
skipWhile(item => item !== 'separator')
// ... more stages
]);
Processes data between first two separators.
const pipeline = chain([
skipWhile(item => item !== 'separator'),
skip(1), // skip the separator
takeWhile(item => item !== 'separator')
// ... more stages
]);
It is the same as reduce() in JavaScript's arrays.
const fold = require('stream-json/utils/fold');
const pipeline = chain([
fold((acc, x) => acc + x, 0)
// ... more stages
]);
// input: 1, 2, 3
// output: 6
scan()
is like fold()
but outputs all intermediate values of its accumulator.
const scan = require('stream-json/utils/scan');
const pipeline = chain([
scan((acc, x) => acc + x, 0)
// ... more stages
]);
// input: 1, 2, 3
// output: 1, 3, 6
Reduce
is a Writable stream, which is used at the end of a pipeline to accumulate items. Its accumulator is available as a property. It can be used like fold()
and scan()
.
const {reduce} = require('stream-json/utils/Reduce');
const toArray = reduce((acc, x) => {
acc.push(x);
return acc;
}, []);
const pipeline = chain([
// ... more stages
toArray
]);
// input: 1, 2, 3
// toArray.accumulator is [1, 2, 3]
Unlike array-combined functions comp()
can combine asynchronous and regular functions and generators. It allows functions to return multiple values wrapped in many()
. Of course, none
and final()
works too.
const {comp} = require('stream-chain/utils/comp');
const pipeline = chain([
comp(
function*(x) {
yield x;
yield 10 * x;
},
x => 2 * x
)
// ... more stages
]);
// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60
const {chain, many} = require('stream-chain');
const {comp} = require('stream-chain/utils/comp');
const pipeline = chain([
comp(
x => many([x, 10 * x]),
x => 2 * x
)
// ... more stages
]);
// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60
const pipeline = chain([
comp(
async x => await getItemNumberFromDB(x),
x => 2 * x
)
// ... more stages
]);
Make a separate function has its benefits — you can use it with streams or without streams. comp.asFun()
returns an asynchronous function.
const doubler = comp.asFun(
async x => await getItemNumberFromDB(x),
x => 2 * x
);
const pipeline = chain([
doubler
// ... more stages
]);
doubler(42)
.then(value => console.log(value))
.catch(error => console.error(error));