Loving the new libp2p JS bindings, but can’t figure out how to code imperatively with them.
I want to make a simple client-server example. The client sends multiple messages at different points in the code to the server. So far, my server receives one message, but it appears to block and not receive the second. What am I doing wrong?
Server:
async function main() {
const prvkey = await generateKeyPair('secp256k1')
console.log('key:', marshalPrivateKey(key).toString('hex'))
const key = await unmarshalPrivateKey(Buffer.from(prvkey, 'hex'))
const id = await createFromPrivKey(key)
console.log(id)
const node = await createLibp2p({
addresses: {
// add a listen address (localhost) to accept TCP connections on a random port
listen: ['/ip4/127.0.0.1/tcp/49852']
},
transports: [tcp()],
connectionEncryption: [noise()],
streamMuxers: [mplex()],
peerId: id
})
console.log(node.peerId)
// start libp2p
await node.start()
console.log('libp2p has started')
// print out listening addresses
console.log('listening on addresses:')
node.getMultiaddrs().forEach((addr) => {
console.log(addr.toString())
})
node.addEventListener('peer:connect', (evt) => {
const remotePeer = evt.detail
console.log('received dial to me from:', remotePeer.toString())
})
await node.handle('/axon-protocol/1.0.0', async ({ stream }) => {
console.log('conn')
// Receive JSON data from the remote peer
pipe(
// Read from the stream (the source)
stream.source,
// (source) => {
// console.log(1)
// return source
// },
// Decode length-prefixed data
// (source) => lp.decode(source),
// Turn buffers into strings
(source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
// Sink function
async function (source) {
// For each chunk of data
for await (const msg of source) {
// Output the data as a utf8 string
console.log('> ' + msg.toString().replace('\n', ''))
}
}
)
})
// stop libp2p
// await node.stop()
// console.log('libp2p has stopped')
}
Client:
const node = await createLibp2p({
addresses: {
// add a listen address (localhost) to accept TCP connections on a random port
listen: ['/ip4/127.0.0.1/tcp/0']
},
transports: [tcp()],
connectionEncryption: [noise()],
streamMuxers: [mplex()]
})
// start libp2p
await node.start()
console.log('libp2p has started')
// print out listening addresses
console.log('listening on addresses:')
node.getMultiaddrs().forEach((addr) => {
console.log(addr.toString())
})
await node.start()
const AGGREGATOR_ADDR = `/ip4/127.0.0.1/tcp/49852/p2p/16Uiu2HAmUvAB48Xn9aB4db7rVz53EwbqjoJhy94fSGg9xVFt6Te1`
console.log(`connecting to axon aggregator, address ${AGGREGATOR_ADDR}`)
const ma = multiaddr(AGGREGATOR_ADDR)
const stream = await node.dialProtocol(ma, ['/axon-protocol/1.0.0'])
const messageQueue = asyncLib.queue(async (message, done) => {
try {
const messageString = JSON.stringify(message);
await pipe([uint8ArrayFromString(messageString)], stream.sink);
console.log('Sent message:', messageString);
done();
} catch (error) {
done(error);
}
}, 1);
for(let i = 0; i < 2; i++) {
messageQueue.push("hello world - " + i);
}
I don’t really know if async-queue is the right pattern to use, but I was struggling with trying to achieve imperative style networking logic (eg. stream.write(msg)) within the context of libp2p FP-style streams (e.g. pipe(blah, stream.sink))
Thanks