Hi! I have a question about my Rust implementation of InboundUpgrade and OutboundUpgrade.
I have observed that transferring data over the socket gets slower with each subsequent event, and I can’t figure out what the culprit is.
Slightly reformatted logs from the sender (built with cargo build --release):
Upgrade outbound
Finished 368 ms
Upgrade outbound
Finished 521 ms
Upgrade outbound
Finished 641 ms
Upgrade outbound
Finished 751 ms
Upgrade outbound
Finished 828 ms
Upgrade outbound
Finished 920 ms
Upgrade outbound
Finished 942 ms
Upgrade outbound
Finished 1128 ms
Upgrade outbound
Finished 1197 ms
As you can see, the time increases with every transfer, from 368 ms for the first one to 1197 ms for the ninth, and I don't know why.
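To put a number on the growth, the log values can be checked with a few lines of std-only Rust. This is only a sketch for reading the log: `deltas`, `mean_growth` and `SENDER_MS` are hypothetical names, and the figures are copied from the sender output above.

```rust
/// Differences between consecutive measurements, in milliseconds.
fn deltas(times_ms: &[i64]) -> Vec<i64> {
    times_ms.windows(2).map(|w| w[1] - w[0]).collect()
}

/// Average growth per transfer: (last - first) / number of steps.
/// Returns None when there are fewer than two measurements.
fn mean_growth(times_ms: &[i64]) -> Option<f64> {
    if times_ms.len() < 2 {
        return None;
    }
    let total = times_ms[times_ms.len() - 1] - times_ms[0];
    Some(total as f64 / (times_ms.len() - 1) as f64)
}

// The "Finished" values from the sender log, in order.
const SENDER_MS: [i64; 9] = [368, 521, 641, 751, 828, 920, 942, 1128, 1197];
```

Every delta is positive (between 22 ms and 186 ms), and the average works out to about 104 ms of extra time per transfer.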
Here are my implementations of Inbound and Outbound upgrades:
const CHUNK_SIZE: usize = 1024;

impl<TSocket> InboundUpgrade<TSocket> for TransferPayload
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = TransferPayload;
    type Error = io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
        Box::pin(async move {
            println!("Upgrade inbound");
            let mut reader = asyncio::BufReader::new(socket);
            let start = now();
            let now = SystemTime::now();
            let timestamp = now.duration_since(UNIX_EPOCH).expect("Time failed");
            let name = "file".to_string();
            let path = format!("/tmp/files/{}_{}.flac", timestamp.as_secs(), name);
            let mut file =
                asyncio::BufWriter::new(File::create(&path).await.expect("Cannot create file"));
            let mut counter: usize = 0;
            let mut payloads: Vec<u8> = vec![];
            loop {
                let mut buff = vec![0u8; CHUNK_SIZE];
                match reader.read(&mut buff).await {
                    Ok(n) => {
                        if n > 0 {
                            // Use simple in memory buffering for now
                            payloads.extend(&buff[..n]);
                            counter += n;
                        } else {
                            break;
                        }
                    }
                    Err(e) => panic!("Failed reading the socket {:?}", e),
                }
            }
            file.write_all(&payloads).await?;
            println!("Finished {:?} ms", start.elapsed().as_millis());
            println!("Name: {}, Read {:?} bytes", name, counter);
            let event = TransferPayload::new(name, path);
            reader.close().await?;
            Ok(event)
        })
    }
}

impl<TSocket> OutboundUpgrade<TSocket> for TransferPayload
where
    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
    type Output = ();
    type Error = io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

    fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
        Box::pin(async move {
            println!("Upgrade outbound");
            let start = now();
            let filename = "file.flac";
            let path = format!("/tmp/{}", filename);
            let mut file = asyncio::BufReader::new(File::open(path).await.expect("File missing"));
            let mut contents = vec![];
            file.read_to_end(&mut contents).await.expect("Cannot read file");
            socket.write_all(&contents).await?;
            socket.close().await?;
            println!("Finished {:?} ms", start.elapsed().as_millis());
            Ok(())
        })
    }
}
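The single "Finished" figure covers the whole upgrade, so it does not show whether reading the file or writing to the socket is the part that grows. A minimal sketch of a per-step timer, using only std: `timed` is a hypothetical helper, not part of the code above or of libp2p, and it wraps synchronous closures only. Inside the async blocks the same idea would mean taking `Instant::now()` before each `.await` and calling `elapsed()` right after it.

```rust
use std::time::{Duration, Instant};

/// Runs `step` once and returns its result together with the
/// wall-clock time it took.
fn timed<T>(step: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let value = step();
    (value, start.elapsed())
}
```

Logging one duration for the file read and another for the socket write on every transfer would show which of the two accounts for the increase.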
Full code is available here: p2pshare/src/protocol.rs (master branch of sireliah/p2pshare on GitHub).
Please note that this is not a full implementation, just a playground for learning how libp2p works.
My guesses:
- memory issues - some object is not being freed properly?
- I/O waits on the operating system level?
- socket or file descriptor not flushed/closed properly in code?
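One way to test the first and third guesses is to stop collecting everything in `payloads` and instead stream each chunk to the file through a single reused buffer, with an explicit flush at the end (the inbound code above writes through a BufWriter and never flushes it). Below is a minimal synchronous sketch using only std; `copy_chunked` is a hypothetical helper, and the real code would need the async equivalents of `read`, `write_all` and `flush`. It is meant for comparing timings, not as a confirmed fix.

```rust
use std::io::{self, Read, Write};

const CHUNK_SIZE: usize = 1024;

/// Copies everything from `reader` to `writer` through one reused
/// buffer, so memory use stays at CHUNK_SIZE regardless of payload size.
/// Returns the number of bytes copied.
fn copy_chunked<R: Read, W: Write>(mut reader: R, mut writer: W) -> io::Result<usize> {
    let mut buff = [0u8; CHUNK_SIZE]; // allocated once, outside the loop
    let mut counter = 0;
    loop {
        match reader.read(&mut buff) {
            Ok(0) => break, // end of stream
            Ok(n) => {
                writer.write_all(&buff[..n])?;
                counter += n;
            }
            // A read interrupted by a signal can simply be retried.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    writer.flush()?; // push any buffered bytes to the underlying sink
    Ok(counter)
}
```

If the per-transfer time still grows with this variant, the in-memory buffering and the missing flush are probably not the cause, which would leave the socket handling or the OS level to look at.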
I would be really grateful if someone could help me find the reason for the slowdown. It’s probably something obvious that I missed.