Understanding streaming iterables and application protocols

It seems I had a bit of a brain fart when thinking about async iterators and what the sink function actually expects. (I think the docs could elaborate on this a bit more.)

The sink function expects an iterable, and when that iterable signals that iteration is complete, the virtual stream closes. The sink does not expect to be called multiple times. This is why, when you pass an array to the sink function, the stream closes as soon as the array has been sent and you cannot send any more data. Take this piece of code for example:

import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";

const stream = await server.dialProtocol(peerId, protocol);

await stream.sink([uint8ArrayFromString("first chunk")]);
await stream.sink([uint8ArrayFromString("second chunk")]);

The first sink call iterates over the iterable you passed it (an array in this case) and sends each item until the iterable’s next function signals that it is done (when the entire array has been iterated). When the sink sees that it is done, it closes the virtual stream and resolves the promise. Now that our first sink call is complete, the next one does nothing because the stream has been closed. Now that we know it wants an iterable, we can pass it one that never signals done and therefore keeps the virtual stream open:

const itr = {
	[Symbol.asyncIterator]: () => ({
		next: async () => {
			await new Promise(resolve => setTimeout(resolve, 100));
			const data = uint8ArrayFromString("some data");
			return { done: false, value: data };
		}
	})
};

await stream.sink(itr);

This sends data every 100ms without ever closing the stream. It can be made cleaner by using a generator function instead of building the iterable by hand, for example:

async function* run() {
	for (let i = 0; i < 10; i++) {
		yield uint8ArrayFromString(`Iteration ${i}`);
		await new Promise(resolve => setTimeout(resolve, 100));
	}
}

await stream.sink(run());

This sends 10 chunks of data 100ms apart. The generator then completes, which closes the stream. The same thing can be done on the handler side, like so:

import { toString as uint8ArrayToString } from "uint8arrays/to-string";

await node.handle(protocol, async ({ stream }) => {
	async function* respond() {
		for await (const data of stream.source) {
			const string = uint8ArrayToString(data.subarray());

			console.log(`Handler: ${string}`);

			yield uint8ArrayFromString(string.toUpperCase());
		}
	}

	await stream.sink(respond());
});

This logs the text it receives and returns it to the sender after converting it to uppercase. You can see here that we get the input data by iterating over stream.source; the same can be done on the other side of the stream.
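The close-on-done behaviour described so far can be tried without a libp2p node. The sketch below uses a toy object, createToyStream, which is a hypothetical stand-in I made up for illustration and not the libp2p API. It silently ignores sink calls after the first one, as described above; a real stream may report an error instead.

```javascript
// Toy stand-in for a stream's sink (hypothetical, NOT the libp2p API).
// It drains one iterable, then marks itself closed.
function createToyStream() {
	const sent = [];
	let closed = false;

	return {
		sent,
		get closed() {
			return closed;
		},
		async sink(source) {
			if (closed) {
				// An earlier call already closed the stream: nothing is sent.
				return;
			}
			for await (const chunk of source) {
				sent.push(chunk);
			}
			// The iterable signalled done, so the stream closes.
			closed = true;
		}
	};
}

// One iterable that yields both chunks before signalling done.
async function* chunks() {
	yield "first chunk";
	yield "second chunk";
}

async function demo() {
	// Two sink calls: only the first one gets through.
	const twoCalls = createToyStream();
	await twoCalls.sink(["first chunk"]);
	await twoCalls.sink(["second chunk"]);

	// One sink call with a generator: both chunks get through.
	const oneCall = createToyStream();
	await oneCall.sink(chunks());

	return { twoCalls: twoCalls.sent, oneCall: oneCall.sent };
}

demo().then(result => console.log(result));
```

In this toy, twoCalls ends up holding only "first chunk" while oneCall holds both chunks, which is the reason for handing the sink a single long-lived iterable.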

Now that the stream itself makes sense, it is easier to understand how to use the iterable pipe. The pipe takes an iterable source and passes it to an iterable sink. For example:

// Assumes it-pipe v2 or later, which exports pipe by name.
import { pipe } from "it-pipe";

// Old version
// await stream.sink(respond());

// New version
await pipe(respond(), stream.sink);

The iterable pipe has some handy features that make this a bit more straightforward. If the first argument is a function, it will call it and expect it to return an iterable. If any argument is a duplex streaming iterable (an object with sink and source properties), it will use those properties. So the above example becomes:

// Old version
await pipe(respond(), stream.sink);

// New version
await pipe(respond, stream);

Now we can take this a step further by making the generator function more generic as a transform streaming iterable:

async function* toUpper(source) {
	for await (const data of source) {
		const string = uint8ArrayToString(data.subarray());

		yield uint8ArrayFromString(string.toUpperCase());
	}
}

Now that generic function can be used anywhere to transform the stream, for example:

await pipe(stream, toUpper, stream);

This will take incoming data from the stream, convert it to uppercase and send it back.
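To see what the pipe is doing with those arguments, here is a minimal sketch of the two conveniences described above. toyPipe and createToyDuplex are hypothetical helpers written for this post, not the library's code, and the toy only handles a duplex in the first or last position. It also uses the built-in TextEncoder and TextDecoder on plain Uint8Arrays instead of the uint8arrays helpers, so there is no subarray() call.

```javascript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// A duplex streaming iterable has both a sink and a source.
const isDuplex = obj =>
	obj != null && typeof obj.sink === "function" && obj.source != null;

// Simplified stand-in for the iterable pipe (hypothetical).
function toyPipe(first, ...rest) {
	// First argument: call it if it is a function, take its source if it is
	// a duplex, otherwise treat it as the source iterable itself.
	let current;
	if (typeof first === "function") {
		current = first();
	} else if (isDuplex(first)) {
		current = first.source;
	} else {
		current = first;
	}

	// Each later step receives the previous result; a duplex receives it
	// through its sink (only supported as the last step in this toy).
	for (const step of rest) {
		current = isDuplex(step) ? step.sink(current) : step(current);
	}
	return current;
}

// Transform streaming iterable: takes a source, returns a new source.
async function* toUpper(source) {
	for await (const data of source) {
		yield encoder.encode(decoder.decode(data).toUpperCase());
	}
}

// Toy duplex: its source yields the inbound texts, its sink records replies.
function createToyDuplex(inbound) {
	const outbound = [];
	return {
		outbound,
		source: (async function* () {
			for (const text of inbound) {
				yield encoder.encode(text);
			}
		})(),
		sink: async source => {
			for await (const chunk of source) {
				outbound.push(decoder.decode(chunk));
			}
		}
	};
}

async function demo() {
	const stream = createToyDuplex(["hello", "world"]);
	await toyPipe(stream, toUpper, stream);
	return stream.outbound;
}

demo().then(outbound => console.log(outbound));
```

Here the replies recorded by the toy duplex are "HELLO" and "WORLD": the pipe reads from the duplex's source, runs each chunk through toUpper, and hands the result to the same duplex's sink.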

Another cool thing to note is that readable streams are also iterable! This means you can wrap the libp2p stream in a legacy stream:

import { PassThrough } from "node:stream";

const inputStream = new PassThrough();
const outputStream = new PassThrough();

// Don't await: the pipe only settles once the streams end,
// and we still want to write to outputStream below.
pipe(outputStream, stream, async source => {
	for await (const data of source) {
		inputStream.write(uint8ArrayToString(data.subarray()));
	}
}).catch(console.error);

// Handle the incoming data.
inputStream.on("data", data => console.log(data.toString()));

// Send some outgoing data.
outputStream.write("First data");
// Give time for the stream to flush.
await new Promise(resolve => setTimeout(resolve, 100));
outputStream.write("Second");
await new Promise(resolve => setTimeout(resolve, 100));
outputStream.write("Last");
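The point that readable streams are iterable can be checked on its own, with nothing but Node's built-in stream module. This is a small sketch rather than part of the libp2p example; collect is a name I made up, and the dynamic import is only there so the snippet runs as either CommonJS or an ES module.

```javascript
async function collect() {
	// Dynamic import works in both CommonJS and ES module files.
	const { PassThrough } = await import("node:stream");

	const input = new PassThrough();
	input.write("First data");
	input.write("Second");
	// Ending the stream is what lets the for await loop finish.
	input.end("Last");

	// A readable stream can be consumed like any other async iterable.
	const received = [];
	for await (const chunk of input) {
		received.push(chunk);
	}

	// Chunk boundaries are not guaranteed to match the individual writes,
	// so join everything before comparing.
	return Buffer.concat(received).toString();
}

collect().then(text => console.log(text));
```

Note that the loop only finishes because the stream is ended. In the libp2p example above the PassThrough streams are never ended, so the pipe stays open for as long as you keep writing.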

As you can see, these streaming iterables are pretty cool. I am hoping these examples will help anyone else who is struggling to understand how they work. For more reading, check out the streaming iterables definition or the libraries made to work with it.
