pull-streams are a simple streaming pattern. You do not need to depend on a module to create a pull-stream; you just use functions in a certain way.
pull-streams can do everything node streams can do, but they also propagate errors and tidy themselves up after use. Node streams don't even do that!
This workshop will first take you through the basic internals of pull-streams; you'll implement the main patterns in just a few lines each. The official pull-stream module uses a few more lines to optimize, but not many more.
Part 2 is a walk around the pull-stream ecosystem. We'll demonstrate some of the most useful pull-stream modules by recreating some familiar UNIX tools, such as ls, wc, grep, tail -f and less.
Part 1 is probably more challenging, and because it needs to be exactly right, there are tests to check that your implementation is correct.
To run a test, run node verify {exercise_number} {your_script.js}
There are no tests for Part 2, but you should run your solutions on your local file system and see what they do.
- sink stream to console.log
- source stream of an array
- map stream
- pull (combine pull-streams)
- take N elements from an infinite source
- ls (pull-defer)
- ls -l (..., pull-paramap)
- wc (stream-to-pull-stream, pull-split, pull.reduce)
- grep (pull-file, pull.filter)
- cat (pull-cat)
- tail -f (pull-file)
- less (pull-group)
the following section is
- trees: pull-traverse (recursively read all node_modules folders and output the current version)
- fan in: pull-many (take many streams and read from each of them in parallel, as fast as possible)
- fan out: split one stream into many, and read as fast as the slowest stream.
- multiplexing: create a stream that multiplexes multiple streams through a single duplex stream.
A pull-stream is just an async function that is repeatedly called, until it says "stop!".
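To make that concrete, here is a small sketch with two hand-written pieces (the names countTo and readAll are illustrative only, not from any module): a source that answers cb(null, value) three times and then cb(true), and a loop that keeps calling it until it says stop.

```javascript
// A source: call it with (abort, cb); it answers cb(end, data).
function countTo (max) {
  var n = 0
  return function read (abort, cb) {
    if (abort) return cb(abort)     // asked to stop early
    if (n >= max) return cb(true)   // "stop!": no more data
    cb(null, ++n)                   // one more value
  }
}

// Call the source repeatedly until it signals the end.
function readAll (read, done) {
  var seen = []
  read(null, function next (end, data) {
    if (end) return done(end === true ? null : end, seen)
    seen.push(data)
    read(null, next)
  })
}

var result
readAll(countTo(3), function (err, seen) { result = seen })
console.log(result) // [ 1, 2, 3 ]
```

Because this source calls back synchronously, result is already filled in when console.log runs; with a truly async source you would use it inside the callback instead.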
Write a function that takes a callback and returns a function that is passed a read function.
That function should call read with a callback, cb(end, data), and if end is truthy, stop.
Otherwise, print out data with console.log(data), and then read again.
(cb must be in the second position. The first argument is abort, but we will come back to that later.)
to get a pull-stream to read from, use helpers/random.js.
you can start with this code:
```javascript
module.exports = function (cb) {
  return function (read) {
    //your pull-stream reader...
    read(null, function next (end, data) {
      // ...
    })
  }
}
```
to test, run node verify 1 exercise01.js
Congratulations, you have just written a pull-stream sink!
a stream is like an array in time.
write a function that takes an array,
and returns an async function named read
that calls back each item in the array in turn.
```javascript
module.exports = function (array) {
  return function read (abort, cb) {
    // read the next item
  }
}
```
- the read function must take two arguments, abort and cb. you can ignore abort for now, but we will come back to it.
- when all the items are read, call cb(true) to indicate the end of the stream.

to test, run node verify.js 2 exercise02.js
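cb(true) is the normal end; by the usual pull-stream convention, a source that fails passes an Error as the end value instead, and the reader stops and passes it on. A minimal sketch of both sides, where failAfter and logAll are hypothetical names used only for illustration:

```javascript
// Hypothetical source: emits 0..n-1, then reports an error instead of ending cleanly.
function failAfter (n) {
  var i = 0
  return function read (abort, cb) {
    if (abort) return cb(abort)
    if (i < n) return cb(null, i++)
    cb(new Error('source failed after ' + n + ' items'))
  }
}

// A reader that tells a normal end apart from an error.
function logAll (read, done) {
  read(null, function next (end, data) {
    if (end === true) return done(null) // normal end
    if (end) return done(end)           // an Error: pass it on, don't read again
    console.log(data)
    read(null, next)
  })
}

var error
logAll(failAfter(2), function (err) { error = err })
// logs 0 and 1, then error.message is 'source failed after 2 items'
```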
transform streams represent throughput.
a transform stream takes a source, and returns a new source.
a sink stream is a function that takes a source stream and calls it.
```javascript
function sink (read) {
  // ...
}
```
a source stream is an async function to be called repeatedly.
```javascript
function source (abort, cb) {
  // ...
}
```
a through stream is a sink stream that returns a source stream.
```javascript
function sink (read) {
  return function source (abort, cb) {
    // ...
  }
}
```
Implement a through stream that takes a map function and applies it to each item of the source stream.
```javascript
module.exports = function (map) {
  return function (read) {
    return function (abort, cb) {
      // ...
    }
  }
}
```
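The same through shape can do more than map. As an illustration, here is a hand-written filter (not the pull-stream module's own code): for a single request from downstream it may read upstream several times, dropping items until one passes the test.

```javascript
function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// A through stream: takes a source (read), returns a new source.
function filter (test) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function next (end, data) {
        if (end) return cb(end)               // end or error: pass it downstream
        if (test(data)) return cb(null, data) // keep this item
        read(null, next)                      // drop it and read the next one
      })
    }
  }
}

var evens = filter(function (n) { return n % 2 === 0 })(values([1, 2, 3, 4]))
var out = []
evens(null, function next (end, data) {
  if (end) return
  out.push(data)
  evens(null, next)
})
// out is [2, 4]
```

With a synchronous source, a long run of rejected items recurses once per item, so a production version would guard against deep stacks.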
a sink takes a source, so you can just pass it directly:
```javascript
sink(source)
```
and that is a valid way to connect a pull-stream.
a through stream such as map returns a source too, so you can connect a pipeline like this:
```javascript
sink(map(source))
```
but that reads right to left, and we are used to left to right.
implement a function, pull(), that takes streams and passes each one to the next, so that you can write:
```javascript
pull(source, map, sink)
```
```javascript
module.exports = function pull () {
  var args = [].slice.call(arguments)
  // ...
}
```
to test, run node verify.js 4 exercise04.js
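Since each stage is just a function of the stream before it, left-to-right composition can be written as a reduce. This is one minimal sketch, assuming the first argument is always a source; it does not cover the extra cases the real pull-stream module handles (such as pipelines that start with a through stream). The values, map and collect helpers are hand-written stand-ins.

```javascript
// Minimal sketch: the first argument is a source; every later argument
// is a through or sink that takes the stream built so far.
function pull () {
  var args = [].slice.call(arguments)
  return args.slice(1).reduce(function (read, stream) {
    return stream(read)
  }, args[0])
}

function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

function map (fn) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if (end) return cb(end)
        cb(null, fn(data))
      })
    }
  }
}

function collect (done) {
  return function (read) {
    var all = []
    read(null, function next (end, data) {
      if (end) return done(end === true ? null : end, all)
      all.push(data)
      read(null, next)
    })
  }
}

var doubled
pull(
  values([1, 2, 3]),
  map(function (x) { return x * 2 }),
  collect(function (err, all) { doubled = all })
)
// doubled is [2, 4, 6]
```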
sometimes we don't want to read the entire stream. when calling a source stream, the first argument is called abort. If that argument is true, it tells the stream to end.
write a through stream that reads N items, then calls read(true, cb) instead, terminating the stream.
```javascript
module.exports = function (n) {
  return function (read) {
    return function (abort, cb) {
      //how many times called?
      read(abort, cb)
    }
  }
}
```
to test, run node verify.js 5 exercise05.js
this allows us to read from infinite streams!
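abort works from the sink side too, and it is also how a source gets its chance to tidy up. A sketch, with naturals and first as hypothetical names: an endless source that runs a cleanup hook when aborted, and a sink that keeps the first few items and then aborts.

```javascript
// Hypothetical infinite source: 0, 1, 2, ... forever, unless aborted.
function naturals (onCleanup) {
  var n = 0
  return function read (abort, cb) {
    if (abort) {
      onCleanup()      // release resources (files, sockets, timers...) here
      return cb(abort) // acknowledge the abort
    }
    cb(null, n++)
  }
}

// A sink that keeps the first `count` items (count >= 1), then aborts the source.
function first (count, done) {
  return function (read) {
    var items = []
    read(null, function next (end, data) {
      if (end) return done(end === true ? null : end, items)
      items.push(data)
      if (items.length >= count) {
        return read(true, function () { done(null, items) })
      }
      read(null, next)
    })
  }
}

var cleaned = false
var got
first(3, function (err, items) { got = items })(naturals(function () { cleaned = true }))
// got is [0, 1, 2] and cleaned is true
```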
ls lists a directory: given a directory, it outputs the files in that directory.
use node's fs module to create a stream of filenames in a directory.
create a function that returns a pull-stream:
```javascript
var pull = require('pull-stream')

// LS(dir) is the function you write: it returns a source of filenames.
pull(
  LS(dir),
  pull.log()
)
```
use fs.readdir to read a directory (it takes a callback), but we also want to return a stream immediately. How to do this?
one way is to use pull-defer.
also see the solution in solutions/06.js
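The idea behind pull-defer can be sketched by hand: return a placeholder source right away, remember a read that arrives too early, and serve it once the real source is plugged in. deferredSource and its resolve() are illustrative names, not pull-defer's actual API, and the sketch assumes at most one outstanding read, as the pull-stream protocol normally guarantees.

```javascript
var fs = require('fs')

function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Hand-rolled deferred source (illustrative; not pull-defer's real API).
function deferredSource () {
  var real = null
  var pending = null // a read that arrived before the real source existed
  function source (abort, cb) {
    if (real) return real(abort, cb)
    pending = [abort, cb]
  }
  function resolve (read) {
    real = read
    if (pending) {
      var args = pending
      pending = null
      real(args[0], args[1])
    }
  }
  return { source: source, resolve: resolve }
}

// ls sketch: the stream is returned synchronously and filled in once readdir calls back.
function LS (dir) {
  var d = deferredSource()
  fs.readdir(dir, function (err, files) {
    d.resolve(err ? function (abort, cb) { cb(abort || err) } : values(files))
  })
  return d.source
}

// Reading before resolve() parks the request; resolve() then serves it.
var placeholder = deferredSource()
var firstItem
placeholder.source(null, function (end, data) { firstItem = data })
placeholder.resolve(values(['a', 'b']))
// firstItem is 'a'
```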
the ls -l option outputs extra information about files. this comes from the fs.stat system call.
we want to make lots of async calls to fs.stat, so let's do them in parallel.
How do we do that? well, the answer to any pull-stream question is: there is a module for that!
also see the solution in solutions/07.js
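Before going parallel, it can help to see the sequential baseline: a through stream that calls an async function for each item and waits for it before reading the next. This hand-written asyncMap is a sketch only; it runs one call at a time, which is exactly the limitation pull-paramap removes. On failure it aborts upstream with the error and then reports it downstream.

```javascript
var fs = require('fs')

function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Sequential async map: one call to fn at a time, results stay in order.
function asyncMap (fn) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if (end) return cb(end)
        fn(data, function (err, result) {
          // on failure, abort upstream with the error, then report it downstream
          if (err) return read(err, function () { cb(err) })
          cb(null, result)
        })
      })
    }
  }
}

// A mapper in the shape ls -l needs (defined here, not run below).
function statFile (file, cb) {
  fs.stat(file, function (err, stat) {
    if (err) return cb(err)
    cb(null, { file: file, size: stat.size, isDirectory: stat.isDirectory() })
  })
}

// Runnable check with an in-memory mapper.
var shouted = []
var upper = asyncMap(function (s, cb) { cb(null, s.toUpperCase()) })(values(['a', 'b']))
upper(null, function next (end, data) {
  if (end) return
  shouted.push(data)
  upper(null, next)
})
// shouted is ['A', 'B']
```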
the wc command counts characters, words, and lines in an input stream.
split the input stream into lines, and then count them (you'll need pull-split). output the total number of lines. (for extra points, also output characters and words.)
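The tricky part of splitting is that chunks don't line up with lines. A hand-written stand-in for what pull-split does (splitLines is a hypothetical name) keeps the trailing partial line until the rest of it arrives. It assumes string chunks, for example after process.stdin.setEncoding('utf8'); raw Buffers could split multi-byte characters.

```javascript
function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Turn arbitrary string chunks into whole lines.
function splitLines () {
  return function (read) {
    var queue = []   // complete lines not yet handed downstream
    var rest = ''    // trailing partial line from the last chunk
    var ended = null // end/error value from upstream, once seen
    return function source (abort, cb) {
      if (abort) return read(abort, cb)
      if (queue.length) return cb(null, queue.shift())
      if (ended) return cb(ended)
      read(null, function (end, chunk) {
        if (end) {
          ended = end
          if (end === true && rest.length) queue.push(rest) // last line had no newline
          rest = ''
        } else {
          var parts = (rest + chunk).split('\n')
          rest = parts.pop()
          queue.push.apply(queue, parts)
        }
        source(null, cb) // serve from the queue, or read another chunk
      })
    }
  }
}

var lines = []
var lineSource = splitLines()(values(['hel', 'lo\nwor', 'ld\n\nlast']))
lineSource(null, function next (end, line) {
  if (end) return
  lines.push(line)
  lineSource(null, next)
})
// lines is ['hello', 'world', '', 'last']
```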
to convert standard input into a pull-stream, use stream-to-pull-stream
```javascript
var toPull = require('stream-to-pull-stream')
var pull = require('pull-stream')

pull(
  toPull.source(process.stdin),
  //your wc implementation
)
```
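Once the input is a stream of lines, counting is a fold over it. This sketch uses a hand-written reduce sink as a stand-in for pull.reduce; the counting rules are assumptions for illustration (words are runs of non-whitespace, and newline characters are not included in the character count).

```javascript
// Hand-written reduce sink (a stand-in for pull.reduce).
function reduce (reducer, acc, done) {
  return function (read) {
    read(null, function next (end, data) {
      if (end) return done(end === true ? null : end, acc)
      acc = reducer(acc, data)
      read(null, next)
    })
  }
}

function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

function countLine (totals, line) {
  return {
    lines: totals.lines + 1,
    words: totals.words + line.split(/\s+/).filter(Boolean).length,
    chars: totals.chars + line.length // newline characters are not counted here
  }
}

var totals
reduce(countLine, { lines: 0, words: 0, chars: 0 }, function (err, t) { totals = t })(
  values(['hello world', 'foo'])
)
// totals is { lines: 2, words: 3, chars: 14 }
```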
grep filters the lines of a stream that match a pattern.
take a string or regular expression as the first argument, read stdin, and output lines that match the pattern.
```
SOURCE | node 09.js PATTERN
```
where SOURCE is a stream, e.g. cat {file} or the output of another streaming program.
also see the solution in solutions/09.js
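One small detail worth getting right is turning PATTERN into a predicate that a filter stage such as pull.filter can use. In this sketch, makeMatcher is a hypothetical helper; it assumes a string argument is regular-expression source, and it drops the g and y flags, because test() with those flags is stateful and would give alternating answers across lines.

```javascript
// Turn the PATTERN argument into a line predicate.
function makeMatcher (pattern) {
  var re = pattern instanceof RegExp
    ? new RegExp(pattern.source, pattern.flags.replace(/[gy]/g, '')) // drop stateful flags
    : new RegExp(pattern) // assumption: strings are regular-expression source
  return function (line) {
    return re.test(line)
  }
}

var isMatch = makeMatcher('fo+')
var kept = ['foo', 'bar', 'food'].filter(isMatch)
// kept is ['foo', 'food']
```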
the cat command takes multiple input streams and concatenates them into a single stream.
```
cat foo.txt bar.js baz.html
```
should concatenate those files. read each input with pull-file, and output a stream that contains every input stream in sequence.
hint: there is a pull-stream module that implements cat, but as a learning challenge, you may implement it yourself!
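If you take the challenge, one possible shape is sketched below (hand-written, not pull-cat's code): read the current source until it ends, then move on. On abort it passes the abort to every source not yet finished, so each one can tidy up. For brevity, an error from one source is passed downstream without aborting the remaining ones.

```javascript
function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Concatenate sources: read each one to its end before moving to the next.
function cat (sources) {
  var i = 0
  return function read (abort, cb) {
    if (abort) {
      // abort every source that has not finished yet, then acknowledge
      var rest = sources.slice(i)
      i = sources.length
      return (function next () {
        if (!rest.length) return cb(abort)
        rest.shift()(abort, next)
      })()
    }
    if (i >= sources.length) return cb(true)
    sources[i](null, function (end, data) {
      if (end === true) { i++; return read(null, cb) } // this source is done, go on
      if (end) return cb(end)                         // error: pass it downstream
      cb(null, data)
    })
  }
}

var all = []
var joined = cat([values([1, 2]), values([]), values([3])])
joined(null, function next (end, data) {
  if (end) return
  all.push(data)
  joined(null, next)
})
// all is [1, 2, 3]
```

For the exercise, each element of sources would be a pull-file source for one filename.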
with tail -f you can see lines appended to an open file as they arrive.
```
> tail -f log
...
```
then in another terminal:
```
> date >> log
> date >> log
> date >> log
```
and see the date show up in the terminal in which tail -f log is running.
implement tail -f as a pull-stream using pull-file.
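What makes tail -f different from cat is that the source never ends on its own: when it runs out of data, it holds on to the reader's callback until more arrives. This sketch shows only that waiting behaviour; liveSource and push() are hypothetical, and push() stands in for "the file grew" (detecting growth, for example with fs.watch, is left out).

```javascript
// Hypothetical live source: parks the reader's callback until push() supplies data.
function liveSource () {
  var queue = []
  var waiting = null
  var ended = null

  function read (abort, cb) {
    if (abort) {
      ended = abort
      if (waiting) { var w = waiting; waiting = null; w(abort) }
      return cb(abort)
    }
    if (queue.length) return cb(null, queue.shift())
    if (ended) return cb(ended)
    waiting = cb // nothing yet: wait for push()
  }

  function push (data) {
    if (ended) return
    if (waiting) {
      var cb = waiting
      waiting = null
      return cb(null, data)
    }
    queue.push(data)
  }

  return { read: read, push: push }
}

var log = liveSource()
var seen = []
log.read(null, function next (end, data) {
  if (end) return
  seen.push(data)
  log.read(null, next)
})
log.push('first line')
log.push('second line')
// seen is ['first line', 'second line']; the reader is parked, waiting for more
```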
more is a "pager". given a long input file, it displays one page's worth at a time, and no more until the next user input, so a human is able to read it.
more is a way to get less data: it provides "back pressure".
create a pull-stream that reads a file and outputs one page and no more until the user presses a key (hint: read data on process.stdin).
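```javascript
var pull = require('pull-stream')
var File = require('pull-file')
var toPull = require('stream-to-pull-stream')

pull(
  File(process.argv[2]),
  // read an N line page and output it when the user asks for more.
  // Page is the through stream you write for this exercise.
  Page(40),
  toPull.sink(process.stdout)
)
```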
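The paging half can be a through stream that gathers up to N items into one array per read, so each read from downstream yields one page; waiting for a key press then happens in the sink, which simply doesn't read the next page until the user asks. The group function below is a hand-written stand-in in the spirit of pull-group, not its actual code.

```javascript
function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Collect up to `size` items into an array and emit it as one "page".
function group (size) {
  return function (read) {
    var ended = null
    return function (abort, cb) {
      if (ended) return cb(ended)
      var page = []
      read(abort, function next (end, data) {
        if (end) {
          ended = end
          if (end === true && page.length) return cb(null, page) // last, partial page
          return cb(end)
        }
        page.push(data)
        if (page.length >= size) return cb(null, page)
        read(null, next)
      })
    }
  }
}

var pages = []
var paged = group(2)(values([1, 2, 3, 4, 5]))
paged(null, function next (end, page) {
  if (end) return
  pages.push(page)
  paged(null, next)
})
// pages is [[1, 2], [3, 4], [5]]
```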
take your code for exercise 7, and extend it to read directories recursively. if a file is a node_modules directory, it should expand it and stream its contents.
there are different ways you can traverse a tree, choose the way that seems most appropriate!
see also pull-traverse and pull-glob
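As one possible traversal order, here is a depth-first, pre-order walk written as a source over an in-memory tree (traverse and the { name, children } node shape are assumptions for illustration). It uses a stack; swapping it for a queue (shift instead of pop, children pushed in order) would give breadth-first. In the real exercise the children come from fs.readdir asynchronously, which this sketch leaves out.

```javascript
// Depth-first, pre-order traversal of { name, children } nodes as a source.
function traverse (root) {
  var stack = [root]
  return function read (abort, cb) {
    if (abort) { stack = []; return cb(abort) }
    if (!stack.length) return cb(true)
    var node = stack.pop()
    var children = node.children || []
    // push in reverse so the first child is visited next
    for (var i = children.length - 1; i >= 0; i--) stack.push(children[i])
    cb(null, node.name)
  }
}

var tree = {
  name: 'app',
  children: [
    { name: 'a', children: [{ name: 'a/b' }] },
    { name: 'c' }
  ]
}

var order = []
var walk = traverse(tree)
walk(null, function next (end, name) {
  if (end) return
  order.push(name)
  walk(null, next)
})
// order is ['app', 'a', 'a/b', 'c']
```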
take an array of pull-streams, and read them into one stream. read the streams as fast as possible, but you must still respect back pressure. if the sink stops, you must wait, but if it is reading, give it the item from whichever stream responds first.
also see pull-many
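A sketch of one approach, hand-written and not pull-many's code: when the sink asks and nothing is buffered, start a read on every idle source and hand over whichever answer arrives first; later answers wait in a buffer. Back pressure holds because each source has at most one read in flight and sources are only read while the sink is waiting with an empty buffer. For brevity, an error is passed to the sink without aborting the other sources.

```javascript
function values (array) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

function merge (sources) {
  var states = sources.map(function (read) {
    return { read: read, reading: false, ended: false }
  })
  var buffer = []    // items that arrived while the sink was not waiting
  var waiting = null // the sink's pending callback, if any
  var error = null   // first error seen from any source

  function drain () {
    if (!waiting) return
    var cb = waiting
    if (buffer.length) { waiting = null; return cb(null, buffer.shift()) }
    if (error) { waiting = null; return cb(error) }
    var allEnded = states.every(function (s) { return s.ended })
    if (allEnded) { waiting = null; return cb(true) }
  }

  function pump (s) {
    if (s.reading || s.ended || error) return
    s.reading = true
    s.read(null, function (end, data) {
      s.reading = false
      if (end) {
        s.ended = true
        if (end !== true && !error) error = end
      } else {
        buffer.push(data)
      }
      drain()
    })
  }

  return function read (abort, cb) {
    if (abort) {
      // pass the abort to every source that is still open
      states.forEach(function (s) {
        if (!s.ended) { s.ended = true; s.read(abort, function () {}) }
      })
      return cb(abort)
    }
    waiting = cb
    drain()
    if (waiting) states.forEach(pump)
  }
}

var merged = []
var both = merge([values([1, 2]), values([3, 4, 5])])
both(null, function next (end, data) {
  if (end) return
  merged.push(data)
  both(null, next)
})
// merged holds 1 to 5; the interleaving depends on which source answers first
```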
the opposite of exercise 14: make one stream that fans out into many.
write an essay about what makes this more difficult to implement, and whether there are situations where you genuinely need it.
should a fan out stream go as fast as the slowest stream, or as fast as the fastest stream?
If you read as fast as the fastest stream, what happens to the data the slower stream was waiting for?
pull-stream all the things! pull-streams are great! make all your apis pull-streams. but does that mean you need one connection per pull-stream? no, use multiplexing!
muxrpc allows you to expose many pull-stream apis over one tcp or web socket connection.
but there is one problem: to my shame, I got stumped when I tried to implement correct back pressure on it.
Solve the problem of multiplexed back pressure and make a pull request to muxrpc.
if you complete this exercise, you get an instant A, and are not required to complete the other tasks
MIT