How to flush write on a direct network.Stream connection?

Hi! I have a direct connection created with host.NewStream but if I write data to this stream, the remote peer does not immediately receive it. The stream internally buffers writes. Wrapping the stream into bufio.Reader and bufio.Writer like in examples does not have any effect, bufio.Flush() after a bufio.Write does not make a difference to the internal buffering. However, I know the data is written and can be received because when stream.Close() is called all the data is received at the remote peer. I cannot find a Flush() API anywhere, not in network.Stream not in host or connectionManager.

How do I flush the writes without closing the stream?

There is no internal buffering. However, the other side won’t see the stream until you actually try to write to it. Is that the issue you’re seeing?

If you have a minimal reproduction, I can try to figure out what’s going on.

Hi stebalien! Thanks for the reply! That’s weird. What you describe is not the issue (I’m aware of it from the examples). Unfortunately, I don’t have a minimal example but I can assure you that I’m writing directly to the stream delivered by host.NewStream using standard io.Read and io.Write without buffering on my side. Here is how I establish the connection:

// Connect to remote peer peer.ID with addresses in addrs and given timeout. Returns and error if the connection
// failed.
func (n *Net) Connect(remote peer.AddrInfo, cmdport bool, timeout int) (io.ReadWriteCloser, context.CancelFunc, error) {
	c, cancel := context.WithDeadline(n.ctx, time.Now().Add(time.Duration(timeout)*time.Millisecond))
	err := n.host.Connect(c, remote)
	stream, err := n.host.NewStream(n.ctx, remote.ID, protocol.ID(Protocol))
	rw := NewNetReaderWriter(stream)
	if cmdport {
		rw.Write([]byte("CMD "))
	} else {
		rw.Write([]byte("RAW "))
	}
	return rw, cancel, err
}

I use the following wrapper:

// NetReaderWriter is a wrapper for a libp2p network stream to an individual peer.
type NetReaderWriter struct {
	stream network.Stream
}

// NewNetReaderWriter creates a new ReadWriteCloser based on a ReadCloser and a WriteCloser.
func NewNetReaderWriter(stream network.Stream) *NetReaderWriter {
	return &NetReaderWriter{
		stream: stream,
	}
}

// Read like in io.Reader
func (rwc *NetReaderWriter) Read(p []byte) (int, error) {
	return rwc.stream.Read(p)
}

// Write like in io.Writer
func (rwc *NetReaderWriter) Write(p []byte) (int, error) {
	k, err := rwc.stream.Write(p)
	// <== flush here, but how???
	return k, err
}

// Close like in io.Closer - this closes both the reader and writer.
func (rwc *NetReaderWriter) Close() error {
	err := rwc.stream.Close()
	if err != nil {
		return fmt.Errorf("unable to close network stream: %v", err)
	}
	return nil
}

When I write to this stream returned by Connect, nothing is received. Only once I Close the stream with the above wrapper to network.Stream.Close() is all data received on the other end at once (as if it was sent with a single Write). Both peers are on the same machine but use different IDs and keys for testing.

Is it possible this depends on the transport used? I allow all kinds of connections in my network startup and believe libp2p by default chooses QUIC. Maybe I should try TCP instead?

The transport really shouldn’t matter, but you could try disabling QUIC to rule that out. There may be a small amount of write coalescing (delay of at most a few milliseconds) but the other side should see some data.

FYI, the context passed to NewStream controls stream establishment only, not reading/writing.