Cross posting this question from twitter, https://twitter.com/rumkin/status/1256547989096730624.
What’s the way to determine that all the chunks have been sent and the connection can be closed in libp2p?
Typically what you will want to do is to set up some kind of ACK on the receiver. This ensures the receiver has an opportunity to tell you they’ve received the data.
const pipe = require('it-pipe')

const { stream } = await connection.newStream('/proto/v1')
const result = await pipe(
  ['hi there', 'a second message'],
  stream,
  // Collect whatever the receiver sends back (the ACK)
  async (source) => {
    const reply = []
    for await (const chunk of source) {
      reply.push(chunk.toString())
    }
    return reply
  }
)
Once the pipe function returns, the stream will be closed because we have finished writing to it and reading from it.
If you only ever write, await pipe(['message'], stream), there’s no guarantee the receiving peer has had a chance to process the data. This also leaves the read stream open, so you still need to close it.
While you can just send and close the stream, this is probably not what you want. Even just having the receiver send an empty response would allow you to wait for the read.
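To illustrate why even an empty response helps, here is a minimal in-memory sketch. It does not use libp2p at all: `createChannel` and `createStreamPair` are hypothetical helpers made up for this example, and they only imitate the `sink`/`source` shape of a stream. The point is that the client's read loop can only finish after the server has read everything and ended its own write side.

```javascript
// Hypothetical helper: one direction of a connection.
// Data goes in through `sink` and comes out through `source`.
function createChannel () {
  const queue = []
  let wake = null // resolver of a reader waiting on an empty queue
  let ended = false
  const notify = () => { if (wake) { wake(); wake = null } }

  return {
    // Consume an iterable, then mark this direction as finished
    async sink (input) {
      for await (const chunk of input) {
        queue.push(chunk)
        notify()
      }
      ended = true
      notify()
    },
    source: (async function * () {
      while (true) {
        while (queue.length > 0) yield queue.shift()
        if (ended) return
        await new Promise((resolve) => { wake = resolve })
      }
    })()
  }
}

// Hypothetical helper: two channels wired into a client end and a server end
function createStreamPair () {
  const up = createChannel() // client -> server
  const down = createChannel() // server -> client
  return {
    client: { sink: up.sink, source: down.source },
    server: { sink: down.sink, source: up.source }
  }
}

async function demo () {
  const { client, server } = createStreamPair()
  const log = []

  // Server: read everything, then send an empty ACK by ending its write side
  const serverDone = (async () => {
    for await (const chunk of server.source) log.push(`server got ${chunk}`)
    await server.sink([])
    log.push('server closed its write side')
  })()

  // Client: write, then wait until the server's side of the stream ends
  await client.sink(['hi there', 'a second message'])
  for await (const _ of client.source) { /* ignore any reply */ }
  log.push('client saw end of stream')

  await serverDone
  return log
}

demo().then((log) => console.log(log))
```

In this sketch the client's log entry can never appear before both "server got" entries, which is the guarantee the ACK is meant to give you.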
Thanks for starting this topic.
When I try this, the second function never gets called and pipe returns an empty array.
OS: MacOS 10.15.4
Node: v14.0.0
{
  "it-pipe": "^1.1.0",
  "libp2p": "^0.27.7",
  "libp2p-mplex": "^0.9.5",
  "libp2p-secio": "^0.12.4",
  "libp2p-websockets": "^0.13.6",
  "peer-id": "^0.13.12",
  "peer-info": "^0.17.5"
}
This is the gist with the code. I’ve now checked it more carefully, and I never see the result in client.js after the pipe call (client.js:54). Also, no chunks are produced by the source in the last pipe callback (client.js:48).
I believe the problem is in the server handler code. It’s not currently responding to anything, it’s just consuming the data.
async function handleChatStream(stream) {
  // Read in the chunks from the stream's source
  for await (const chunk of stream.source) {
    console.log(chunk.toString())
  }
  // Send an empty ACK
  await stream.sink([])
  console.log('Stream is closed')
}
or with pipe:
async function handleChatStream(stream) {
  await pipe(
    stream,
    source => (async function * () {
      // Read in all messages
      for await (const chunk of source) {
        // Process messages here
        console.log(chunk.toString())
      }
      // Then respond with an empty ACK: nothing is yielded,
      // so the write side simply ends
      return []
    })(),
    stream
  )
  console.log('Server side stream is closed')
}
It worked. But this makes the server respond with something, which I don’t actually need. I just need to send data and close the connection as soon as it has been sent. What I actually need is a way to push data into a stream, like so:
stream.write('Hello')
stream.write('World')
stream.end()
Is there any way to do so without waiting for response from a server?
Maybe it would help. My goals are:
I’d recommend just having the server immediately close the write side of the stream. You could reset the stream after you send the data, but I wouldn’t recommend that as you might get some unexpected behavior (streams closing before data is fully sent).
This would allow the receiver node to just read, and when the client is done the stream will be closed.
await pipe(
  ['message1', 'message2'], // use `it-pushable` or another async generator to push messages
  stream,
  async (source) => { // This is a basic drain function that ignores responses
    for await (const _ of source) {}
  }
)
// Read only server handler
async function handleMessageStream(stream) {
  // Immediately close the write side of the stream and just read
  await pipe([], stream, async source => {
    for await (const chunk of source) {
      console.log(chunk.toString())
    }
  })
  console.log('All messages handled, stream is closed')
}
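If you want something closer to the write/end style from your question, a push-style source can be sketched roughly like this. Note that `createPushable` here is a hypothetical stand-in written for this example, not the real `it-pushable` API or implementation, and the `sink` inside `demo` only imitates what `stream.sink` does.

```javascript
// Hypothetical helper: a tiny push-style async iterable
function createPushable () {
  const queue = [] // chunks waiting to be read
  let wake = null // resolver of a reader waiting on an empty queue
  let ended = false

  const notify = () => {
    if (wake) {
      wake()
      wake = null
    }
  }

  return {
    push (chunk) {
      if (ended) throw new Error('pushable already ended')
      queue.push(chunk)
      notify()
    },
    end () {
      ended = true
      notify()
    },
    async * [Symbol.asyncIterator] () {
      while (true) {
        // Hand out everything queued so far
        while (queue.length > 0) yield queue.shift()
        if (ended) return
        // Nothing queued yet: wait for the next push() or end()
        await new Promise((resolve) => { wake = resolve })
      }
    }
  }
}

async function demo () {
  const source = createPushable()
  const received = []

  // Stand-in for `stream.sink`: consumes the source until it ends
  const sink = async (src) => {
    for await (const chunk of src) received.push(chunk)
  }

  const done = sink(source) // start draining before anything is pushed
  source.push('Hello')
  source.push('World')
  source.end() // no more data: lets the sink finish
  await done
  return received
}

demo().then((received) => console.log(received))
```

With a real stream you would, under the same assumptions, hand the source to pipe in place of the array above and call end() on it once you are done writing.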
I dug into the libp2p-mplex implementation and figured out how to solve it without writing anything on the server end. This method requires waiting until all the work in the current event-loop frame has finished:
async function handleMessageStream(stream) {
  await stream.sink(['Hello'])
    // Hold `await` resolution until the end of the current event-loop frame.
    // This could also be replaced with setTimeout in a browser.
    .then(() => new Promise((resolve) => process.nextTick(resolve)))
  stream.close()
}
This seems to be a working solution for me. Am I missing something here?
The main issue with this is that your server is now reliant on clients to close its writable stream. Each node should close its own writable when it is done writing or won’t write at all.
FYI, stream.close is going to be patched in the future as it doesn’t work as it should (https://github.com/libp2p/js-libp2p-mplex/issues/110), which will cause your solution to no longer work. However, if you choose to stick with this solution, you will be able to use stream.source.end() (close the read side of the stream) instead of stream.close().
The main issue with this is that your server is now reliant on clients to close its writable stream. Each node should close its own writable when it is done writing or won’t write at all.
Yes, I’m about to solve that. In my future architecture, when the read end is closed, the node should be able to send all its responses to the write end and close the connection afterwards.
However, if you choose to stick with this solution, you will be able to use stream.source.end() (close the read side of the stream) instead of stream.close()
Closing the write side was my goal initially. As far as I can see, what you are describing is totally fine for that.
Thanks for the answer.