I’m looking to write a network behavior that uses mDNS, floodsub, and a Kademlia DHT. So far, I’ve gotten each of these services to work, but haven’t been able to use their responses for anything meaningful.
Ideally, I’d be able to pipe the data that comes out of a behavior’s event processing, like the processing that would be implemented for a Kad DHT, into the main `Swarm` poll loop. For example, in my case, I have a reference to a struct representing a graph that is persisted to disk through the `sled` database. Ownership of this struct lives in the method that is polling the swarm. How would I go about updating this graph (e.g. adding an entry) when a `KademliaEvent` is received?
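To make the goal concrete, here is a minimal sketch of the kind of piping I have in mind, written with only the standard library. Everything in it is a hypothetical stand-in rather than libp2p API: `Graph` is an in-memory map instead of the sled-backed struct, `DhtUpdate` is a simplified placeholder for whatever a `KademliaEvent` carries, and `BehaviourSide::on_record` plays the role of `inject_event`. Whether a field like the `Sender` can live inside a struct that uses `#[derive(NetworkBehaviour)]` is exactly the part I have not been able to work out.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the sled-backed graph (in-memory only here).
#[derive(Debug, Default)]
pub struct Graph {
    entries: HashMap<String, Vec<u8>>,
}

impl Graph {
    /// Insert or overwrite an entry.
    pub fn add(&mut self, key: String, value: Vec<u8>) {
        self.entries.insert(key, value);
    }

    /// Number of entries currently stored.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}

/// Hypothetical, simplified placeholder for the data carried by a DHT event.
#[derive(Debug)]
pub enum DhtUpdate {
    RecordFound { key: String, value: Vec<u8> },
}

/// Stand-in for the behaviour: it owns only the sending half of the channel.
pub struct BehaviourSide {
    updates: Sender<DhtUpdate>,
}

impl BehaviourSide {
    /// Plays the role of `inject_event`: forward the result instead of
    /// touching the graph directly.
    pub fn on_record(&mut self, key: &str, value: &[u8]) {
        // Sending only fails if the receiving half was dropped; ignore that case.
        let _ = self.updates.send(DhtUpdate::RecordFound {
            key: key.to_owned(),
            value: value.to_vec(),
        });
    }
}

/// Called from the poll loop after each swarm poll: apply every update that
/// has been queued so far, without blocking. Returns how many were applied.
pub fn drain_updates(rx: &Receiver<DhtUpdate>, graph: &mut Graph) -> usize {
    let mut applied = 0;
    for update in rx.try_iter() {
        match update {
            DhtUpdate::RecordFound { key, value } => graph.add(key, value),
        }
        applied += 1;
    }
    applied
}

/// Small end-to-end demonstration: two events in, two graph entries out.
pub fn channel_demo() -> usize {
    let (tx, rx) = channel();
    let mut behaviour = BehaviourSide { updates: tx };
    let mut graph = Graph::default(); // owned by the "polling" side

    behaviour.on_record("root", b"hash-a");
    behaviour.on_record("child", b"hash-b");

    drain_updates(&rx, &mut graph);
    graph.len()
}
```

With this shape, the polling method keeps sole ownership of the graph and only the receiving half of the channel has to be visible to it.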
Solutions I’ve tried:
- Moving ownership of the data structure I want to update to the `ClientBehavior` struct so I can just `self.my_data_structure.add(result);` from the `KademliaEvent` `inject_event` method
  - `#[derive(NetworkBehaviour)]` does NOT like this at all
  - `Graph` is just a struct, and doesn’t emit any events / implement `NetworkBehaviour`
- Creating a `Context` struct that `derive`s `NetworkBehaviour` and can be used to pipe a response back and forth between the polling method and the respective `inject_event` method
  - `#[derive(NetworkBehaviour)]` doesn’t play nice with `Arc`s / `Mutex`es
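For reference, the shared-ownership idea behind the second attempt looks roughly like this when libp2p is taken out of the picture. This is only a sketch under assumptions: `Graph`, `SharedGraph`, and `EventHandler` are hypothetical names, `handle_record` stands in for `inject_event`, and nothing here says how the derive macro treats such a field.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the persisted graph (in-memory set of hashes).
#[derive(Debug, Default)]
pub struct Graph {
    nodes: HashSet<String>,
}

impl Graph {
    /// Add a node; returns `true` if it was not already present.
    pub fn add(&mut self, hash: &str) -> bool {
        self.nodes.insert(hash.to_owned())
    }

    /// Check whether a node is present.
    pub fn contains(&self, hash: &str) -> bool {
        self.nodes.contains(hash)
    }
}

/// One handle for the polling method, one for the event handler.
pub type SharedGraph = Arc<Mutex<Graph>>;

/// Stand-in for the struct that would receive behaviour events.
pub struct EventHandler {
    graph: SharedGraph,
}

impl EventHandler {
    pub fn new(graph: SharedGraph) -> Self {
        Self { graph }
    }

    /// Plays the role of `inject_event`: lock the graph and record the hash.
    /// Returns `false` if the hash was already known or the lock is poisoned.
    pub fn handle_record(&mut self, hash: &str) -> bool {
        match self.graph.lock() {
            Ok(mut graph) => graph.add(hash),
            Err(_) => false,
        }
    }
}

/// Demonstration: the handler writes, the original owner sees the change.
pub fn shared_graph_demo() -> (bool, bool) {
    let graph: SharedGraph = Arc::new(Mutex::new(Graph::default()));
    let mut handler = EventHandler::new(Arc::clone(&graph));

    let inserted = handler.handle_record("root-transaction");
    let visible = graph
        .lock()
        .map(|g| g.contains("root-transaction"))
        .unwrap_or(false);

    (inserted, visible)
}
```

The handler and the polling method each hold a clone of the same `Arc`, so an update made while processing an event is visible to the owner as soon as the lock is released.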
Here’s what my NetworkBehavior looks like:
/// A network behavior describing a client connected to a pub-sub compatible,
/// optionally mDNS-compatible network. Such a "behavior" may be implemented for
/// any libp2p transport, but any transport used with this behavior must implement
/// asynchronous reading & writing capabilities.
#[derive(NetworkBehaviour)]
pub struct ClientBehavior<TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static> {
    /// Some pubsub mechanism bound to the above transport
    pub floodsub: Floodsub<TSubstream>,
    /// Some mDNS service bound to the above transport
    pub mdns: Mdns<TSubstream>,
    /// Allow for the client to do some external discovery on the global network through a KAD DHT
    pub kad_dht: Kademlia<TSubstream, MemoryStore>,
}
and what my Kademlia `NetworkBehaviourEventProcess` implementation looks like:
impl<TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static>
    NetworkBehaviourEventProcess<KademliaEvent> for ClientBehavior<TSubstream>
{
    fn inject_event(&mut self, event: KademliaEvent) {
        // Some behavior logic, nothing special, really, just a bunch of matches
        // NOTE: this is when I'd want to update the `Graph`, since KademliaEvent will contain data that I need to put in the `Graph` instance
    }
}
and how I’m spawning my `Swarm`, and my `ClientBehavior`:
// Initialize a new behavior for a client that we will generate in the not-so-distant future with the given peerId, alongside
// an mDNS service handler as well as a floodsub instance targeted at the given peer
let mut behavior = ClientBehavior {
    floodsub: Floodsub::new(self.peer_id.clone()),
    mdns: Mdns::new().await?,
    kad_dht: Kademlia::new(self.peer_id.clone(), store),
};

// Iterate through bootstrap addresses
for bootstrap_peer in bootstrap_addresses {
    // NOTE: add_address is a method that isn't provided by #[derive(NetworkBehaviour)].
    // It's just a helper method I wrote that adds the peer to the Floodsub & DHT views.
    behavior.add_address(&bootstrap_peer.0, bootstrap_peer.1); // Add the bootstrap peer to the DHT
}

// Bootstrap the behavior's DHT
behavior.kad_dht.bootstrap();

// Note: here, `self` is a reference to a configuration struct holding a peer ID, and a keypair
let mut swarm = Swarm::new(
    libp2p::build_tcp_ws_secio_mplex_yamux(self.keypair.clone())?,
    behavior,
    self.peer_id.clone(),
); // Initialize a swarm
and how I’m polling the swarm:
// NOTE: Here, `self` has ownership of the aforementioned `Graph` instance. I'd like to update this
// instance from this block, or from the ClientBehavior itself--as long as I'm able to update it once a `KademliaEvent` is received.

// Try to get the address we'll listen on
if let Ok(addr) = format!("/ip4/0.0.0.0/tcp/{}", port).parse::<Multiaddr>() {
    // Try to tell the swarm to listen on this address, return an error if this doesn't work
    if let Err(e) = Swarm::listen_on(&mut swarm, addr.clone()) {
        // Convert the addr err into an io error
        let e: std::io::Error = io::ErrorKind::AddrNotAvailable.into();

        // Return an error
        return Err(e.into());
    };

    // Fetch the hash of the first transaction in the DAG from the network
    swarm
        .kad_dht
        .get_record(&Key::new(&sync::ROOT_TRANSACTION_KEY), Quorum::Majority);

    loop {
        // Poll the swarm
        match swarm.next_event().await {
            // NOTE: Initially, I was under the impression that I would be able to intercept
            // events from the ClientBehavior here. Yet, the below info! call is never reached.
            // This remains the case in libp2p example code that I have experimented with.
            SwarmEvent::Behaviour(e) => info!("idk: {:?}", e),
            _ => debug!("Some other event; this is handled specifically in the actual codebase, but for this question, all I really care about is the above behavior event."),
        };
    }
}
Will I just have to implement `NetworkBehaviour` myself?