Are libp2p streams only usable once?

I took the code example from it-pipe here and updated it to this:

    const result1 = await pipe(
        // The source: an array of three Uint8Array messages to send over the stream
        [uint8ArrayFromString("hey 1"), uint8ArrayFromString("hey 2"), uint8ArrayFromString("hey 3")],
        stream,
        async function collect (source) {
            const vals = []
            for await (const val of source) {
                vals.push(uint8ArrayToString(val.subarray()))
            }
            return vals
        }
    )

    console.log(result1)

The stream in the above was created by calling:

    const stream = await node1.dialProtocol(node2.getMultiaddrs()[0], '/echoo/1.0.0')

When I run the code above, it prints the following:

[ 'hey 1', 'hey 2', 'hey 3' ]

which is as expected.

Then I ran the code twice with the same stream, that is:

    const result1 = await pipe(
        // The source: an array of three Uint8Array messages to send over the stream
        [uint8ArrayFromString("hey 1"), uint8ArrayFromString("hey 2"), uint8ArrayFromString("hey 3")],
        stream,
        async function collect (source) {
            const vals = []
            for await (const val of source) {
                vals.push(uint8ArrayToString(val.subarray()))
            }
            return vals
        }
    )

    console.log(result1)

    const result2 = await pipe(
        // The source: a second array of three Uint8Array messages, sent over the same stream
        [uint8ArrayFromString("hi 1"), uint8ArrayFromString("hi 2"), uint8ArrayFromString("hi 3")],
        stream,
        async function collect (source) {
            const vals = []
            for await (const val of source) {
                vals.push(uint8ArrayToString(val.subarray()))
            }
            return vals
        }
    )

    console.log(result2)

I get the following output:

[ 'hey 1', 'hey 2', 'hey 3' ]
[]

It seems the stream worked on the first run but not on the second.

Note that if I run the original code from the example twice (that is, without using the stream), it behaves as expected. That is:

    const result1 = await pipe(
        // A source is just an iterable, this is shorthand for () => [1, 2, 3]
        [1, 2, 3],
        // A transform takes a source, and returns a source.
        // This transform doubles each value asynchronously.
        function transform (source) {
            return (async function * () { // A generator is async iterable
                for await (const val of source) yield val * 2
            })()
        },
        // A sink, it takes a source and consumes it, optionally returning a value.
        // This sink buffers up all the values from the source and returns them.
        async function collect (source) {
            const vals = []
            for await (const val of source) {
                vals.push(val)
            }
            return vals
        }
    )

    console.log(result1)

    const result2 = await pipe(
        // A source is just an iterable, this is shorthand for () => [1, 2, 3]
        [4, 5, 6],
        // A transform takes a source, and returns a source.
        // This transform doubles each value asynchronously.
        function transform (source) {
            return (async function * () { // A generator is async iterable
                for await (const val of source) yield val * 2
            })()
        },
        // A sink, it takes a source and consumes it, optionally returning a value.
        // This sink buffers up all the values from the source and returns them.
        async function collect (source) {
            const vals = []
            for await (const val of source) {
                vals.push(val)
            }
            return vals
        }
    )

    console.log(result2)

As expected, I see this in the console:

[ 2, 4, 6 ]
[ 8, 10, 12 ]

Is there anything about the stream that makes it unusable in this way?

These streams can take a little getting used to; I had a similar issue when I first started using them. The stream closes when the source iterable runs out of items, so you probably want to make the source async so that it does not end after the first batch of messages. I made a bunch of notes to reference when answering my post: Understanding streaming iterables and application protocols - #2 by saul.
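To illustrate the idea, here is a rough sketch that does not use libp2p at all. `createPushable` and `createEchoDuplex` are hypothetical helpers written just for this example: the first plays the role that a package such as it-pushable would, and the second only imitates a stream to an echo handler. Assuming a real stream behaves similarly, it shows that a source which stays open lets several messages go over one stream, and that the stream only finishes once the source is ended.

```javascript
// Hypothetical helper: a minimal push-based async iterable.
// It keeps yielding pushed values and only finishes after end() is called.
function createPushable () {
  const queue = []
  let wake = null
  let ended = false
  const notify = () => { if (wake) { wake(); wake = null } }
  return {
    push (value) { queue.push(value); notify() },
    end () { ended = true; notify() },
    async * [Symbol.asyncIterator] () {
      while (true) {
        while (queue.length > 0) yield queue.shift()
        if (ended) return
        // Nothing queued yet: wait until push() or end() wakes us up
        await new Promise(resolve => { wake = resolve })
      }
    }
  }
}

// Hypothetical stand-in for a stream to an echo handler (NOT the libp2p API):
// everything written to sink comes back on source, and once the written
// source ends, the readable side ends as well.
function createEchoDuplex () {
  const echoed = createPushable()
  return {
    source: echoed,
    async sink (source) {
      for await (const chunk of source) echoed.push(chunk)
      echoed.end() // the source ran out of items, so the stream closes
    }
  }
}

async function demo () {
  const stream = createEchoDuplex()
  const outgoing = createPushable()

  // Start writing once; this stays pending until outgoing.end() is called
  const writing = stream.sink(outgoing)
  const reader = stream.source[Symbol.asyncIterator]()

  outgoing.push('hey 1')
  const first = (await reader.next()).value

  // The same stream is still usable because the source has not ended
  outgoing.push('hi 1')
  const second = (await reader.next()).value

  outgoing.end()
  await writing
  const last = await reader.next()
  return { first, second, done: last.done }
}

demo().then(result => console.log(result))
```

With an array as the source, the equivalent of `outgoing.end()` happens automatically after the last item, which would explain why a second `pipe` over the same stream returns nothing.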

I also found myself using the same patterns on the streams so I created a module for it:

The source code may also help you.