Why aren't my libp2p PubSub messages being delivered?

I’ve set up a toy example to test out the facilities provided by github.com/libp2p/go-libp2p-pubsub. In the following example, 10 (Host, GossipSub) tuples are started, and a single message is broadcast from one host to all others.

When I run this test, the subscribers hang until the receiver context expires.

N.B.: This test case is structured as a Go unit test, so it must be run with `go test`.

Why isn’t my message being delivered? How can this be fixed?

package example_test

import (
	"context"
	"testing"
	"time"

	datastore "github.com/ipfs/go-datastore"
	sync "github.com/ipfs/go-datastore/sync"
	p2p "github.com/libp2p/go-libp2p"
	discovery "github.com/libp2p/go-libp2p-discovery"
	host "github.com/libp2p/go-libp2p-host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"

	"github.com/stretchr/testify/require"
)

type psHost struct {
	host host.Host
	ps   *pubsub.PubSub
}

// TestPubSub starts one host per fixed loopback TCP address, seeds the hosts'
// peerstores via discover, and then requires that broadcast completes without
// an error.
func TestPubSub(t *testing.T) {
	addrs := []string{
		"/ip4/127.0.0.1/tcp/2020",
		"/ip4/127.0.0.1/tcp/2021",
		"/ip4/127.0.0.1/tcp/2022",
		"/ip4/127.0.0.1/tcp/2023",
		"/ip4/127.0.0.1/tcp/2024",
		"/ip4/127.0.0.1/tcp/2025",
		"/ip4/127.0.0.1/tcp/2026",
		"/ip4/127.0.0.1/tcp/2027",
		"/ip4/127.0.0.1/tcp/2028",
		"/ip4/127.0.0.1/tcp/2029",
	}

	hosts := make([]*psHost, 0, len(addrs))
	for _, addr := range addrs {
		h, err := start(addr)
		// require.NoError is already a no-op for a nil error, so it needs no
		// surrounding "if err != nil" guard; on failure it stops the test
		// before h is dereferenced below.
		require.NoError(t, err)

		// Deferred inside the loop on purpose: every host has to stay up
		// until the whole test function returns.
		defer h.host.Close()

		hosts = append(hosts, h)
	}

	discover(hosts)

	require.NoError(t, broadcast(hosts))
}

// discover records, in each host's peerstore, the ID and addresses of the host
// that precedes it in hosts, with a one-hour TTL. The first host has no
// predecessor and is left untouched.
//
// Only peerstore entries are written here; nothing in this function opens a
// connection.
// NOTE(review): confirm whether PubSub needs live connections rather than
// just known addresses before messages can flow.
func discover(hosts []*psHost) {
	for i := 1; i < len(hosts); i++ {
		pred := hosts[i-1].host
		cur := hosts[i].host

		cur.Peerstore().AddAddrs(pred.ID(), pred.Addrs(), time.Hour)
	}
}

// broadcast runs the receivers (every host but the first) and the sender (the
// first host) in one errgroup and returns the group's result. The sender is
// launched 100ms after the receivers.
func broadcast(hosts []*psHost) error {
	receivers := hosts[1:]
	sender := hosts[0]

	group := new(errgroup.Group)
	group.Go(recv(receivers))

	// Give the receiving side a head start before anything is published.
	time.Sleep(100 * time.Millisecond)

	group.Go(send(sender))
	return group.Wait()
}

// start creates a libp2p host listening on addr and attaches a GossipSub
// instance to it, configured with routing discovery backed by a fresh DHT
// (see newDHT).
//
// It returns the host/PubSub pair, or a nil pair and the error from whichever
// construction step failed. If GossipSub construction fails, the host that was
// already created is closed before returning so it does not leak.
func start(addr string) (*psHost, error) {
	ctx := context.Background()

	h, err := p2p.New(ctx, p2p.ListenAddrStrings(addr))
	if err != nil {
		return nil, err
	}

	ps, err := pubsub.NewGossipSub(ctx, h,
		// XXX: maybe the Host needs to be a RoutedHost?
		pubsub.WithDiscovery(discovery.NewRoutingDiscovery(newDHT(h))))
	if err != nil {
		// The caller never receives h on this path, so it must be released
		// here. The Close error is dropped deliberately so that the original
		// construction failure is the one reported.
		_ = h.Close()
		return nil, err
	}

	return &psHost{host: h, ps: ps}, nil
}

// newDHT builds a DHT on top of h, backed by an in-memory map datastore that
// is wrapped with sync.MutexWrap.
func newDHT(h host.Host) *dht.IpfsDHT {
	store := sync.MutexWrap(datastore.NewMapDatastore())
	return dht.NewDHT(context.Background(), h, store)
}

// send returns a task that joins the "test" topic on h and publishes a single
// "hello, world!" message, allowing one second for the publish call. A join
// error is returned as is; a publish error is wrapped with the "send" prefix.
func send(h *psHost) func() error {
	publish := func() error {
		t, err := h.ps.Join("test")
		if err != nil {
			return err
		}

		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		if err := t.Publish(ctx, []byte("hello, world!")); err != nil {
			return errors.Wrap(err, "send")
		}
		return nil
	}

	return publish
}

// recv returns a task that runs one rcv task per host in hs concurrently and
// reports the result of waiting on all of them.
func recv(hs []*psHost) func() error {
	return func() error {
		group := new(errgroup.Group)
		for i := range hs {
			group.Go(rcv(hs[i]))
		}
		return group.Wait()
	}
}

// rcv returns a task that joins the "test" topic on h, subscribes to it, and
// waits up to five seconds for one message.
//
// The task returns nil when the received payload equals "hello, world!". It
// returns the join or subscribe error unchanged, the error from Next wrapped
// with the "recv" prefix, or "invalid data" when the payload differs.
func rcv(h *psHost) func() error {
	return func() error {
		topic, err := h.ps.Join("test")
		if err != nil {
			return err
		}

		sub, err := topic.Subscribe()
		if err != nil {
			return err
		}
		// Release the subscription once this task is done with it.
		defer sub.Cancel()

		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()

		msg, err := sub.Next(ctx)
		if err != nil {
			return errors.Wrap(err, "recv")
		}

		// Compare the payload bytes. msg.String() renders the whole message
		// envelope, so it can never equal the bare payload text.
		if string(msg.Data) != "hello, world!" {
			return errors.New("invalid data")
		}

		return nil
	}
}

It seems like this is a bug. ETA to fix: a few weeks.