Rust: Reading from socket slower with every event

Hi! I have a question about my Rust implementations of InboundUpgrade and OutboundUpgrade.

I have observed that reading data from the socket gets slower with every subsequent event, and I can't figure out what the culprit is.

Slightly preformatted logs from the sender (built with cargo build --release):

Upgrade outbound
Finished 368 ms

Upgrade outbound
Finished 521 ms

Upgrade outbound
Finished 641 ms

Upgrade outbound
Finished 751 ms

Upgrade outbound
Finished 828 ms

Upgrade outbound
Finished 920 ms

Upgrade outbound
Finished 942 ms

Upgrade outbound
Finished 1128 ms

Upgrade outbound
Finished 1197 ms

As you can see, the time increases monotonically for some reason.

Here are my implementations of Inbound and Outbound upgrades:

const CHUNK_SIZE: usize = 1024;

impl<TSocket> InboundUpgrade<TSocket> for TransferPayload
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = TransferPayload;
    type Error = io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
        Box::pin(async move {
            println!("Upgrade inbound");
            let mut reader = asyncio::BufReader::new(socket);
            let start = now();
            let now = SystemTime::now();
            let timestamp = now.duration_since(UNIX_EPOCH).expect("Time failed");

            let name = "file".to_string();
            let path = format!("/tmp/files/{}_{}.flac", timestamp.as_secs(), name);
            let mut file =
                asyncio::BufWriter::new(File::create(&path).await.expect("Cannot create file"));

            let mut counter: usize = 0;
            let mut payloads: Vec<u8> = vec![];
            // Reuse one buffer across reads instead of allocating a fresh Vec per iteration
            let mut buff = vec![0u8; CHUNK_SIZE];
            loop {
                match reader.read(&mut buff).await {
                    Ok(n) => {
                        if n > 0 {
                            // Use simple in memory buffering for now
                            payloads.extend(&buff[..n]);
                            counter += n;
                        } else {
                            break;
                        }
                    }
                    Err(e) => panic!("Failed reading the socket {:?}", e),
                }
            }
            file.write_all(&payloads).await?;
            // Flush the BufWriter explicitly; dropping it does not guarantee the
            // buffered bytes reach the file
            file.flush().await?;
            println!("Finished {:?} ms", start.elapsed().as_millis());
            println!("Name: {}, Read {:?} bytes", name, counter);
            let event = TransferPayload::new(name, path);
            reader.close().await?;
            Ok(event)
        })
    }
}
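As a side note on the "simple in memory buffering for now" comment above: streaming each chunk straight to the file writer avoids holding the whole payload in memory. Here is a minimal sketch of that read loop using synchronous std::io for brevity (the async version would look the same with `.await` on `read`/`write_all`); `copy_chunked` is a hypothetical helper name, not part of the original code:

```rust
use std::io::{Read, Write};

const CHUNK_SIZE: usize = 1024;

// Stream chunks from `reader` to `writer` as they arrive, instead of
// accumulating everything in a Vec first. Returns the total byte count.
fn copy_chunked<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> std::io::Result<usize> {
    let mut buff = [0u8; CHUNK_SIZE]; // one reusable buffer
    let mut total = 0;
    loop {
        let n = reader.read(&mut buff)?;
        if n == 0 {
            break; // EOF
        }
        writer.write_all(&buff[..n])?; // write the chunk immediately
        total += n;
    }
    writer.flush()?; // ensure buffered bytes actually reach the destination
    Ok(total)
}
```

With this shape, peak memory stays at CHUNK_SIZE regardless of how large the transferred file is.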


impl<TSocket> OutboundUpgrade<TSocket> for TransferPayload
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = ();
    type Error = io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
        Box::pin(async move {
            println!("Upgrade outbound");
            let start = now();
            let filename = "file.flac";
            let path = format!("/tmp/{}", filename);
            let mut file = asyncio::BufReader::new(File::open(path).await.expect("File missing"));
            let mut contents = vec![];
            file.read_to_end(&mut contents).await.expect("Cannot read file");
            socket.write_all(&contents).await?;
            socket.close().await?;
            println!("Finished {:?} ms", start.elapsed().as_millis());
            Ok(())
        })
    }
}

Full code is available here: https://github.com/sireliah/p2pshare/blob/master/src/protocol.rs
Please note that this is not a full implementation; it's just a playground to learn how libp2p works.

My guesses:

  • memory issues - some object is not being freed properly?
  • I/O waits at the operating system level?
  • a socket or file descriptor not being flushed/closed properly in the code?

I would be really grateful if someone could help me figure out the reason for the slowdown. It's probably something really obvious that I missed. :stuck_out_tongue:

Oh, I’ve found the root cause now - it’s the multiplexing!

I had:

let mplex = mplex::MplexConfig::new();
let transport = TcpConfig::new()
    .upgrade(Version::V1)
    .authenticate(SecioConfig::new(local_keys.clone()))
    .multiplex(mplex);

Libp2p provides:

Ok(CommonTransport::new()?
    .upgrade(core::upgrade::Version::V1)
    .authenticate(secio::SecioConfig::new(keypair))
    .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
    .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
    .timeout(Duration::from_secs(20)))

https://docs.rs/libp2p/0.16.2/src/libp2p/lib.rs.html#245

It seems that I didn't separate the streams for each peer using StreamMuxerBox and didn't use SelectUpgrade. However, it's still not clear to me why this caused the processing time on the socket to keep increasing.
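For completeness, applying the pattern from the libp2p source to the transport above might look like the fragment below. This is an untested sketch combining the two snippets already shown, assuming the libp2p 0.16 API from the linked docs (yamux, SelectUpgrade, and StreamMuxerBox would need to be brought into scope):

```rust
// Hypothetical rewrite of the original transport setup, following the
// SelectUpgrade + StreamMuxerBox pattern libp2p itself uses (untested sketch).
let transport = TcpConfig::new()
    .upgrade(Version::V1)
    .authenticate(SecioConfig::new(local_keys.clone()))
    // Negotiate yamux first, falling back to mplex
    .multiplex(SelectUpgrade::new(
        yamux::Config::default(),
        mplex::MplexConfig::new(),
    ))
    // Box the negotiated muxer behind a uniform type
    .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
    .timeout(Duration::from_secs(20));
```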