Stream always reads a maximum of 4049 bytes

Hi,
I made a simple program in which two peers exchange a json string. That string can be a little long, 5000 chars for example. I read the stream with:

read := make([]byte, 9999999)
nRead, err := rw.Read(read)
if err != nil {
	log.Println("- Error reading from the stream:", err)
	return
}
fmt.Println(DEBUG, nRead)
read = read[:nRead]

So read should contain the entire json string I'm interested in. The problem is that the debug print always says it read 4049 bytes when the message is longer than that (for shorter messages it reads the exact number of chars). How can I avoid this? Is it a limit imposed by libp2p, or is there a way to make it read the full stream?

Also, I would like to know if there is a way to read the whole stream without using slices like I'm doing now (something like ReadString, for example, but I don't have a delimiter). Maybe the two problems are related, I don't know.

I would appreciate your help, especially with the first question. Thanks

EDIT: I see here a maximum message size, but it’s set to 4 MB and 5000 chars are a lot less than that

As long as you don't get err == io.EOF from Read, you should keep reading. Please read io - The Go Programming Language, in particular the documentation for io.Reader.
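
For reference, here is a minimal sketch of the canonical loop described by the io.Reader contract (rw is the stream from your snippet; handleBytes is a hypothetical stand-in for whatever you do with the data). Note that Read can return n > 0 together with an error, so the bytes are handled before the error is checked:

import (
	"io"
	"log"
)

// ...
buf := make([]byte, 4096)
for {
	n, err := rw.Read(buf)
	if n > 0 {
		handleBytes(buf[:n]) // hypothetical: consume the n bytes just read
	}
	if err == io.EOF {
		break // clean end of stream
	}
	if err != nil {
		log.Println("- Error reading from the stream:", err)
		return
	}
}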

I wrote this now:

read := make([]byte, 9999999)
nRead := 0
for {
	n, err := rw.Read(read)
	fmt.Println(DEBUG, n)
	nRead += n
	if err != nil {
		if err == io.EOF {
			break
		} else {
			log.Println("- Error reading from the stream:", err)
			return
		}
	}
}
fmt.Println(DEBUG, nRead)
read = read[:nRead]

The problem is that now it keeps reading, because it never sees an EOF after the json string. It doesn't even print the debug line inside the for loop again, and I don't know why.

EDIT: I don't use io, I use bufio
EDIT2: When I close the stream from the other peer, it prints several debug messages one after the other and several "Error reading from the stream" messages

This is still wrong.

The main goal of this API is to avoid allocating lots of new objects when calling Read.
So what happens is that whoever is responding to your Read call will write at the start of the buffer, overwriting any old data that was there.

The Read API is designed to support streaming algorithms such as ciphers (for example, AES works on blocks of 16 bytes).
A cipher works by eating X bytes at a time, then spitting them out. The nice property is that if you need, say, 4096-byte blocks, you can allocate 4096 bytes once and overwrite them on each iteration, reusing the same buffer every time (and not allocating anything new).
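
As an illustration, here is a minimal sketch of that buffer-reuse pattern (src is any io.Reader; processBlock is a hypothetical function consuming one block):

import "io"

// ...
block := make([]byte, 16) // allocated once, reused for every block
for {
	if _, err := io.ReadFull(src, block); err != nil {
		// io.EOF on a clean end, io.ErrUnexpectedEOF on a short final block
		break
	}
	processBlock(block) // hypothetical: consume the block before the next read overwrites it
}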

So the issue you currently have is that Read keeps overwriting the data from the start of the buffer. To fix that, pass Read a subslice that starts where you want it to continue:

read := make([]byte, 9999999)
nRead := 0
for {
	n, err := rw.Read(read[nRead:]) // read into the unused tail of the buffer
	fmt.Println(DEBUG, n)
	nRead += n
	if err != nil {
		if err == io.EOF {
			break
		} else {
			log.Println("- Error reading from the stream:", err)
			return
		}
	}
}
fmt.Println(DEBUG, nRead)
read = read[:nRead]

Then I have other issues with this code, mainly that it is very memory wasteful.
For reference, in Go, when you create a subslice of a slice, they both share the same underlying storage array (my fix above exploits this: every modification made through the subslice lands in the read buffer, just at an offset).
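
A tiny self-contained example of that sharing (the names are just for illustration):

package main

import "fmt"

func main() {
	parent := []byte("hello world")
	child := parent[6:]         // shares the same underlying array as parent
	child[0] = 'W'              // writing through the subslice...
	fmt.Println(string(parent)) // ...shows up in the parent: "hello World"
}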

But that also means that in the last step, read = read[:nRead], even if read is now 1% of its former length, Go can't truncate the underlying array, so the whole ~10 MB buffer stays in memory until you stop using read completely.
To fix that, add a copy step:

{ // Scoping to avoid keeping nBuf alive
  nBuf := make([]byte, nRead) // Make a new buffer which fits the data perfectly
  copy(nBuf, read) // Copy the read buffer into nBuf; no need to slice read, copy stops at the shorter of the two
  read = nBuf
}
// At this point the old ~10 MB buffer can be freed on the next GC
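
Side note: if you are on Go 1.20 or newer, bytes.Clone does the same thing in one line (this would replace the whole block above):

import "bytes"

// ...
read = bytes.Clone(read[:nRead]) // copies the data into a right-sized buffer, letting the big one be collected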

But even with this, the code can't handle messages longer than 9999999 bytes and is not very optimised.

Probably the best way to do this

If you want this in order to parse json:
encoding/json has a streaming decoder, and you can use it this way:

import "encoding/json"

// ...
// Later in your code
decoder := json.NewDecoder(rw) // Create a streaming json decoder
var result ObjectTypeToDecodeInto
decoder.Decode(&result) // (error handling omitted here, see below)
// If you want to stream multiple json objects, that works too (on the sending side, just append them back to back):
var otherResult OtherObjectTypeToDecodeInto
decoder.Decode(&otherResult)
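
For a concrete, runnable illustration of what the decoder does (the Message type and the input string are made up for this example; a strings.Reader stands in for the stream):

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type Message struct {
	From string `json:"from"`
	Text string `json:"text"`
}

func main() {
	// Two json objects appended back to back, as they would arrive on the stream.
	stream := strings.NewReader(`{"from":"peer1","text":"hi"}{"from":"peer2","text":"hello"}`)
	decoder := json.NewDecoder(stream)
	for {
		var msg Message
		if err := decoder.Decode(&msg); err != nil {
			break // io.EOF once the input runs out
		}
		fmt.Printf("%s says %q\n", msg.From, msg.Text)
	}
}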

If you just want the complete bytes, then use io.ReadAll; it handles the bigger edge cases and has been optimised not to eat 10 MB of memory while working:

import "io" // Use "io/ioutil" if you are using go < 1.16 (go 1.15.x and older)

// ...
read, err := io.ReadAll(rw) // Read all there is to read
if err != nil {
  // No need to check for io.EOF, ReadAll returns nil upon success
  log.Println("- Error reading from the stream:", err)
  return
}
// Do whatever you want with read

But if you can, you probably don't want to do this, as it requires holding the complete buffer in memory at once, where most things can just hold the part of the data they are working on (like with encoding/json#Decoder).
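
If you do go the ReadAll route with a peer you don't fully trust, one way to cap memory use is io.LimitReader (the 4 MB cap here is an arbitrary value for the sketch; note that data past the cap is silently truncated rather than reported as an error):

import "io"

// ...
const maxMessage = 4 << 20 // 4 MB, an arbitrary cap
read, err := io.ReadAll(io.LimitReader(rw, maxMessage))
if err != nil {
	log.Println("- Error reading from the stream:", err)
	return
}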


Thanks for your answer, it helped me a lot.
I used the approach you said was best, and it works quite well; I only have one problem.
When one of the two peers quits the network (and it should close the stream), the other one keeps reading (I think) the latest message received, since I have that code inside a for{} loop.
How can I avoid that? Am I doing something wrong when closing the stream from the peer that quits the network, or is there a way to stop the peer from reading that message infinitely?

EDIT: I should use stream.Reset, but every stream is handled by a stream handler, and I don't know how to reach every stream to reset it. Is there a way to know, inside that for{} loop, if the other peer of the stream is offline? If so, I could do if offline { stream.Reset() } from the online peer, but I can't find anything.

EDIT 2: I managed to solve the issue by checking json fields, because I see an empty json object when the stream is closed. I don't think this is the perfect way to do it, but at least it works.

I should use stream.Reset, but every stream is handled by a stream handler, and I don't know how to reach every stream to reset it. Is there a way to know, inside that for{} loop, if the other peer of the stream is offline? If so, I could do if offline { stream.Reset() } from the online peer, but I can't find anything.

Sorry for the late response.

Your empty json object check works, I guess :slight_smile: and I don't see any particular issue with it; as long as you never send empty json objects on purpose, you can't mistake one for real data.

However, decoder#Decode actually returns an error.
So the standard way of doing this would be:

import "encoding/json"

// ...
// Later in your code
decoder := json.NewDecoder(rw) // Create a streaming json decoder
var result ObjectTypeToDecodeInto // Declared outside the loop to reuse the same allocation
// If you need to check for unpopulated fields, you would probably declare it inside the loop instead
for {
  err := decoder.Decode(&result)
  if err != nil {
    // Most likely a stream reset or close (or another error)
    break
  }
  // Do stuff with result ...
}
// Connection has ended
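
If you want to tell a clean close apart from a real failure inside that loop, you can compare the error against io.EOF before breaking (a sketch; errors.Is is the safer form in case the error is wrapped):

import (
	"errors"
	"io"
	"log"
)

// ...
if err := decoder.Decode(&result); err != nil {
	if errors.Is(err, io.EOF) {
		break // the other peer closed the stream cleanly
	}
	log.Println("- Error reading from the stream:", err)
	break
}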

Thanks for your reply and your help.
I didn’t see that decoder#Decode returned an error :sweat_smile: