Using pull-streams in js-libp2p

When I first started working with js-libp2p, I had never encountered pull-streams, the stream abstraction used throughout js-libp2p.

It took a while to wrap my head around, and I thought it might be worth writing up some thoughts & observations. Note that this will all hopefully be irrelevant one day, since the Awesome Async Iterator Endeavor is underway!

First of all, the motivation for using pull-streams is right in the name: these streams work by the consumer pulling data through the pipe, which signals the producer to send data across.

This is in contrast to node.js streams, which are push-driven, with producers stuffing the pipe as full as they can. With push streams, the consumer has to implement some kind of backpressure mechanism to avoid getting overwhelmed. For example, they might buffer incoming messages and drop them if the buffer gets full.

With pull streams, the consumer “pulls” data through by signaling to the producer that it’s ready to accept another round. The producer can then either generate the data on the fly or pull it from a buffer. Pull streams essentially move the need for backpressure into the producer, which may need to rate limit itself if the consumer doesn’t pull frequently enough to drain its buffer.
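
To see that pacing in action, here's a minimal sketch using a couple of helpers from the pull-stream package (they're introduced properly in the next section). The source is only asked for its next value after the previous one has made it all the way through:

const pull = require('pull-stream')

pull(
  pull.values([1, 2, 3]),
  // each value takes a second to process; the source isn't asked
  // for the next value until this callback fires
  pull.asyncMap((n, cb) => setTimeout(() => cb(null, n), 1000)),
  pull.log() // logs one value per second, with no buffering needed
)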

Basics

Here’s the usage example from the pull-streams README:

const pull = require('pull-stream')
const fs = require('fs')

pull(
  pull.values(['file1', 'file2', 'file3']),
  pull.asyncMap(fs.stat),
  pull.collect(function (err, array) {
    console.log(array)
  })
)

Here you have a pipeline defined using the pull function, which composes a chain of sources, throughs, and sinks.

A source generates values and puts them on the stream. The pull.values method takes an array of static values, but you can create sources for any kind of data producer.
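
As a sketch of what that looks like under the hood: a source is just a function (abort, cb) that the sink calls once per value. The countTo name here is made up for illustration:

// cb(null, value) delivers a value; cb(true) signals end-of-stream
function countTo (limit) {
  let i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)     // downstream asked us to stop
    if (i >= limit) return cb(true) // no more values; close the stream
    cb(null, i++)                   // hand over the next value
  }
}

pull(countTo(3), pull.log()) // logs 0, 1, 2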

A sink is a destination for your data, and it lives at the end of the pipeline. Here, pull.collect will invoke the callback once, when the stream closes (or if it errors out). The callback will receive all the values in the stream as an array. Each pipeline must have exactly one sink, and it's the sink's job to do the actual “pulling”, i.e. requesting data from the source.
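
Here's a sketch of what that pulling looks like in a hand-rolled sink (logAll is a made-up name): the sink receives the upstream read function and calls it once for each value it wants.

// read(null, cb) pulls exactly one value from upstream
function logAll (read) {
  read(null, function next (end, data) {
    if (end === true) return // stream finished cleanly
    if (end) throw end       // stream errored
    console.log(data)
    read(null, next)         // pull the next value
  })
}

pull(pull.values(['a', 'b', 'c']), logAll)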

A through stream (or just “through”) is both a source and a sink, so it consumes data and can spit it out, often with some manipulation along the way. In the example above, the pull.asyncMap function will pull in filenames, invoke fs.stat, and send the result through to the next step (pull.collect).
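
Under the hood, a through is a function that takes the upstream read and returns a new read. Here's a sketch of a map-like through (double is a made-up example; in practice you'd just use pull.map):

function double () {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if (end) return cb(end) // pass end-of-stream and errors through
        cb(null, data * 2)      // transform the value on its way past
      })
    }
  }
}

pull(pull.values([1, 2, 3]), double(), pull.log()) // logs 2, 4, 6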

Duplex streams have both a source and a sink that are independent of each other. They’re used for things like network streams, where you want to both read from and write to a remote process.
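
In code, a duplex is just an object with independent source and sink properties. Here's a sketch of a loopback duplex that echoes everything back, built on the pull-pushable package (which comes up again later in this thread):

const pull = require('pull-stream')
const Pushable = require('pull-pushable')

function loopback () {
  const buffer = Pushable()
  return {
    // the sink half: whatever the other side writes gets buffered
    sink: pull.drain((data) => buffer.push(data), () => buffer.end()),
    // the source half: whatever we buffered gets read back out
    source: buffer
  }
}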

pull-streams in libp2p

libp2p uses duplex streams to represent transport connections and multiplexed streams, so you encounter them wherever you need to send or receive from a remote peer.

Here’s an example pulled from one of the js transport examples:

node2.handle('/print', (protocol, conn) => {
  pull(
    conn,
    pull.map((v) => v.toString()),
    pull.log()
  )
})

This is a handler function for the /print protocol we’re defining. Here conn is a duplex stream, and by putting it at the beginning of the chain, the pull helper knows to use it as a source instead of a sink. We pull values from the connection, stringify using pull.map, then log using the pull.log helper sink function.

Later, another node dials this one and gets its own conn, which it uses as a sink to send values to:

node1.dialProtocol(node2.peerInfo, '/print', (err, conn) => {
  if (err) { throw err }

  pull(pull.values(['Hello', ' ', 'p2p', ' ', 'world', '!']), conn)
})

It’s also possible to read from and write to the conn object in the same pipeline, for example if you want a call and response type pattern.

This reads from the stream and sends back an uppercase version:

pull(
  conn,
  pull.map(v => v.toString().toUpperCase()),
  conn
)

More about pull-streams

There’s a big list of helpful utility libraries at https://pull-stream.github.io/ - it’s worth skimming through to see what kind of operations have already been written.

Anyway, hopefully that’s helpful. If you have questions or helpful tips about using pull-streams, why not chime in?

Thanks for a really great introduction!

It’s also possible to read from and write to the conn object in the same pipeline, for example if you want a call and response type pattern.

What’s the best way to handle the case where you want to respond to some messages but not others? And as a bonus, the function which determines that has to call async functions.

For this it’s probably easiest to create your own sink and source functions. pull-pushable is nice for selectively pushing things onto the stream. Something like the following would let you process incoming data asynchronously and then forward (or respond to) selected messages:

function maybeForward (writer) {
  // return the sink reader
  return function (read) {
    read(null, function next (end, data) {
      if (end === true) return writer.end() // input ended; close our source too
      if (end) throw end

      // Do your async check on the data (request)
      shouldForward(data, (err, yes) => {
        // selectively push to the source (response)
        if (yes) writer.push(data)
        read(null, next)
      })
    })
  }
}

const pull = require('pull-stream')
const pushable = require('pull-pushable')

const writer = pushable((err) => console.log(err))
const myResponder = {
  sink: maybeForward(writer),
  source: writer
}

pull(
  connection,
  myResponder,
  connection
)

It turns out that you can do this with pull-stream’s asyncMap as well, which I’d found at the time of my question but had been using incorrectly. That approach looks something like:

pull(
  connection,
  pull.asyncMap(async (data, cb) => {
    // do some processing
    if (shouldRespond(data)) {
      cb(null, await getResponse(data));
    } else {
      // do other stuff, just don't invoke the callback (cb)
      // if cb is never called it just won't send anything back
    }
  }),
  connection
);

You need to call the callback in asyncMap, otherwise the stream will block and abort: asyncMap marks itself as busy until the callback is called.

You could use pull.flatten and call the callback with undefined; that avoids the side effects of never calling the callback, while pull.flatten keeps the undefined or null values from being passed along:

pull(
  connection,
  pull.asyncMap(async (data, cb) => {
    // do some processing
    if (shouldRespond(data)) {
      cb(null, await getResponse(data));
    } else {
      cb(null);
    }
  }),
  pull.flatten(),
  connection
);

Oh, interesting, thanks for saving me from that fate.

Is there a way to unblock it without sending back an empty response? I’ve been seeing empty responses in the case where nothing is deliberately sent back.

pull.flatten() will remove the empty responses. If there are instances where you want to send an empty response, then you’d need to change this behavior.

An alternative: I took the pull.filter code and made a basic transformAsync function, which returns the transformed data, or skips the value if the transformer returns nothing. You could change the check based on your needs, in case you actually want to respond with an empty object in some instances.

function transformAsync (transformer) {
  return function (read) {
    return function next (end, cb) {
      var sync, loop = true
      while (loop) {
        loop = false
        sync = true
        read(end, async function (end, data) {
          if (end) return cb(end) // don't run the transformer on end-of-stream
          var result = await transformer(data)
          if (!result)
            return sync ? loop = true : next(end, cb)
          cb(null, result)
        })
        sync = false
      }
    }
  }
}

pull(
  connection,
  // getResponse is async
  transformAsync(getResponse),
  connection
)

I think I must be missing something, or doing something wrong on the other side of the connection then, because I don’t want to send empty responses but I’m seeing them anyways. Here’s my setup:

// for sending the initial message
node.libp2p.dialProtocol(addr, '/gravity/0.0.1', (err, conn) => {
  if (err) { throw err }
  pull(pull.values([message]), conn, pull.collect((err2, data) => {
    console.log(`got response: ${data.toString()}`);
  }));
});
// the handler for receiving messages
node.libp2p.handle('/gravity/0.0.1', (protocolName, connection) => {
  pull(
    connection,
    pull.asyncMap(async (data, cb) => {
      console.log('received:', data.toString());

      if (oneTest) {
        // send response
        return cb(null, 'whateverResponse');
      }
      // otherwise don't send anything
      return cb(null);
    }),
    pull.flatten(),
    connection,
  );
});

(I know you don’t need to return the callback, I’m just doing it for control flow.)

I have this open in two different browsers, and it works great in the deliberate response case. However, in the case where I don’t want a response (cb(null)), I send the message from A, see the console.log on B, but then I see got response: (that’s an empty buffer) on A at the end. I assume it’s actually sending some empty ACK over the network or something, but I’d like to avoid that unnecessary traffic if possible because there are going to be a lot of them.

Okay, I think I know what’s going on. You’re only sending 1 message across, correct? If that’s the case, you will get an empty response because the listener is acknowledging the end of the stream. pull.values will send values and then close the writer, and each subsequent source will also close, which propagates the close back to the dialer.

If you send multiple messages via pull.values you shouldn’t get the empty packet, as it will accompany the final value (if there is a response).
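
If you need the dialer’s side to stay open across messages sent over time, one option (a sketch, using the same pull-pushable package from earlier) is to push onto a pushable source and only end it when you’re actually done:

const Pushable = require('pull-pushable')
const out = Pushable()

node.libp2p.dialProtocol(addr, '/gravity/0.0.1', (err, conn) => {
  if (err) { throw err }
  pull(
    out,
    conn,
    pull.drain((data) => console.log(`got response: ${data.toString()}`))
  )
})

out.push('first message')
// ...later, as many times as needed:
out.push('another message')
// the stream close only propagates once you end it yourself:
// out.end()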

Ah, I understand. Thank you. I think that’s exactly what’s happening.