When I first started working with js-libp2p, I had never encountered pull-streams, the stream abstraction used throughout js-libp2p.
It took me a while to wrap my head around, and I thought it might be worth writing up some thoughts & observations. Note that this will all hopefully be irrelevant one day, since the Awesome Async Iterator Endeavor is underway!
First of all, the motivation for using pull-streams is right in the name: the stream works by the consumer pulling data through the pipe, which signals the producer to send more data across.
This is in contrast to Node.js streams, which are push-driven, with producers stuffing the pipe as full as they can. With push streams, the consumer has to implement some kind of backpressure mechanism to avoid getting overwhelmed. For example, it might buffer incoming messages and drop them if the buffer gets full.
With pull streams, the consumer “pulls” data through by signaling to the producer that it’s ready to accept another round. The producer can then either generate the data on the fly or pull it from a buffer. Pull streams essentially move the need for backpressure into the producer, which may need to rate limit itself if the consumer doesn’t pull frequently enough to drain its buffer.
Basics
Here’s the usage example from the pull-streams README:
const pull = require('pull-stream')
const fs = require('fs')

pull(
  pull.values(['file1', 'file2', 'file3']),
  pull.asyncMap(fs.stat),
  pull.collect(function (err, array) {
    console.log(array)
  })
)
Here you have a pipeline defined using the pull function, which composes a chain of sources, throughs, and sinks.
A source generates values and puts them on the stream. The pull.values method takes an array of static values, but you can create sources for any kind of data producer.
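Under the hood, a source is just a function that the downstream sink calls whenever it wants another value, following the pull-stream calling convention of cb(end, data). Here's a rough sketch of a hand-rolled source (the counter is a made-up example) that produces three values and then ends:

const pull = require('pull-stream')

// A source is a function (abort, cb); the sink calls it once per value.
// This one counts from 1 to 3, then signals the end of the stream.
function counter () {
  let n = 0
  return function source (abort, cb) {
    if (abort) return cb(abort) // downstream asked us to stop
    if (n >= 3) return cb(true) // `true` signals end-of-stream
    cb(null, ++n)               // hand one value downstream
  }
}

pull(counter(), pull.log()) // logs 1, 2, 3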
A sink is a destination for your data, and it lives at the end of the pipeline. Here, pull.collect will invoke the callback once, when the stream closes (or if it errors out). The callback will receive all the values in the stream as an array. Each pipeline must have one sink, and it's the sink's job to do the actual "pulling", or requesting data from the source.
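To make the "pulling" concrete: a sink is a function that receives the source's read function and drives the stream by calling it repeatedly. Here's a minimal sketch that does roughly what the built-in pull.drain does:

// A sink receives the upstream read function and drives the stream.
// This minimal version logs each value, pulling again after each one.
function logSink (read) {
  read(null, function next (end, data) {
    if (end === true) return // stream finished normally
    if (end) throw end       // an error came through
    console.log(data)
    read(null, next)         // pull the next value
  })
}

pull(pull.values([1, 2, 3]), logSink) // logs 1, 2, 3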
A through stream (or just "through") is both a source and a sink, so it consumes data and can spit it out, often with some manipulation along the way. In the example above, the pull.asyncMap function will pull in filenames, invoke fs.stat, and send the result through to the next step (pull.collect).
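A through is implemented as a function that takes the upstream read function and returns a new one. As a sketch, a hand-rolled version of the stringifying map used later in this post could look like:

// A through wraps the upstream read and hands back a modified one.
// This hand-rolled map stringifies each value as it passes through.
function toStrings (read) {
  return function (abort, cb) {
    read(abort, function (end, data) {
      if (end) return cb(end) // pass end-of-stream or errors along unchanged
      cb(null, String(data))  // transform the value on its way downstream
    })
  }
}

pull(pull.values([1, 2, 3]), toStrings, pull.log()) // logs '1', '2', '3'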
Duplex streams have both a source and a sink that are independent of each other. They’re used for things like network streams, where you want to both read from and write to a remote process.
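Concretely, a duplex stream is just an object with independent source and sink halves; each side of a connection reads from the other's source and writes into the other's sink. A sketch of the shape, with placeholder behavior:

// A duplex is an object with a source half and a sink half.
const duplex = {
  source: pull.values(['outgoing data']),        // what we send out
  sink: pull.drain((v) => console.log('got', v)) // what we do with what arrives
}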
pull-streams in libp2p
libp2p uses duplex streams to represent transport connections and multiplexed streams, so you encounter them wherever you need to send or receive from a remote peer.
Here’s an example pulled from one of the js-libp2p transport examples:
node2.handle('/print', (protocol, conn) => {
  pull(
    conn,
    pull.map((v) => v.toString()),
    pull.log()
  )
})
This is a handler function for the /print protocol we're defining. Here conn is a duplex stream, and by putting it at the beginning of the chain, the pull helper knows to use it as a source instead of a sink. We pull values from the connection, stringify them using pull.map, then log them using the pull.log helper sink function.
Later, another node dials this one and gets its own conn, which it uses as a sink to send values to:
node1.dialProtocol(node2.peerInfo, '/print', (err, conn) => {
  if (err) { throw err }
  pull(pull.values(['Hello', ' ', 'p2p', ' ', 'world', '!']), conn)
})
It's also possible to read from and write to the conn object in the same pipeline, for example if you want a call-and-response type pattern.
This reads from the stream and sends back an uppercase version:
pull(
  conn,
  pull.map((v) => v.toString().toUpperCase()),
  conn
)
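On the dialing side, you could exercise a handler like this by writing into the connection and reading the response back out of the same pipeline. Here's a sketch, assuming the uppercasing handler above was registered under a hypothetical /echo protocol:

node1.dialProtocol(node2.peerInfo, '/echo', (err, conn) => {
  if (err) { throw err }
  pull(
    pull.values(['hello']),        // written into the connection's sink
    conn,                          // the duplex connection
    pull.map((v) => v.toString()), // responses arrive as buffers
    pull.log()                     // logs 'HELLO'
  )
})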
More about pull-streams
There's a big list of helpful utility libraries at https://pull-stream.github.io/ - it's worth skimming through to see what kinds of operations have already been written.
Anyway, hopefully that’s helpful. If you have questions or helpful tips about using pull-streams, why not chime in?