
chain()

Eugene Lazutkin edited this page Nov 23, 2022 · 7 revisions

chain(), the default export of the stream-chain package, is a factory function that returns a stream based on Duplex. It chains its components into a single pipeline, optionally forwarding their error events. It accepts an array of streams or functions and combines them efficiently. The result can be used as a regular stream.

Many details about this package can be discovered by looking at test files located in tests/ and in the source code (main.js).

chain(fns[, options])

The function accepts the following arguments:

  • fns is an array. Its items can be one of:
    • A function. Allowed: regular functions, asynchronous functions, generator functions, asynchronous generator functions.
    • Transform stream.
    • Duplex stream.
    • The very first stream can be Readable.
    • The very last stream can be Writable.
    • An array of functions, streams, or other arrays. Such arrays will be flattened and their elements are included verbatim.
    • All falsy values are simply ignored.
    • Notes on how different values are handled can be found below.
  • options is an optional object detailed in Node's documentation for Duplex streams.
    • If options is not specified, or falsy, it is assumed to be an empty object.
    • If it doesn't specify writableObjectMode it is assumed to be true.
    • If it doesn't specify readableObjectMode it is assumed to be true.
    • Always make sure that writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.
      • Eventually both these modes can be deduced, but Node does not define the standard way to determine it, so currently it cannot be done reliably.
    • Additionally the following custom properties are recognized:
      • noGrouping is an optional boolean flag. If it is falsy (the default), all consecutive functions are grouped together using the chain.gen() function. Grouping functions usually produces faster pipelines. Otherwise, every function is wrapped as a separate stream.
      • skipEvents is an optional boolean flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no event forwarding is made. A user can always do so externally or in a constructor of derived classes.
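The rules for nested arrays and falsy items in fns can be pictured with a small sketch. flattenFns is a hypothetical helper written for illustration only, not a function exported by stream-chain, and the library may implement these rules differently.

```javascript
// Hypothetical helper, not part of stream-chain: flattens nested arrays
// and drops falsy items, mirroring the rules listed for `fns`.
const flattenFns = fns => fns.flat(Infinity).filter(Boolean);

const double = x => 2 * x;
const increment = x => x + 1;
const debug = false;

const steps = flattenFns([
  double,
  debug && (x => (console.log(x), x)), // evaluates to false, so it is skipped
  [increment, [double, null]] // nested arrays are flattened, null is dropped
]);

console.log(steps.length); // 3
```

Ignoring falsy values is what makes the `condition && step` idiom convenient for optional pipeline stages.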

A resulting instance can be used to attach handlers for stream events.

const pipeline = chain([x => x * x, x => [x - 1, x, x + 1]]);
pipeline.on('error', error => console.error(error));
dataSource.pipe(pipeline);
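What the default (skipEvents is falsy) forwarding of 'error' events amounts to can be sketched with Node's core streams alone. forwardErrors is a hypothetical helper, and combined merely stands in for the stream returned by chain(). This illustrates the idea and is not the library's code.

```javascript
const {PassThrough} = require('stream');

// Hypothetical helper: re-emits 'error' events of every component on `target`.
const forwardErrors = (streams, target) =>
  streams.forEach(stream => stream.on('error', error => target.emit('error', error)));

const first = new PassThrough({objectMode: true});
const second = new PassThrough({objectMode: true});
const combined = new PassThrough({objectMode: true}); // stands in for the chain() result

forwardErrors([first, second], combined);

const seen = [];
combined.on('error', error => seen.push(error.message));

// Emitting by hand keeps the sketch synchronous; real streams emit on failure.
second.emit('error', new Error('boom'));
console.log(seen); // [ 'boom' ]
```

With skipEvents set to a truthy value, a similar loop over the streams property can be written by hand to forward only selected events.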

Notes on how different fns values are handled

Functions

Values returned by a regular function are interpreted differently depending on their type, as described below. If a function throws an exception, it is caught and passed to the stream callback, which generates a stream error.
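As a rough illustration of how a thrown exception can become a stream error, the sketch below wraps a plain function in an object-mode Transform. callSafely and wrap are hypothetical names invented for this example; the library's own wrapper is more elaborate.

```javascript
const {Transform} = require('stream');

// Hypothetical helper: calls fn and reports the outcome Node-style.
const callSafely = (fn, value, callback) => {
  let result;
  try {
    result = fn(value);
  } catch (error) {
    callback(error); // becomes a stream 'error' when used inside transform()
    return;
  }
  callback(null, result);
};

// Hypothetical helper: wraps a plain function as an object-mode Transform.
const wrap = fn =>
  new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      callSafely(fn, chunk, callback);
    }
  });

const squareRoot = wrap(x => {
  if (x < 0) throw new RangeError('negative input');
  return Math.sqrt(x);
});
squareRoot.on('data', value => console.log('value:', value));
squareRoot.on('error', error => console.error('stream error:', error.message));
squareRoot.write(-1); // the thrown RangeError surfaces as an 'error' event
```

The callback is invoked outside the try block on purpose, so that an exception raised further down the pipeline is not mistaken for a failure of fn.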

Grouped functions

Functions can be grouped directly or indirectly (see options.noGrouping above) using gen(). This function produces an asynchronous generator function, which can consume values and produce multiple (or no) results.

Regular functions

To be continued from this point...

Returned value: null or undefined or chain.none

It means "no value was produced", which effectively terminates the processing of the current value.

Returned value: array

Deprecated since 2.1.0 — use many() instead.

It is assumed that a function has produced several values (0 or more). All of them will be pushed to a stream.

The array cannot have any special values.

How to return an array and pass it as a single value? Just wrap it into an array:

x => [[x, x + 1], []]

Returned value: Promise

It can be a Promise or a "thenable" (an object with a then() method). Processing is delayed until the promise resolves or rejects.

Restrictions on returned values:

  • It cannot return a generator object.

This case covers asynchronous functions.
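A minimal sketch of thenable detection and delayed delivery follows. isThenable and deliver are hypothetical helpers used only to illustrate the description above.

```javascript
// Hypothetical helper: detects promises and other "thenables".
const isThenable = value => Boolean(value) && typeof value.then === 'function';

// Hypothetical helper: delivers a result now, or once the thenable settles.
const deliver = (value, callback) => {
  if (isThenable(value)) {
    value.then(result => callback(null, result), error => callback(error));
  } else {
    callback(null, value);
  }
};

const fetchLater = async x => x * 2; // calling an async function returns a Promise

deliver(42, (error, value) => console.log(value)); // 42, synchronously
deliver(fetchLater(21), (error, value) => console.log(value)); // 42, after the promise resolves
```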

Returned value: generator

It can be a generator or an object with a property named next(). It will be iterated according to the generator protocol. The results should be regular values.

Restrictions on returned values:

  • It cannot return a generator object.

This case covers generator functions.

next() can return a Promise according to the asynchronous generator protocol.

This case covers asynchronous generator functions.
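Iterating a returned generator by its protocol can be sketched as below for the synchronous case. isGenerator and drain are hypothetical helpers. For an asynchronous generator, each next() call would return a Promise that has to settle before the following step.

```javascript
// Hypothetical helper: a value is treated as a generator if it has next().
const isGenerator = value => Boolean(value) && typeof value.next === 'function';

// Hypothetical helper: drives a synchronous generator and hands each value to push().
const drain = (generator, push) => {
  for (let step = generator.next(); !step.done; step = generator.next()) {
    push(step.value);
  }
};

function* neighbors(x) {
  yield x - 1;
  yield x;
  yield x + 1;
}

const produced = [];
const returned = neighbors(5);
if (isGenerator(returned)) drain(returned, value => produced.push(value));
console.log(produced); // [ 4, 5, 6 ]
```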

Returned value: Chain.Many

Chain.Many is produced by Chain.many(). It is a wrapper for an array holding multiple values (0 or more). It cannot contain any special values. All values will be pushed to a stream.

Returned value: anything else

Any other value is passed to a stream unchanged.

Asynchronous function

Chain will wait until an asynchronous function is finished or throws an exception.

Restrictions on returned values:

  • It cannot return a generator object.

Generator function

Chain will iterate it until exhausted. Asynchronous generator functions are allowed.

Restrictions on returned values:

  • It cannot return a generator object.

Array

Since 2.1.0

It is expected to be an array of regular functions (no asynchronous functions and no generators; to combine those too, use comp()). They can return any values, but the return value of the final function is subject to the same interpretation as that of a solitary regular function (see above). The final value cannot be a Promise or a generator object: it will be passed to a stream as is.

Returned value: Chain.none

If a function returns Chain.none it terminates a pipeline and no output value is produced.

x => Chain.none

Returned value: Chain.Final

Chain.Final is produced by Chain.final(). It terminates a pipeline (the array of functions) and returns a wrapped value as a final result.

x => Chain.final(x)

Returned value: anything else

Any other value is passed unchanged.
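How an array of regular functions could run with the two early exits described above is sketched here. none, final(), isFinal() and compose() are local stand-ins defined for this example; they are assumptions that imitate the documented behavior and are not the library's exports.

```javascript
// Local stand-ins for the library's special values; the names are assumptions.
const none = Symbol('none');
const finalTag = Symbol('final');
const final = value => ({[finalTag]: true, value});
const isFinal = value => Boolean(value) && value[finalTag] === true;

// Hypothetical helper: runs regular functions in order with early exits.
const compose = fns => input => {
  let value = input;
  for (const fn of fns) {
    value = fn(value);
    if (value === none) return none; // stop, nothing is produced
    if (isFinal(value)) return value.value; // stop, use the wrapped value
  }
  return value;
};

const step = compose([
  x => (x < 0 ? none : x), // drop negative numbers
  x => (x === 0 ? final(0) : x), // zero skips the remaining functions
  x => 1 / x
]);

console.log(step(-1) === none); // true
console.log(step(0)); // 0
console.log(step(4)); // 0.25
```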

Properties

The following public properties are available: streams, input, output.

streams

streams is an array of streams created by the constructor. Its values are either Transform streams that wrap the corresponding functions from the constructor parameter, or user-provided streams. All streams are already piped sequentially starting from the beginning.

This array is provided mainly to attach event handlers to individual components.

input

input is the beginning of the pipeline. Effectively it is the first item of streams.

output

output is the end of the pipeline. Effectively it is the last item of streams.
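The relationship between streams, input and output can be pictured with core streams only. connect is a hypothetical helper that pipes its arguments in order and returns a plain object; the real instance is a Duplex stream that carries these properties.

```javascript
const {PassThrough} = require('stream');

// Hypothetical helper: pipes streams in order and exposes both ends.
const connect = streams => {
  for (let i = 1; i < streams.length; ++i) {
    streams[i - 1].pipe(streams[i]);
  }
  return {streams, input: streams[0], output: streams[streams.length - 1]};
};

const pipeline = connect([
  new PassThrough({objectMode: true}),
  new PassThrough({objectMode: true}),
  new PassThrough({objectMode: true})
]);

// Individual components stay reachable for event handlers.
pipeline.streams.forEach((stream, index) =>
  stream.on('error', error => console.error(`stage ${index} failed:`, error.message)));

pipeline.output.on('data', value => console.log(value));
pipeline.input.write('hello'); // travels through all three stages
pipeline.input.end();
```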

Examples

Generally, a Chain instance should be used to represent a chain:

const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);
dataSource
  .pipe(chain)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));

But in some cases input and output provide better control over how a data processing pipeline should be organized:

chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(chain.input);

Please select what style you want to use, and never mix them together with the same object!

Static properties

The following static properties are available:

  • chain(fns[, options]),
  • (since 2.2.0) make(fns[, options]),
  • (since 2.2.0) convertToTransform(fn),
  • (since 2.2.0) sanitize(value, stream)

All of them are documented below.

Additionally, the following properties are imported from defs module:

  • (since 2.2.0) none,
  • (since 2.1.0) final(value),
  • (since 2.2.0) isFinal(value),
  • (since 2.2.0) getFinalValue(value),
  • (since 2.1.0) many(array),
  • (since 2.2.0) isMany(value),
  • (since 2.2.0) getManyValues(value)

The defs module itself was added in 2.2.0; before that, final() and many() were defined in Chain directly. Presently all these properties are documented in defs.

chain(fns[, options])

chain(fns[, options]) is a helper factory function, which has the same arguments as the constructor and returns a Chain instance.

const {chain} = require('stream-chain');

// simple
dataSource
  .pipe(chain([x => x * x, x => [x - 1, x, x + 1]]));

// all inclusive
chain([
  dataSource,
  x => x * x,
  x => [x - 1, x, x + 1],
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz')
])

chain.Constructor

(since 2.2.0) It is an alias to Chain. Useful for metaprogramming.

make(fns[, options])

(since 2.2.0) make() is an alias of chain(fns[, options]). Useful for metaprogramming.

convertToTransform(fn)

(since 2.2.0) It is a helper function to wrap a function or an array of functions in a Transform stream.

The returned value is a Transform, or null for empty arrays and non-function objects.

const {convertToTransform} = require('stream-chain');

const getValueStream = convertToTransform(x => x.value);
const mathStream = convertToTransform([
  x => x * x,
  x => 2 * x + 1
]);

pipeline.pipe(getValueStream).pipe(mathStream).pipe(output);

sanitize(value, stream)

(since 2.2.0) It is a procedure, which puts a value into a stream using the standard API: push(). It extracts final and multiple values and pushes them to a stream skipping undefined, null, and Chain.none values. This way sanitize() can push to the stream from 0 to many values.

Presently it interprets arrays as a wrapper for multiple values (the deprecated functionality).

Node streams assign a special meaning to undefined and null, so these values cannot go through the streaming infrastructure and cannot be used unwrapped. Yet they are perfectly legal inside comp() pipes.
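The behavior described for sanitize() can be approximated by the sketch below. Every name in it (none, final, many, sanitizeSketch, fakeStream) is a local stand-in defined for illustration, so it shows the described rules and not the library's actual source.

```javascript
// Local stand-ins for the library's special values; all names are assumptions.
const none = Symbol('none');
const finalTag = Symbol('final');
const manyTag = Symbol('many');
const final = value => ({[finalTag]: true, value});
const many = values => ({[manyTag]: true, values});

// Hypothetical re-creation of the described rules; `stream` only needs push().
const sanitizeSketch = (value, stream) => {
  // Unwrap the two wrapper kinds first.
  if (value && value[finalTag]) value = value.value;
  else if (value && value[manyTag]) value = value.values;

  if (value === undefined || value === null || value === none) return;

  if (Array.isArray(value)) {
    // Multiple values: push each one a stream can carry.
    for (const item of value) {
      if (item !== undefined && item !== null && item !== none) stream.push(item);
    }
  } else {
    stream.push(value);
  }
};

const pushed = [];
const fakeStream = {push: value => pushed.push(value)};

sanitizeSketch(null, fakeStream); // nothing is pushed
sanitizeSketch(final(1), fakeStream); // pushes 1
sanitizeSketch(many([2, undefined, 3]), fakeStream); // pushes 2 and 3
sanitizeSketch([4, 5], fakeStream); // pushes 4 and 5 (deprecated array form)
sanitizeSketch('six', fakeStream); // pushes 'six'
console.log(pushed); // [ 1, 2, 3, 4, 5, 'six' ]
```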
