
chain()

Eugene Lazutkin edited this page Aug 24, 2024 · 10 revisions

chain(), the default export of the stream-chain package, is a factory function that returns a stream based on Duplex. It chains its components into a single pipeline, optionally forwarding their error events. It accepts an array of streams or functions and combines them efficiently. The result can be used as a regular stream.

Many details about this package can be discovered by looking at the test files in tests/ and at the source code (index.js).

chain(fns[, options])

The function accepts the following arguments:

  • fns is an array. Its items can be one of:
    • A function. Allowed: regular functions, asynchronous functions, generator functions, asynchronous generator functions.
    • Transform stream.
    • Duplex stream.
    • The very first stream can be Readable.
    • The very last stream can be Writable.
    • An array of functions, streams, or other arrays. Such arrays will be flattened and their elements are included verbatim.
    • All falsy values are simply ignored.
    • (Since 3.1.0) A web stream object. It is adapted to a corresponding Node stream:
      • ReadableStream ⇒ Readable (for the very first stream).
      • WritableStream ⇒ Writable (for the very last stream).
      • {readable, writable} pair ⇒ Duplex.
      • Notes:
        • As of 8/24/2024 Node's support for web streams is still mostly experimental.
        • The fromWeb() functions (Readable.fromWeb(), Writable.fromWeb(), and Duplex.fromWeb()) are used to adapt web streams to Node streams with the {objectMode: true} option. If you need different options, call the appropriate fromWeb() function directly.
    • Notes on how different values are handled can be found below.
  • options is an optional object described in Node's Duplex documentation.
    • If options is not specified, or falsy, it is assumed to be an empty object.
    • If it doesn't specify writableObjectMode it is assumed to be true.
    • If it doesn't specify readableObjectMode it is assumed to be true.
    • Always make sure that writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.
      • In principle, both modes could be deduced, but Node does not define a standard way to determine them, so currently this cannot be done reliably.
    • Additionally the following custom properties are recognized:
      • noGrouping is an optional boolean flag. If it is falsy (the default), consecutive functions are grouped together using the chain.gen() function, which usually produces faster pipelines. Otherwise, every function is wrapped as a separate stream.
      • skipEvents is an optional boolean flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no events are forwarded; a user can always forward them externally or in a constructor of a derived class.
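The rules above for fns and options can be summarized in a small sketch. It is an illustration under assumptions, not stream-chain's code: normalizeFns() and normalizeOptions() are hypothetical helpers that only apply the documented flattening, falsy-filtering, and default rules.

```javascript
'use strict';

// Hypothetical helper (not part of stream-chain): flattens nested arrays
// and drops falsy items, following the documented rules for `fns`.
const normalizeFns = fns => {
  const result = [];
  const visit = items => {
    for (const item of items) {
      if (!item) continue; // falsy values are ignored
      if (Array.isArray(item)) {
        visit(item); // nested arrays are flattened
      } else {
        result.push(item); // functions and streams are kept verbatim
      }
    }
  };
  visit(fns);
  return result;
};

// Hypothetical helper (not part of stream-chain): applies the documented
// defaults for `options`.
const normalizeOptions = options => {
  const opts = {...(options || {})}; // falsy `options` => empty object
  if (opts.writableObjectMode === undefined) opts.writableObjectMode = true;
  if (opts.readableObjectMode === undefined) opts.readableObjectMode = true;
  opts.noGrouping = Boolean(opts.noGrouping); // falsy by default: group functions
  opts.skipEvents = Boolean(opts.skipEvents); // falsy by default: forward 'error' events
  return opts;
};

const square = x => x * x;
const inc = x => x + 1;
console.log(normalizeFns([square, null, [inc, [false, square]], undefined]).length); // 3
console.log(normalizeOptions({readableObjectMode: false}).writableObjectMode); // true
```

Note that an explicitly specified object mode is kept as is; only missing modes default to true.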

The resulting instance can be used to attach handlers for stream events:

import chain from 'stream-chain';
// const {chain} = require('stream-chain');

const pipeline = chain([x => x * x, x => [x - 1, x, x + 1]]);
pipeline.on('error', error => console.error(error));
dataSource.pipe(pipeline); // dataSource: a Readable stream defined elsewhere

Notes on how different fns values are handled

Streams

Streams are used as is.

(Since 3.1.0) A web stream object can be used as a regular Node stream. It will be adapted automatically to a corresponding Node stream with the {objectMode: true} option.

Grouped functions

Functions can be grouped explicitly with gen() or implicitly by chain() itself (see options.noGrouping above). gen() produces an asynchronous generator function, which consumes values and can produce multiple (or no) results for each of them.

chain() can extract grouped functions and re-group them for efficiency, so users can build custom pipelines from grouped segments without paying the price of extra grouping.
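The idea behind grouping can be sketched as composing several functions into one asynchronous generator function, so a value travels through all of them in a single call instead of through one stream per function. groupFns() below is a hypothetical, heavily simplified stand-in for gen(), not its implementation: it yields at most one result per input and ignores the special return values described below.

```javascript
'use strict';

// Hypothetical, simplified stand-in for gen() (not stream-chain's code):
// composes functions into one asynchronous generator function. It only
// handles plain values, promises, and null/undefined ("no value"); the real
// gen() also understands chain.none, chain.stop, chain.many(), generators, etc.
const groupFns = (...fns) =>
  async function* (value) {
    let current = value;
    for (const fn of fns) {
      current = await fn(current); // `await` accepts both values and promises
      if (current === null || current === undefined) return; // no value: skip the rest
    }
    yield current;
  };

const grouped = groupFns(
  x => x * x,
  async x => x + 1, // asynchronous functions are awaited
  x => (x > 3 ? x : null) // filter out small values
);

// Feed several inputs through the single grouped function.
const run = async inputs => {
  const outputs = [];
  for (const input of inputs) {
    for await (const output of grouped(input)) outputs.push(output);
  }
  return outputs;
};

run([1, 2, 3]).then(outputs => console.log(outputs)); // [ 5, 10 ]
```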

Functions

Functions are called in the order they are passed to chain(), with two arguments: chunk and encoding (see Node's documentation). The result of each call is passed to the next function in the chain.

A value returned by a function is interpreted according to its type, as described below. If a function throws an exception, it is caught and passed to a callback, generating a stream error.

Returned value: null or undefined or chain.none

It means "no value was produced", which effectively terminates the processing of the current value. Nothing will be passed to the next function in the chain.

Returned value: chain.stop

This value means that nothing will be passed to the next function in the chain and the iterations will be stopped. It is usually used to terminate potentially infinite loops.

Important: chain.stop works only within function chain segments created by gen() or fun(). Native streams do not support this feature and treat it as chain.none.

Returned value: Promise

Asynchronous functions can return a Promise or a "thenable" (an object with a then() method). Processing is delayed until the promise resolves or rejects.

Returned value: generator

It can be a generator or any object with a next() method. It will be iterated according to the generator protocol.

This case covers generator functions.

next() can return a Promise according to the asynchronous generator protocol.

This case covers asynchronous generator functions.
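Both cases can be handled by one loop that calls next() and awaits its result, because awaiting a plain object simply returns it. collect() below is a hypothetical helper written only to illustrate the generator protocol; it is not part of stream-chain.

```javascript
'use strict';

// Hypothetical helper: drives any object with a next() method, awaiting
// next() when it returns a promise (asynchronous generators).
const collect = async iterator => {
  const values = [];
  for (;;) {
    const step = await iterator.next(); // works for sync and async next()
    if (step.done) break;
    values.push(step.value);
  }
  return values;
};

// A generator function: each call returns a generator.
function* range(n) {
  for (let i = 0; i < n; ++i) yield i;
}

// An asynchronous generator function: next() returns a promise.
async function* delayed(n) {
  for (let i = 0; i < n; ++i) {
    await new Promise(resolve => setTimeout(resolve, 1));
    yield i * 10;
  }
}

Promise.all([collect(range(3)), collect(delayed(3))]).then(([a, b]) => {
  console.log(a, b); // [ 0, 1, 2 ] [ 0, 10, 20 ]
});
```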

Returned value: chain.many()

chain.many() is a wrapper for an array holding multiple values (0 or more). It cannot contain any special values. All values will be pushed to a stream sequentially.

Returned value: chain.finalValue()

chain.finalValue() is a wrapper for a single value. This value is not passed to the next function in the chain; instead, it is used as the final value of the chain.

Important: chain.finalValue() works only within function chain segments created by gen() or fun(). Native streams do not support this feature and treat its payload as a regular value.

Returned value: anything else

Any other value is passed to the next function in the chain unchanged.
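The return-value rules above can be modeled in a few dozen lines. This is a sketch under assumptions: none, stop, many(), and finalValue() here are hypothetical local stand-ins that mimic the documented semantics, not the objects exported by stream-chain, and generators (covered above) and error handling are omitted for brevity.

```javascript
'use strict';

// Hypothetical stand-ins for chain.none, chain.stop, chain.many() and
// chain.finalValue(). They mimic the documented semantics only and are NOT
// the objects exported by stream-chain. Generators are omitted for brevity.
const none = Symbol('none');
const stop = Symbol('stop');
const MANY = Symbol('many');
const FINAL = Symbol('final');
const many = values => ({[MANY]: values});
const finalValue = value => ({[FINAL]: value});

// Passes `value` to fns[index] and interprets the result.
// Resolves to false when a function returned `stop`.
const run = async (fns, index, value, out) => {
  if (index >= fns.length) {
    out.push(value); // past the last function: emit the value
    return true;
  }
  let result = fns[index](value);
  if (result && typeof result.then === 'function') result = await result; // Promise or thenable
  if (result === null || result === undefined || result === none) return true; // nothing passed on
  if (result === stop) return false; // nothing passed on, iterations stop
  if (typeof result === 'object' && FINAL in result) {
    out.push(result[FINAL]); // final value: skip the remaining functions
    return true;
  }
  if (typeof result === 'object' && MANY in result) {
    for (const item of result[MANY]) {
      if (!(await run(fns, index + 1, item, out))) return false;
    }
    return true;
  }
  return run(fns, index + 1, result, out); // anything else: passed on unchanged
};

// Drives a list of inputs through the functions until `stop` is returned.
const processAll = async (fns, inputs) => {
  const out = [];
  for (const input of inputs) {
    if (!(await run(fns, 0, input, out))) break;
  }
  return out;
};

const fns = [
  x => (x > 3 ? stop : x), // stop at the first value above 3
  x => (x % 2 ? x : none), // drop even values
  x => (x === 3 ? finalValue('three') : x), // bypass the rest of the chain
  async x => many([x, x * 100]) // one input, two outputs
];

processAll(fns, [1, 2, 3, 4, 5]).then(out => console.log(out)); // [ 1, 100, 'three' ]
```

Here 2 is dropped by none, 3 skips the last function via finalValue(), and 4 triggers stop, so 5 is never processed.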

Properties

The following public properties are available: streams, input, output.

streams

streams is an array of streams created by the constructor. Its values are either Transform streams that use the corresponding functions from the constructor parameter, or user-provided streams. All streams are already piped sequentially, starting from the beginning.

This array is provided mainly to attach event handlers to individual components.

input

input is the beginning of the pipeline. Effectively it is the first item of streams.

output

output is the end of the pipeline. Effectively it is the last item of streams.
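The relationship between streams, input, and output can be illustrated with Node's built-in Transform streams wired by hand. This is only a sketch: toTransform() is a hypothetical helper, a simplified stand-in for how a single function might be wrapped as a stream, and the wiring does not reproduce chain() itself.

```javascript
'use strict';

const {Transform} = require('node:stream');

// Hypothetical helper: wraps a synchronous function as an object-mode
// Transform stream (a simplified stand-in for what chain() does per item).
const toTransform = fn =>
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      try {
        const result = fn(chunk, encoding);
        if (result !== null && result !== undefined) this.push(result);
        callback();
      } catch (error) {
        callback(error); // exceptions become stream errors
      }
    }
  });

// Components piped sequentially: `input` is first, `output` is last.
const streams = [toTransform(x => x * x), toTransform(x => x + 1)];
for (let i = 1; i < streams.length; ++i) streams[i - 1].pipe(streams[i]);
const input = streams[0];
const output = streams[streams.length - 1];

// Per-component handlers can be attached through the array.
streams.forEach((s, i) => s.on('error', error => console.error(`stream #${i}:`, error)));

const results = [];
output.on('data', value => results.push(value));
const done = new Promise(resolve => output.on('end', resolve));

[1, 2, 3].forEach(x => input.write(x));
input.end(); // pipe() propagates the end to the next stream

done.then(() => console.log(results)); // [ 2, 5, 10 ]
```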

Examples

Generally, the instance returned by chain() should be used to represent the whole chain:

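import {Transform} from 'node:stream';
import zlib from 'node:zlib';
import fs from 'node:fs';
import chain from 'stream-chain';

const pipeline = chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  // converts each incoming value to a string
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
], {readableObjectMode: false}); // the last stream does not produce objects
// dataSource: a Readable stream of numbers defined elsewhere
dataSource
  .pipe(pipeline)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));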

But in some cases input and output provide better control over how a data processing pipeline should be organized:

pipeline.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(pipeline.input);

Choose one style and never mix both styles on the same object!

Static properties

Imports

For convenience, the module exposes the following imports from other modules as static properties of chain:

  • gen(...fns) from gen module.
  • asStream(fn) from asStream module.
  • Multiple imports from defs module:
    • Symbols: none, stop, finalSymbol, manySymbol, flushSymbol, fListSymbol.
    • Errors: Stop.
    • Makers of return values: finalValue(value), many(values).
    • Testers: isFinalValue(value), isMany(value), isFlushable(value), isFunctionList(value).
    • Getters: getFinalValue(value), getManyValues(value), getFunctionList(value).
    • Setters: flushable(value, final), setFunctionList(value, fns).

See corresponding modules for details.

chain.dataSource(fn)

This is a helper function. It takes a function or an iterable object and returns a function:

  • If it is a function, it returns the function.
  • If it is an asynchronously iterable object, it returns a function that returns an asynchronous iterator.
  • If it is an iterable object, it returns a function that returns an iterator.

The returned iterator function is bound to its object.
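The three cases can be expressed as a short sketch. It is a hypothetical re-implementation of the documented behavior, not the package's code; in particular, the error thrown for other inputs is an assumption, since that case is not documented here. The usage also shows why binding matters: calling an unbound array iterator method throws.

```javascript
'use strict';

// Hypothetical re-implementation of the documented behavior of
// chain.dataSource(); it is a sketch, not the package's code.
const dataSource = fn => {
  if (typeof fn === 'function') return fn; // functions are returned as is
  if (fn && typeof fn[Symbol.asyncIterator] === 'function') {
    return fn[Symbol.asyncIterator].bind(fn); // returns an asynchronous iterator
  }
  if (fn && typeof fn[Symbol.iterator] === 'function') {
    return fn[Symbol.iterator].bind(fn); // returns an iterator
  }
  // Assumption: the behavior for other inputs is not documented above.
  throw new TypeError('expected a function or an iterable object');
};

// Binding matters: calling an unbound [1, 2, 3][Symbol.iterator] throws.
const fromArray = dataSource([1, 2, 3]);
console.log([...fromArray()]); // [ 1, 2, 3 ]

async function* numbers() {
  yield 1;
  yield 2;
}

const fromAsync = dataSource(numbers());
(async () => {
  const values = [];
  for await (const value of fromAsync()) values.push(value);
  console.log(values); // [ 1, 2 ]
})();
```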
