How to send and receive messages to/via multiple streams?

Hi everyone, I have a question about streams in libp2p (Go):

The nodes in the network are connected via streams. Every time a new node connects, a new stream to it is opened. When I have a network with more than two nodes, they are not all directly connected with each other. Hence, when I send a message via the stream, not all nodes receive it. Only the directly connected nodes do, and if there is more than one of them, even those receive messages only alternately. There is a video on YouTube that explains this issue around minute 5:20 (https://www.youtube.com/watch?v=hP0hSZ7E7_Y), saying it is caused by the multiple streams, which behave like a round-robin. It also says it could be solved easily (around minute 09:45).

For my application, I need the messages sent by one node to arrive at all other nodes as well, not only the directly connected ones. I thought that was how streams work and what makes p2p easy to handle. Unfortunately, I have no idea how to solve this issue, and I hope one of you can help me. I hope you can understand my issue with the help of the video; if not, feel free to ask. Thank you!

Consider using the pubsub functionality: https://github.com/libp2p/go-libp2p-pubsub
It creates topics (a kind of virtual channel) that peers can subscribe to or unsubscribe from. A message published on a topic will reach all other peers subscribed to that topic.

Hey @samy27, welcome to the libp2p forums!

I just want to +1 @iulianpascalau’s suggestion to check out pubsub. When you use it, all participants in a topic will receive messages without all of them needing direct connections to each other.

In Go you can use either floodsub or gossipsub as your pubsub implementation. Floodsub is widely supported by other libp2p language implementations, but is less efficient than gossipsub.
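
To illustrate why indirect peers still get the message, here is a toy in-memory simulation of flooding with de-duplication. It is only a sketch of the general idea and not how go-libp2p-pubsub is implemented; `node`, `connect` and `buildLine` are hypothetical names made up for this example, and it uses no libp2p code at all.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a peer: it knows only its direct
// neighbours and remembers which message IDs it has already seen.
type node struct {
	id         int
	neighbours []*node
	seen       map[string]bool
	inbox      []string
}

// receive delivers a message at most once and relays it to every neighbour.
// The seen set stops the message from circulating forever.
func (n *node) receive(msgID, payload string) {
	if n.seen[msgID] {
		return
	}
	n.seen[msgID] = true
	n.inbox = append(n.inbox, payload)
	for _, nb := range n.neighbours {
		nb.receive(msgID, payload)
	}
}

// connect links two nodes in both directions.
func connect(a, b *node) {
	a.neighbours = append(a.neighbours, b)
	b.neighbours = append(b.neighbours, a)
}

// buildLine creates count nodes connected in a chain: 0 - 1 - 2 - ...
// Only adjacent nodes are directly connected.
func buildLine(count int) []*node {
	nodes := make([]*node, count)
	for i := range nodes {
		nodes[i] = &node{id: i, seen: map[string]bool{}}
		if i > 0 {
			connect(nodes[i-1], nodes[i])
		}
	}
	return nodes
}

func main() {
	nodes := buildLine(4)
	// Node 0 originates the message; node 3 is three hops away.
	nodes[0].receive("msg-1", "hello")
	for _, n := range nodes {
		fmt.Printf("node %d inbox: %v\n", n.id, n.inbox)
	}
}
```

In this simulation every node relays to all of its neighbours, which is roughly the floodsub idea; gossipsub reduces that traffic by forwarding to fewer peers.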

Thank you for the hint! That seems to be exactly what I need. I will try it out!

Hi,
I had no time so far but now I am ready to try out pubsub. Just a few questions before I start:

  • I am working with Go. Travis-CI says that the build of go-libp2p-pubsub is failing. Can I even use it then?
  • Currently I use streams to connect new peers with existing ones (as described above). My idea is to keep them, and once a new peer connects, it subscribes to the topic and receives all those messages. Is that the right way to handle it? Or can’t I keep the streams when using pubsub?

Thanks for your help!

  • I am working with Go. Travis-CI says that the build of go-libp2p-pubsub is failing. Can I even use it then?

Yes, you’re fine. CI failed due to a flaky test, but you should be able to import the module correctly and it should work.

  • Is that the right way to handle it? Or can’t I keep the streams when using pubsub?

Yes, you can open streams independently of the pubsub protocol. In fact, pubsub protocol uses dedicated libp2p streams for its traffic.

Thanks @raul !
I am using the gx package manager (whyrusleeping/gx on GitHub) in my code, and now I have run into trouble because it does not seem to be compatible with pubsub. I already tried to import the last gx-published version of this module (0.11.16: QmfB4oDUTiaGEqT13P1JqCEhqW7cB1wpKtq3PP4BN8PhQd), but then I get the following error:

cannot use basicHost (type "gx/ipfs/Qm…/go-libp2p-host".Host) as type "github.com/libp2p/go-libp2p-host".Host in argument to pubsub.NewFloodSub

I cannot simply remove the gx dependencies as the whole code works with it. Any idea how to solve this?

bsicHost looks like a typo. Are you creating one manually? You should be using the libp2p constructor (libp2p.New).

Also note that we have deprecated gx and have stopped updating our packages. I would recommend switching to gomod.

Sorry, that was a spelling mistake. I create the host with

basicHost, _ := libp2p.New(context.Background(), opts...)

I have no experience with gx. I use it because I was following this tutorial:

The host is this one: github.com/libp2p/go-libp2p-host (now marked as deprecated in favor of https://github.com/libp2p/go-libp2p-core/).
But after setting up the gx dependencies, the import in my code changed to “gx/ipfs/Qm…/go-libp2p-host”.

And not only the host, all the packages that were imported changed to gx…

Is it easy to remove the gx dependencies? I have really no experience with that. Thank you!!!

You should not use the BasicHost type directly, as the constructor is not guaranteed to return one. In fact, if you also pass the Routing option, the host will be a RoutedHost.

More generally, I would advise against using internal go-libp2p types, and just using the interfaces.
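
As a rough illustration of that advice, here is a self-contained sketch in plain Go. The `Host` interface, `basicHost`, `routedHost` and `newHost` below are hypothetical, cut-down stand-ins invented for this example; they are not the go-libp2p types. The point is only that code written against the interface keeps working whichever concrete type the constructor returns, while a type assertion to one concrete type can fail.

```go
package main

import "fmt"

// Host is a hypothetical, cut-down interface standing in for a host interface.
type Host interface {
	ID() string
}

// basicHost is a hypothetical plain implementation.
type basicHost struct{ id string }

func (h *basicHost) ID() string { return h.id }

// routedHost is a hypothetical wrapper that adds routing around another host.
type routedHost struct{ inner Host }

func (h *routedHost) ID() string { return h.inner.ID() }

// newHost mimics a constructor whose concrete return type depends on options.
func newHost(id string, withRouting bool) Host {
	var h Host = &basicHost{id: id}
	if withRouting {
		h = &routedHost{inner: h}
	}
	return h
}

// describe accepts the interface, so it works with either concrete type.
func describe(h Host) string {
	return "host " + h.ID()
}

func main() {
	fmt.Println(describe(newHost("A", false)))
	fmt.Println(describe(newHost("B", true)))

	// Asserting the concrete type fails once routing is enabled.
	_, ok := newHost("B", true).(*basicHost)
	fmt.Println("is *basicHost:", ok)
}
```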

Regarding gx, you can run gx-go uw and it will unrewrite the dependencies. After that, you can simply remove gx from the workflow and use gomod.

Thank you! Are those dependencies really necessary or would it also work without them?

You don’t need anything from gx dependencies any more; gomod takes care of resolving the necessary dependencies.

Is there any tutorial or project that explains step by step how to use PubSub?
I mean starting from constructing a PubSub instance, opening a new topic, subscribing to it, connecting with other peers, other peers also subscribing to the topic, sending and receiving messages, etc.

I only found the documentation (https://godoc.org/github.com/libp2p/go-libp2p-pubsub) and the source code (https://github.com/libp2p/go-libp2p-pubsub) but no examples of how to apply it correctly.

We don’t currently have a tutorial, but we should definitely make something. Perhaps an example in go-libp2p-examples.

Okay, thank you @vyzo !

If anybody here has already used it, I would be very glad if you could share your code! Thank you!

I will try to come up with an easy-to-understand example of how we use the pubsub system. We rely on the validator func as an async callback from the p2p subsystem. In this validator func we do the checking (test whether the message is OK) and then start a goroutine that does the actual message processing. By using validators we have achieved two goals:

  1. invalid messages are caught and not flooded within the network;
  2. message validation (which might include serializing/deserializing) is done only once.

Please try the following code (sorry about the length, I could not get it under 177 lines):

package main

import (
	"context"
	"crypto/ecdsa"
	"crypto/rand"
	"fmt"
	"strings"
	"time"

	"github.com/btcsuite/btcd/btcec"
	"github.com/libp2p/go-libp2p"
	crypto "github.com/libp2p/go-libp2p-crypto"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/multiformats/go-multiaddr"
)

func main(){
	hosts := make([]host.Host, 0)
	startingPort := 10000
	var err error

	//Step 1. Create 5 nodes
	for i := 0; i < 5; i++ {
		h, err := createHost(startingPort + i)
		if err != nil{
			fmt.Printf("Error encountered: %v\n", err)
			return
		}

		hosts = append(hosts, h)
		fmt.Printf("Node %v is %s\n", i, getLocalhostAddress(h))
	}

	defer func(){
		for _, h := range hosts{
			_ = h.Close()
		}
	}()

	//Step 2. Create pubsubs
	pubsubs := make([]*pubsub.PubSub, len(hosts))
	for i, h := range hosts{
		pubsubs[i], err = applyPubSub(h)
		if err != nil{
			fmt.Printf("Error encountered: %v\n", err)
			return
		}
	}

	//Step 3. Register to the topic and add topic validators
	topic := "test"
	for i := 0; i < len(pubsubs); i++{
		var subscr *pubsub.Subscription
		subscr, err = pubsubs[i].Subscribe(topic)
		if err != nil{
			fmt.Printf("Error encountered: %v\n", err)
			return
		}

		//just a dummy func to consume messages received by the newly created topic
		go func() {
			for {
				//here you will actually have the message received after passing all validators
				//not required since we put validators on each topic and the message has already been processed there
				_, _ = subscr.Next(context.Background())
			}
		}()

		crtHost := hosts[i]
		err = pubsubs[i].RegisterTopicValidator(topic, func(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool{
			//do the message validation
			//example: deserialize msg.Data, do checks on the message, etc.

			//processing part should be done on a go routine as the validator func should return immediately
			go func(data []byte, p peer.ID, h host.Host){
				fmt.Printf("%s: Message: '%s' was received from %s\n", h.ID(), data, p.Pretty())
			}(msg.Data, pid, crtHost)

			//if the return value is true, the message will hit other peers
			//if the return value is false, this peer prevented message broadcasting
			//note that this topic validator will be called also for messages sent by self
			return true
		})
		if err != nil {
			fmt.Printf("Error encountered: %v\n", err)
			return
		}
	}

	//Step 4. Connect the nodes as following:
	//
	// node0 --------- node1
	//   |               |
	//   +------------ node2
	//   |               |
	//   |             node3
	//   |               |
	//   +------------ node4
	connectHostToPeer(hosts[1], getLocalhostAddress(hosts[0]))
	connectHostToPeer(hosts[2], getLocalhostAddress(hosts[1]))
	connectHostToPeer(hosts[2], getLocalhostAddress(hosts[0]))
	connectHostToPeer(hosts[3], getLocalhostAddress(hosts[2]))
	connectHostToPeer(hosts[4], getLocalhostAddress(hosts[3]))
	connectHostToPeer(hosts[4], getLocalhostAddress(hosts[0]))

	//Step 5. Wait so that subscriptions on topic will be done and all peers will "know" of all other peers
	time.Sleep(time.Second * 2)

	fmt.Println("Broadcasting a message on node 0...")

	err = pubsubs[0].Publish(topic, []byte("a message"))
	if err != nil{
		fmt.Printf("Error encountered: %v\n", err)
		return
	}

	time.Sleep(time.Second)
}


func createHost(port int) (host.Host, error){
	prvKey, err := ecdsa.GenerateKey(btcec.S256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	sk := (*crypto.Secp256k1PrivateKey)(prvKey)

	opts := []libp2p.Option{
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)),
		libp2p.Identity(sk),
		libp2p.DefaultTransports,
		libp2p.DefaultMuxers,
		libp2p.DefaultSecurity,
	}

	h, err := libp2p.New(context.Background(), opts...)
	if err != nil {
		return nil, err
	}

	return h, nil
}

func applyPubSub(h host.Host) (*pubsub.PubSub, error){
	optsPS := []pubsub.Option{
		pubsub.WithMessageSigning(true),
	}

	return pubsub.NewGossipSub(context.Background(), h, optsPS...)
}

func getLocalhostAddress(h host.Host) string{
	for _, addr := range h.Addrs(){
		if strings.Contains(addr.String(), "127.0.0.1"){
			return addr.String() + "/p2p/" + h.ID().Pretty()
		}
	}

	return ""
}

func connectHostToPeer(h host.Host, connectToAddress string){
	multiAddr, err := multiaddr.NewMultiaddr(connectToAddress)
	if err != nil {
		fmt.Printf("Error encountered: %v\n", err)
		return
	}

	pInfo, err := peerstore.InfoFromP2pAddr(multiAddr)
	if err != nil {
		fmt.Printf("Error encountered: %v\n", err)
		return
	}

	err = h.Connect(context.Background(), *pInfo)
	if err != nil {
		fmt.Printf("Error encountered: %v\n", err)
	}
}

Hi @iulianpascalau, thank you for your help. It works! Just two points:

  1. Is the topic validator really necessary or is it optional?
    I can see the benefit: a node checks whether messages are correct before forwarding them, which reduces message traffic. On the other hand, it could prevent the forwarding of messages and thus become a single point of failure in the network.

  2. Is it possible to change the pre-defined message type (see below) to a customized one? Or should my own data be written entirely into “data”? I tried it as in the example code but do not understand how I can write data into the other message fields.

message Message {
  optional string from = 1;
  optional bytes data = 2;
  optional bytes seqno = 3;
  repeated string topicIDs = 4;
  optional bytes signature = 5;
  optional bytes key = 6;
}
(from https://github.com/libp2p/specs/tree/master/pubsub)

Thank you for your help so far! Really great support in this forum!

  1. The topic validator is optional. If you choose not to use validators, the message can be received by calling subscr.Next (line 69 of the code above), so the validator registration on line 74 can be omitted. The validators were set on all hosts, so each and every host checks whether a received message is valid. We use this behavior in our project (a blockchain project) when transmitting transactions: each node checks the received transaction for non-nil fields, whether the transaction is correctly signed, and so on. That is why we use validators. Indeed, if someone broadcasts a transaction to only one peer in the network and that peer censors it, the transaction will not reach the other peers. But that concerns how clients of our system will be implemented, and that is a different story :)
  2. We did not bother to use another message type for broadcasting (I am not sure it is even possible given how pubsub is written); we just call the publish func with our object serialized (start with a JSON serializer, then you can move to more optimized serializers such as Protocol Buffers or Cap'n Proto). When receiving the message, you can use the matching deserializing function to transform msg.Data into a usable object.
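
A minimal sketch of that serialize, validate, deserialize round trip, using only the standard library. The `Transaction` type and the `encode` and `decodeAndValidate` helpers are hypothetical names made up for this example and are not our project's actual code. The assumption is that the encoded bytes are what you would pass to Publish, and that the same bytes arrive in msg.Data on the receiving side.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Transaction is a hypothetical application-level message.
type Transaction struct {
	Sender   string `json:"sender"`
	Receiver string `json:"receiver"`
	Amount   uint64 `json:"amount"`
}

// encode serializes a transaction into the bytes that would be published.
func encode(tx Transaction) ([]byte, error) {
	return json.Marshal(tx)
}

// decodeAndValidate deserializes the payload once and checks the required
// fields. The boolean is what a validator would return; the decoded value
// can be handed on to the processing goroutine without decoding again.
func decodeAndValidate(data []byte) (Transaction, bool) {
	var tx Transaction
	if err := json.Unmarshal(data, &tx); err != nil {
		return Transaction{}, false
	}
	if tx.Sender == "" || tx.Receiver == "" || tx.Amount == 0 {
		return Transaction{}, false
	}
	return tx, true
}

func main() {
	payload, err := encode(Transaction{Sender: "alice", Receiver: "bob", Amount: 5})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}

	// Receiving side: decode and validate the raw bytes.
	if tx, ok := decodeAndValidate(payload); ok {
		fmt.Printf("valid: %+v\n", tx)
	}

	// A payload with missing fields is rejected.
	_, ok := decodeAndValidate([]byte(`{"sender":"","amount":1}`))
	fmt.Println("incomplete transaction accepted:", ok)
}
```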

Thanks for the clarification! :)

Regarding the messages: how does pubsub handle this concretely? Raul mentioned that “pubsub protocol uses dedicated libp2p streams for its traffic”.

So far I was using buffered streams. When I finished writing data into them, I could send them using “Flush” and after reading them they were empty again:

rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
// send message (newline-terminated so the reader knows where it ends):
rw.WriteString("msg\n")
rw.Flush()
// read message up to the newline delimiter:
rw.ReadString('\n')
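
For reference, a self-contained sketch of that buffered pattern. It assumes newline-delimited messages and uses `net.Pipe` as a stand-in for a libp2p stream, so it runs without any libp2p code; `exchange` is a hypothetical helper name made up for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// exchange writes each message to one end of an in-memory connection and
// returns what the other end reads, in order.
func exchange(msgs []string) ([]string, error) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	// Writer side: buffer each message, then Flush to actually send it.
	errCh := make(chan error, 1)
	go func() {
		w := bufio.NewWriter(client)
		for _, m := range msgs {
			if _, err := w.WriteString(m + "\n"); err != nil {
				errCh <- err
				return
			}
			if err := w.Flush(); err != nil {
				errCh <- err
				return
			}
		}
		errCh <- nil
	}()

	// Reader side: one ReadString call per message, repeated in a loop.
	r := bufio.NewReader(server)
	got := make([]string, 0, len(msgs))
	for range msgs {
		line, err := r.ReadString('\n')
		if err != nil {
			return nil, err
		}
		got = append(got, strings.TrimSuffix(line, "\n"))
	}
	if err := <-errCh; err != nil {
		return nil, err
	}
	return got, nil
}

func main() {
	got, err := exchange([]string{"first", "second"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```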

I tried out pubsub now:

//send message to a topic:
pubsub.Publish(topic, []byte("msg"))
// receive message:
subscr.Next(context.Background())

The receiving code runs in a goroutine.
The first message that I send arrives correctly. But when I send another message to the same topic (for example, a response or feedback to the sender), it does not arrive, even though I use the goroutine. Is it possible that this is somehow blocked? Do I have to delete or empty something first before receiving new messages? Thank you!