How to send multiple messages on a Stream in TypeScript?

Loving the new libp2p JS bindings, but can’t figure out how to code imperatively with them.

I want to make a simple client-server example. The client sends multiple messages at different points in the code to the server. So far, my server receives one message, but it appears to block and not receive the second. What am I doing wrong?

Server:

async function main() {
  const prvkey = await generateKeyPair('secp256k1')
  console.log('key:', marshalPrivateKey(key).toString('hex'))

  const key = await unmarshalPrivateKey(Buffer.from(prvkey, 'hex'))
  const id = await createFromPrivKey(key)
  console.log(id)

  const node = await createLibp2p({
    addresses: {
      // add a listen address (localhost) to accept TCP connections on a random port
      listen: ['/ip4/127.0.0.1/tcp/49852']
    },
    transports: [tcp()],
    connectionEncryption: [noise()],
    streamMuxers: [mplex()],
    peerId: id
  })

  console.log(node.peerId)

  // start libp2p
  await node.start()
  console.log('libp2p has started')

  // print out listening addresses
  console.log('listening on addresses:')
  node.getMultiaddrs().forEach((addr) => {
    console.log(addr.toString())
  })

  node.addEventListener('peer:connect', (evt) => {
    const remotePeer = evt.detail
    console.log('received dial to me from:', remotePeer.toString())
  })

  await node.handle('/axon-protocol/1.0.0', async ({ stream }) => {
    console.log('conn')

    // Receive JSON data from the remote peer
    pipe(
      // Read from the stream (the source)
      stream.source,
      // (source) => {
      //   console.log(1)
      //   return source
      // },
      // Decode length-prefixed data
      // (source) => lp.decode(source),
      // Turn buffers into strings
      (source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
      // Sink function
      async function (source) {
        // For each chunk of data
        for await (const msg of source) {
          // Output the data as a utf8 string
          console.log('> ' + msg.toString().replace('\n', ''))
        }
      }
    )

  })

  // stop libp2p
  // await node.stop()
  // console.log('libp2p has stopped')
}

Client:

    const node = await createLibp2p({
      addresses: {
        // add a listen address (localhost) to accept TCP connections on a random port
        listen: ['/ip4/127.0.0.1/tcp/0']
      },
      transports: [tcp()],
      connectionEncryption: [noise()],
      streamMuxers: [mplex()]
    })

    // start libp2p
    await node.start()
    console.log('libp2p has started')

    // print out listening addresses
    console.log('listening on addresses:')
    node.getMultiaddrs().forEach((addr) => {
      console.log(addr.toString())
    })

    await node.start()

    const AGGREGATOR_ADDR = `/ip4/127.0.0.1/tcp/49852/p2p/16Uiu2HAmUvAB48Xn9aB4db7rVz53EwbqjoJhy94fSGg9xVFt6Te1`
    console.log(`connecting to axon aggregator, address ${AGGREGATOR_ADDR}`)
    const ma =  multiaddr(AGGREGATOR_ADDR)
    const stream = await node.dialProtocol(ma, ['/axon-protocol/1.0.0'])

    const messageQueue = asyncLib.queue(async (message, done) => {
      try {
        const messageString = JSON.stringify(message);
        await pipe([uint8ArrayFromString(messageString)], stream.sink);
        console.log('Sent message:', messageString);
        done();
      } catch (error) {
        done(error);
      }
    }, 1);
    
    for(let i = 0; i < 2; i++) {
      messageQueue.push("hello world - " + i);
    }

I don’t really know if async-queue is the right pattern to use, but I was struggling with trying to achieve imperative style networking logic (eg. stream.write(msg)) within the context of libp2p FP-style streams (e.g. pipe(blah, stream.sink))

Thanks

Here’s an example where we send an identify push update to a list of connections using it-protobuf-stream , mind you there was an issue we recently patched as it relates to cleanly closing streams this way which was released in v.0.46.0.

Let me know if this helps.