I took the code example from it-pipe
here and updated it to this:
const result1 = await pipe(
// A source is just an iterable, this is shorthand for () => [1, 2, 3]
[uint8ArrayFromString("hey 1"), uint8ArrayFromString("hey 2"), uint8ArrayFromString("hey 3")],
stream,
async function collect (source) {
const vals = []
for await (const val of source) {
vals.push(uint8ArrayToString(val.subarray()))
}
return vals
}
)
console.log(result1)
The stream
in the above was created by calling:
const stream = await node1.dialProtocol(node2.getMultiaddrs()[0], '/echoo/1.0.0')
When I run the following code, it prints the following:
[ 'hey 1', 'hey 2', 'hey 3' ]
which is as expected.
Then I decide to run the code twice with the same stream, that is:
const result1 = await pipe(
// A source is just an iterable, this is shorthand for () => [1, 2, 3]
[uint8ArrayFromString("hey 1"), uint8ArrayFromString("hey 2"), uint8ArrayFromString("hey 3")],
stream,
async function collect (source) {
const vals = []
for await (const val of source) {
vals.push(uint8ArrayToString(val.subarray()))
}
return vals
}
)
console.log(result1)
const result2 = await pipe(
// A source is just an iterable, this is shorthand for () => [1, 2, 3]
[uint8ArrayFromString("hi 1"), uint8ArrayFromString("hi 2"), uint8ArrayFromString("hi 3")],
stream,
async function collect (source) {
const vals = []
for await (const val of source) {
vals.push(uint8ArrayToString(val.subarray()))
}
return vals
}
)
console.log(result2)
I get the following output:
[ 'hey 1', 'hey 2', 'hey 3' ]
[]
It seems the stream only worked once, but not on the second run.
Note that if I run the original code in the example twice (that is not using stream) then it behaves as expected. That is:
const result1 = await pipe(
// A source is just an iterable, this is shorthand for () => [1, 2, 3]
[1, 2, 3],
// A transform takes a source, and returns a source.
// This transform doubles each value asynchronously.
function transform (source) {
return (async function * () { // A generator is async iterable
for await (const val of source) yield val * 2
})()
},
// A sink, it takes a source and consumes it, optionally returning a value.
// This sink buffers up all the values from the source and returns them.
async function collect (source) {
const vals = []
for await (const val of source) {
vals.push(val)
}
return vals
}
)
console.log(result1)
const result2 = await pipe(
// A source is just an iterable, this is shorthand for () => [1, 2, 3]
[4, 5, 6],
// A transform takes a source, and returns a source.
// This transform doubles each value asynchronously.
function transform (source) {
return (async function * () { // A generator is async iterable
for await (const val of source) yield val * 2
})()
},
// A sink, it takes a source and consumes it, optionally returning a value.
// This sink buffers up all the values from the source and returns them.
async function collect (source) {
const vals = []
for await (const val of source) {
vals.push(val)
}
return vals
}
)
console.log(result2)
I rightly see this in the console:
[ 2, 4, 6 ]
[ 8, 10, 12 ]
Is there anything about the stream
that makes it unusable in this way?