I’ve set up a toy example to test out the facilities provided by github.com/libp2p/go-libp2p-pubsub
. In the following example, 10 (Host
, GossipSub
) tuples are started, and a single message is broadcast from one host to all others.
When I run this test, the subscribers hang until the receiver context expires.
N.B.: This test case is structured as a Go unit test. It must be run with go test
.
Why isn’t my message being delivered? How can this be fixed?
package example_test
import (
"context"
"testing"
"time"
datastore "github.com/ipfs/go-datastore"
sync "github.com/ipfs/go-datastore/sync"
p2p "github.com/libp2p/go-libp2p"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/stretchr/testify/require"
)
type psHost struct {
host host.Host
ps *pubsub.PubSub
}
func TestPubSub(t *testing.T) {
hosts := make([]*psHost, 0, 10)
for _, addr := range []string{
"/ip4/127.0.0.1/tcp/2020",
"/ip4/127.0.0.1/tcp/2021",
"/ip4/127.0.0.1/tcp/2022",
"/ip4/127.0.0.1/tcp/2023",
"/ip4/127.0.0.1/tcp/2024",
"/ip4/127.0.0.1/tcp/2025",
"/ip4/127.0.0.1/tcp/2026",
"/ip4/127.0.0.1/tcp/2027",
"/ip4/127.0.0.1/tcp/2028",
"/ip4/127.0.0.1/tcp/2029",
} {
h, err := start(addr)
if err != nil {
require.NoError(t, err)
}
defer h.host.Close()
hosts = append(hosts, h)
}
discover(hosts)
require.NoError(t, broadcast(hosts))
}
func discover(hosts []*psHost) {
for i, h := range hosts {
if i == 0 {
continue
}
prev := hosts[i-1].host
h.host.Peerstore().AddAddrs(prev.ID(), prev.Addrs(), time.Hour)
}
}
func broadcast(hosts []*psHost) error {
var g errgroup.Group
g.Go(recv(hosts[1:]))
time.Sleep(time.Millisecond * 100) // wait for recvers to start
g.Go(send(hosts[0]))
return g.Wait()
}
func start(addr string) (*psHost, error) {
h, err := p2p.New(context.Background(),
p2p.ListenAddrStrings(addr))
if err != nil {
return nil, err
}
ps, err := pubsub.NewGossipSub(context.Background(), h,
// XXX: maybe the Host needs to be a RoutedHost?
pubsub.WithDiscovery(discovery.NewRoutingDiscovery(newDHT(h))))
if err != nil {
return nil, err
}
return &psHost{host: h, ps: ps}, nil
}
func newDHT(h host.Host) *dht.IpfsDHT {
return dht.NewDHT(context.Background(), h,
sync.MutexWrap(datastore.NewMapDatastore()))
}
func send(h *psHost) func() error {
return func() error {
topic, err := h.ps.Join("test")
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return errors.Wrap(topic.Publish(ctx, []byte("hello, world!")), "send")
}
}
func recv(hs []*psHost) func() error {
return func() error {
var g errgroup.Group
for _, h := range hs {
g.Go(rcv(h))
}
return g.Wait()
}
}
func rcv(h *psHost) func() error {
return func() error {
topic, err := h.ps.Join("test")
if err != nil {
return err
}
sub, err := topic.Subscribe()
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
msg, err := sub.Next(ctx)
if err != nil {
return errors.Wrap(err, "recv")
}
if msg.String() != "hello, world!" {
return errors.New("invalid data")
}
return nil
}
}