How can I emit a SwarmEvent::Behaviour from a derived NetworkBehaviour?

I’m looking to write a network behavior that utilizes mDNS, floodsub, and a kademlia DHT. So far, I’ve gotten each of these services to work, but haven’t been able to use the responses from these services for anything meaningful.

Ideally, I’d be able to pipe the data that comes from a behavior process event, like one that would be implemented for a Kad DHT, into the main Swarm poll loop. For example, in my case, I have a reference to a struct representing a graph that is persisted to the disk through the sled database. Ownership of this struct exists in the method that is polling the swarm. How would I go about updating this graph (e.g. adding an entry) when a KademliaEvent is received?

Solutions I’ve tried:

  • Moving ownership of the data structure I want to update to the ClientBehavior struct so I can just self.my_data_structure.add(result); from the KademliaEvent inject_event method
    • #[derive(NetworkBehaviour)] does NOT like this at all
    • Graph is just a struct, and doesn’t emit any events / implement NetworkBehaviour
  • Creating a Context struct that derives NetworkBehaviour and can be used to pipe a response back and forth between the polling method and the respective inject_event method
    • #[derive(NetworkBehaviour)] doesn’t place nice with Arcs / Mutexes

Here’s what my NetworkBehavior looks like:

/// A network behavior describing a client connected to a pub-sub compatible,
/// optionally mDNS-compatible network. Such a "behavior" may be implemented for
/// any libp2p transport, but any transport used with this behavior must implement
/// asynchronous reading & writing capabilities.
#[derive(NetworkBehaviour)]
pub struct ClientBehavior<TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static> {
    /// Some pubsub mechanism bound to the above transport
    pub floodsub: Floodsub<TSubstream>,

    /// Some mDNS service bound to the above transport
    pub mdns: Mdns<TSubstream>,

    /// Allow for the client to do some external discovery on the global network through a KAD DHT
    pub kad_dht: Kademlia<TSubstream, MemoryStore>,
}

and what my Kademlia NetworkBehaviourEventProceess implementation looks like:

impl<TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static>
    NetworkBehaviourEventProcess<KademliaEvent> for ClientBehavior<TSubstream>
{
    fn inject_event(&mut self, event: KademliaEvent) {
        // Some behavior logic, nothing special, really, just a bunch of matches
        // NOTE: this is when I'd want to update the `Graph`, since KademliaEvent will contain data that I need to put in the `Graph` instance
    }
}

and how I’m spawning my Swarm, and my ClientBehavior:

// Initialize a new behavior for a client that we will generate in the not-so-distant future with the given peerId, alongside
// an mDNS service handler as well as a floodsub instance targeted at the given peer
let mut behavior = ClientBehavior {
    floodsub: Floodsub::new(self.peer_id.clone()),
    mdns: Mdns::new().await?,
    kad_dht: Kademlia::new(self.peer_id.clone(), store),
};

// Iterate through bootstrap addresses
for bootstrap_peer in bootstrap_addresses {
    // NOTE: add_address is a method that isn't provided by #[derive(NetworkBehaviour)].
    // It's just a helper method I wrote that adds the peer to the Floodsub & DHT views.
    behavior.add_address(&bootstrap_peer.0, bootstrap_peer.1); // Add the bootstrap peer to the DHT
}

// Bootstrap the behavior's DHT
behavior.kad_dht.bootstrap();

// Note: here, `self` is a reference to a configuration struct holding a peer ID, and a keypair
let mut swarm = Swarm::new(
    libp2p::build_tcp_ws_secio_mplex_yamux(self.keypair.clone())?,
    behavior,
    self.peer_id.clone(),
); // Initialize a swarm

and how I’m polling the swarm:

// NOTE: Here, `self` has ownership of the aforementioned `Graph` instance. I'd like to update this
// instance from this block, or from the ClientBehavior itself--as long as I'm able to update it once a `KademliaEvent` is received.

// Try to get the address we'll listen on
if let Ok(addr) = format!("/ip4/0.0.0.0/tcp/{}", port).parse::<Multiaddr>() {
    // Try to tell the swarm to listen on this address, return an error if this doesn't work
    if let Err(e) = Swarm::listen_on(&mut swarm, addr.clone()) {
        // Convert the addr err into an io error
        let e: std::io::Error = io::ErrorKind::AddrNotAvailable.into();

        // Return an error
        return Err(e.into());
    };

    // Fetch the hash of the first transaction in the DAG from the network
    swarm
        .kad_dht
        .get_record(&Key::new(&sync::ROOT_TRANSACTION_KEY), Quorum::Majority);

    loop {
        // Poll the swarm
        match swarm.next_event().await {
            // NOTE: Initially, I was under the impression that I would be able to intercept
            // events from the ClientBehavior here. Yet, the below info! call is never reached.
            // This remains the case in libp2p example code that I have experimented with.
            SwarmEvent::Behaviour(e) => info!("idk: {:?}", e),

            _ => debug!("Some other event; this is handled specifically in the actual codebase, but for this question, all I really care about is the above behavior event."),
        };
    }
}

Will I just have to implement NetworkBehaviour myself?