Writing a simple Javascript async message queue server - Part I
Marton Trencseni - Sun 05 May 2024 - Javascript
Introduction
Previously, I wrote Python and C++ versions of a simple message queue:
- Writing a simple Python async message queue server - Part I
- Writing a simple Python async message queue server - Part II
- Writing a simple C++ async message queue server - Part I
- Writing a simple C++ async message queue server - Part II
I will now write the Javascript equivalent using Node. As before, I will use async, event-driven programming and avoid multi-threading. Async, event-driven code is the de facto standard way to write Node servers, so this exercise is straightforward.
Simple async echo server
Let's start with writing the simplest possible skeleton server, which listens on a port, accepts TCP connections, reads incoming bytes, and echoes back whatever was sent:
const net = require('net');

function handleClient(socket) {
    console.log('New client connected...');
    // Called whenever the client sends data
    socket.on('data', (data) => {
        const line = data.toString().trim();
        if (line === '')
            return;
        if (line !== 'quit') {
            console.log(`Received: ${line}`);
            socket.write(data); // echo the original bytes back
        } else {
            socket.end(); // client asked to disconnect
        }
    });
    socket.on('end', () => {
        console.log('Client disconnected...');
    });
}

const server = net.createServer(handleClient);
const port = process.argv[2]; // port number from the command line
server.listen(port, () => {
    console.log(`Listening on localhost:${port}...`);
});
You can test it by running it in a terminal, like:
$ node async_echo.js 7777
Then, in another terminal, connect to it:
$ telnet
> open localhost 7777
foo
foo
bar
bar
We create a server object with net.createServer() and pass in the handleClient() event handler. This is called whenever a client connects, and the corresponding socket is passed in. handleClient() then registers event handlers on the socket using socket.on(), including the main data handler, which gets called when the client sends data. In the main portion, we call server.listen() to start accepting connections; Node's event loop then dispatches the events to our handlers. Note that this is a minimalistic example with no error handling.
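To illustrate what the missing error handling could look like: in Node, an 'error' event with no listener is thrown as an exception, so a single connection reset could take down the whole server. The following is only a minimal sketch, not part of the original server. The helper name guardSocket() and the FakeSocket stand-in are hypothetical, introduced so the example runs without a network.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper (not in the original code): attach an 'error'
// listener so a failing connection is logged and closed instead of
// crashing the process.
function guardSocket(socket, log = console.error) {
    socket.on('error', (err) => {
        log(`Socket error: ${err.message}`);
        socket.destroy(); // make sure the connection is released
    });
    return socket;
}

// Stand-in for a net.Socket (assumption, for demonstration only).
class FakeSocket extends EventEmitter {
    constructor() {
        super();
        this.destroyed = false;
    }
    destroy() {
        this.destroyed = true;
    }
}

const logged = [];
const fake = guardSocket(new FakeSocket(), (msg) => logged.push(msg));
fake.emit('error', new Error('ECONNRESET')); // simulate a connection reset
console.log(fake.destroyed, logged);
```

In the echo server, the equivalent would be to call guardSocket(socket) at the top of handleClient(), before registering the other handlers.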
Simple async message queue in Javascript
The above skeleton can be extended to a simple uni-directional message queue. It accepts two kinds of structured messages, subscribe and send:
{"command":"subscribe", "topic":"foo"}
{"command":"send", "topic":"foo", "msg":"blah"}
The semantics are simple: after subscribe, that client receives messages sent to that topic by other clients, but only those sent after it subscribed. In this simple implementation, all subscribers get the message.
The implementation is barely longer than the echo server, since the problem is very similar: we just have to maintain a set of sockets for each topic, and when we receive a message for a topic, we go through those sockets and send the message.
const net = require('net');

// Maps each topic name to the Set of sockets subscribed to it
const topics = {};

function handleClient(socket) {
    console.log('New client connected...');
    socket.on('data', (data) => {
        const line = data.toString().trim();
        if (line === '')
            return;
        if (line !== 'quit') {
            console.log(`Received: ${line}`);
            const cmd = JSON.parse(line);
            if (cmd.command === 'subscribe') {
                // Create the topic on first use, then register this socket
                if (!topics[cmd.topic])
                    topics[cmd.topic] = new Set();
                topics[cmd.topic].add(socket);
            }
            else if (cmd.command === 'send') {
                // Forward the raw message to every subscriber of the topic
                const subscribers = topics[cmd.topic];
                if (subscribers) {
                    subscribers.forEach(subscriber => {
                        subscriber.write(data);
                    });
                }
            }
        } else {
            socket.end();
        }
    });
    socket.on('end', () => {
        console.log('Client disconnected...');
        // Remove the disconnected socket from every topic
        Object.keys(topics).forEach(topic => {
            topics[topic].delete(socket);
        });
    });
}

const server = net.createServer(handleClient);
const port = process.argv[2]; // port number from the command line
server.listen(port, () => {
    console.log(`Listening on localhost:${port}...`);
});
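One thing the code above does not guard against is malformed input: JSON.parse() throws on invalid JSON, and an uncaught exception inside the data handler would terminate the server. As a hedged sketch of a more defensive approach, the hypothetical helper parseCommand() below returns null instead of throwing. The validation rules (topic must be a string, send must carry a string msg) are my assumptions, derived from the two example messages, not something the server enforces.

```javascript
// Hypothetical helper (not in the original code): parse one line into a
// command object, or return null if it is not a valid message.
function parseCommand(line) {
    let cmd;
    try {
        cmd = JSON.parse(line);
    } catch (err) {
        return null; // not valid JSON
    }
    // Assumption: every message is an object with a string topic
    if (cmd === null || typeof cmd !== 'object' || typeof cmd.topic !== 'string')
        return null;
    if (cmd.command === 'subscribe')
        return cmd;
    // Assumption: send messages must carry a string msg
    if (cmd.command === 'send' && typeof cmd.msg === 'string')
        return cmd;
    return null; // unknown command or missing fields
}

console.log(parseCommand('{"command":"subscribe", "topic":"foo"}'));
console.log(parseCommand('{"command":"send", "topic":"foo"}')); // msg missing
console.log(parseCommand('not json'));
```

In the data handler, this could replace the bare JSON.parse(line) call, with the handler simply returning when the result is null.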
A big caveat is that the server assumes each message arrives complete, on a single line, in a single data event. You can test it by running it in a terminal, like:
$ node async_unidir.js 7777
Then, in a second terminal, connect to it:
$ telnet
> open localhost 7777
{"command":"subscribe", "topic":"foo"}
Then, in a third terminal:
$ telnet
> open localhost 7777
{"command":"send", "topic":"foo", "msg":"blah"}
The message will appear in the second terminal.
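This works with telnet because it typically sends one line at a time, but TCP is a byte stream, so in general a data event may carry half a message or several messages at once. One way to relax the single-line assumption is to buffer incoming chunks and split them on newlines. The sketch below is only an illustration under that assumption; createLineSplitter() is a hypothetical helper, not part of the server above.

```javascript
// Hypothetical helper (not in the original code): returns a function that
// accepts raw chunks and calls onLine() once per complete line.
function createLineSplitter(onLine) {
    let buffer = ''; // holds the trailing, not yet terminated, line
    return (chunk) => {
        buffer += chunk.toString();
        let idx;
        while ((idx = buffer.indexOf('\n')) !== -1) {
            const line = buffer.slice(0, idx).trim(); // trim() also drops '\r'
            buffer = buffer.slice(idx + 1);
            if (line !== '')
                onLine(line);
        }
    };
}

const lines = [];
const feed = createLineSplitter((line) => lines.push(line));
// Two messages arriving in three arbitrary chunks
feed(Buffer.from('{"command":"sub'));
feed(Buffer.from('scribe", "topic":"foo"}\r\n{"command":"send", '));
feed(Buffer.from('"topic":"foo", "msg":"blah"}\n'));
console.log(lines);
```

Each connection would need its own splitter, created inside handleClient() and passed to socket.on('data', ...). Since the server would then work with lines rather than raw chunks, it would forward line + '\n' to subscribers instead of data. Calling socket.setEncoding('utf8') first would also avoid splitting a multi-byte character across chunks.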
Note that in traditional multi-threaded programming, shared objects such as topics would have to be protected by locks to avoid corrupting the data structure. With Node, this is not an issue: the Javascript code runs on a single thread, and each event handler runs to completion before the next one starts, so handlers cannot interleave and corrupt the shared state.
Conclusion
In the next article, I will add some features to this simple skeleton.