Eugene Lazutkin edited this page Dec 8, 2018 · 12 revisions

These are just examples. Think about how you would implement them without stream-chain.

Simple pipeline

const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i >= 0; --i) {
      yield i;
    }
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));
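The pipeline above assumes that dataSource and getTotalFromDatabaseByKey() are defined elsewhere. To try it out, you could define stand-ins like the ones below before creating the chain. This is only a sketch: both definitions are hypothetical placeholders built on Node's standard library, not part of stream-chain.

```javascript
const {Readable} = require('stream');

// Hypothetical stand-in: an object-mode stream that emits a few numbers.
const dataSource = Readable.from([1, 2, 3]);

// Hypothetical stand-in for a database lookup: resolves asynchronously
// to a small non-negative "total" derived from the key.
const getTotalFromDatabaseByKey = async key => key % 3;
```

Keeping the totals small keeps the countdown generator in the pipeline short, which makes the output easier to inspect.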

Using asynchronous generators

const {chain} = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

const family = chain([
  async function*(person) {
    yield person;
    // asynchronously retrieve parents
    if (person.father) {
      yield await getPersonFromDB(person.father);
    }
    if (person.mother) {
      yield await getPersonFromDB(person.mother);
    }
    // asynchronously retrieve children, if any
    for (const child of person.children || []) {
      yield await getPersonFromDB(child);
    }
  },
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, JSON.stringify(x));
    }
  }),
  zlib.createGzip(),
  fs.createWriteStream('families.json-stream.gz')
]);

people.pipe(family);

Combined functions

Consecutive regular functions can be grouped in an array. The functions in such a block are combined into a single stage without intermediate streams, which improves performance.

const lessEfficient = chain([
  x => x * x,
  x => 2 * x
  // ... more stages
]);

const moreEfficient = chain([
  [
    x => x * x,
    x => 2 * x
  ]
  // ... more stages
]);
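To see why grouping helps, it may be useful to picture the block as one composed function: each value passes through all the functions in a single call instead of crossing a stream boundary between them. The sketch below is a conceptual model in plain JavaScript. The combine() helper is hypothetical and is not how stream-chain is implemented.

```javascript
// Conceptual model only: run a value through every function in order.
const combine = (...fns) => x => fns.reduce((value, fn) => fn(value), x);

const squareThenDouble = combine(
  x => x * x,
  x => 2 * x
);

console.log([1, 2, 3].map(squareThenDouble)); // logs [ 2, 8, 18 ]
```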

Filter out values

Returning Chain.none (imported below as none) stops processing of the current value, and nothing is passed downstream for it. The example below uses it to filter out all even values.

const {chain, none} = require('stream-chain');

const pipeline = chain([
  [
    x => x % 2 ? x : none,
    x => x * x,
    x => 2 * x
  ]
  // ... more stages
]);

// input: 1, 2, 3, 4
// output: 2, 18

Return a final value prematurely

Wrapping a value in final() skips the remaining functions in the block and uses the value as the result of the block. In the example below, odd squares are returned as is, and only even squares are doubled.

const {chain, final} = require('stream-chain');

const pipeline = chain([
  [
    x => x * x,
    x => x % 2 ? final(x) : x,
    x => 2 * x
  ]
  // ... more stages
]);

// input: 1, 2, 3, 4
// output: 1, 8, 9, 32
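The input and output comments in the two examples above can be checked with a small model of the early-exit behavior. This is a sketch in plain JavaScript: none, final(), combine(), and run() below are local stand-ins written for illustration, not the stream-chain exports, and the real library may work differently internally.

```javascript
// Local stand-ins (assumptions), modeling the two special return values.
const none = Symbol('none');
class Final {
  constructor(value) {
    this.value = value;
  }
}
const final = value => new Final(value);

// Run a value through the functions, honoring none and final().
const combine = (...fns) => x => {
  let value = x;
  for (const fn of fns) {
    value = fn(value);
    if (value === none) return none; // drop the value entirely
    if (value instanceof Final) return value.value; // stop early, keep the value
  }
  return value;
};

// Apply a combined function to every input and drop the filtered-out values.
const run = (fn, inputs) => inputs.map(fn).filter(value => value !== none);

const filterOdd = combine(x => (x % 2 ? x : none), x => x * x, x => 2 * x);
const skipDoublingOdd = combine(x => x * x, x => (x % 2 ? final(x) : x), x => 2 * x);

console.log(run(filterOdd, [1, 2, 3, 4])); // logs [ 2, 18 ]
console.log(run(skipDoublingOdd, [1, 2, 3, 4])); // logs [ 1, 8, 9, 32 ]
```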

Slicing streams

Take

This example processes only 5 items from the beginning of a stream.

const {chain} = require('stream-chain');
const take = require('stream-chain/utils/take');

const pipeline = chain([
  take(5)
  // ... more stages
]);

Skip

This example skips 5 items from the beginning of a stream.

const skip = require('stream-chain/utils/skip');

const pipeline = chain([
  skip(5)
  // ... more stages
]);

Take & skip together

This example skips 5 items from the beginning of a stream and takes the next 5.

const lessEfficient = chain([
  skip(5),
  take(5)
  // ... more stages
]);

const moreEfficient = chain([
  take({n: 5, skip: 5})
  // ... more stages
]);
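The intended effect of combining the two counters can be modeled on a plain array: ignore the first skip items, pass the next n items, and drop the rest. The takeWithSkip() generator below is a hypothetical helper written for illustration. It only mirrors the option names used above and is not the library's implementation.

```javascript
// Conceptual model only: skip the first `skip` items, then yield the next `n`.
function* takeWithSkip(items, {n, skip = 0}) {
  let index = 0;
  for (const item of items) {
    if (index >= skip + n) return; // enough items were taken
    if (index >= skip) yield item; // past the skipped prefix
    ++index;
  }
}

const input = Array.from({length: 20}, (_, i) => i); // 0, 1, ..., 19
console.log([...takeWithSkip(input, {n: 5, skip: 5})]); // logs [ 5, 6, 7, 8, 9 ]
```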

Conditional take

This example takes items while a condition is true, that is, until the first 'separator' item.

const takeWhile = require('stream-chain/utils/takeWhile');

const pipeline = chain([
  takeWhile(item => item !== 'separator')
  // ... more stages
]);

Conditional skip

This example skips items while a condition is true, that is, until the first 'separator' item.

const skipWhile = require('stream-chain/utils/skipWhile');

const pipeline = chain([
  skipWhile(item => item !== 'separator')
  // ... more stages
]);

Conditional take & skip together

This example processes the items between the first two separators.

const pipeline = chain([
  skipWhile(item => item !== 'separator'),
  skip(1), // skip the separator
  takeWhile(item => item !== 'separator')
  // ... more stages
]);
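To make the expected result concrete, the same three steps can be modeled on a plain array with generators. This is a sketch under stated assumptions: the skipWhile(), skip(), and takeWhile() functions below are local stand-ins that share names with the utilities above but are written from scratch for illustration, and the sample input is made up.

```javascript
// Local stand-in: drop items while the predicate holds, then pass everything.
function* skipWhile(items, predicate) {
  let skipping = true;
  for (const item of items) {
    if (skipping && predicate(item)) continue;
    skipping = false;
    yield item;
  }
}

// Local stand-in: drop the first n items.
function* skip(items, n) {
  let index = 0;
  for (const item of items) {
    if (index++ >= n) yield item;
  }
}

// Local stand-in: pass items while the predicate holds, then stop.
function* takeWhile(items, predicate) {
  for (const item of items) {
    if (!predicate(item)) return;
    yield item;
  }
}

const notSeparator = item => item !== 'separator';
const input = ['a', 'separator', 'b', 'c', 'separator', 'd'];

const between = [...takeWhile(skip(skipWhile(input, notSeparator), 1), notSeparator)];
console.log(between); // logs [ 'b', 'c' ]
```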