Writing a simple Javascript async message queue server - Part I

Marton Trencseni - Sun 05 May 2024 - Javascript

Introduction

Previously, I wrote Python and C++ versions of a simple message queue:

I will now write the Javascript equivalent using Node. As before, I will again use async / event-driven programming and avoid multi-threading. Async event-driven is the de facto standard way to write Node servers, so this exercise is straightforward.

Simple async echo server

Let's start with writing the simplest possible skeleton server, which listens on a port, accepts TCP connections, reads incoming bytes, and echoes back whatever was sent:

const net = require('net');

function handleClient(socket) {
    console.log('New client connected...');
    socket.on('data', (data) => {
        const line = data.toString().trim();
        if (line === '')
            return;
        if (line != 'quit') {
            console.log(`Received: ${line}`);
            socket.write(data);
        } else {
            socket.end();
        }
    });

    socket.on('end', () => {
        console.log('Client disconnected...');
    });
}

const server = net.createServer(handleClient);

const port = process.argv[2];
server.listen(port, () => {
    console.log(`Listening on localhost:${port}...`);
});

You can test it by running it in a terminal, like:

$ node async_echo.js 7777

Then, in another terminal, connect to it:

$ telnet
> open localhost 7777
foo
foo
bar
bar

We create a server object with net.createServer() and pass in the handleClient() event handler. This is called when a client connects, and the appropriate socket passed. handleClient() then decorates the socket using the socket::on() function, and sets the main data handler that gets called when the client sends data. In the main portion, we use server.listen() to start the main event loop. Note that this is a minimalistic example with no error handling.

Simple async message queue in Javascript

The above skeleton can be extended to a simple uni-directional message queue. It accepts two kinds of structured messages, subscribe and send.

{"command":"subscribe", "topic":"foo"}
{"command":"send", "topic":"foo", "msg":"blah"}

The semantics are simple: after subscribe, that client receives messages sent to that topic by other clients, sent after the client connected. In this simple implementation, all subscribers get the message.

The implementation is barely longer than the echo server, since the problem is very similar: we just have to maintain a list of sockets for each topic, and when we receive a message for a topic, we go through those writers and send the message.

const net = require('net');

const topics = {};

function handleClient(socket) {
    console.log('New client connected...');

    socket.on('data', (data) => {
        const line = data.toString().trim();
        if (line === '')
            return;

        if (line != 'quit') {
            console.log(`Received: ${line}`);
            let cmd = JSON.parse(line);
            if (cmd.command === 'subscribe') {
                if (!topics[cmd.topic])
                    topics[cmd.topic] = new Set();
                topics[cmd.topic].add(socket);
            }
            else if (cmd.command === 'send') {
                const subscribers = topics[cmd.topic];
                if (subscribers) {
                    subscribers.forEach(subscriber => {
                        subscriber.write(data);
                    });
                }
            }
        } else {
            socket.end();
        }
    });

    socket.on('end', () => {
        console.log('Client disconnected...');
        Object.keys(topics).forEach(topic => {
            topics[topic].delete(socket);
        });
    });
}

const server = net.createServer(handleClient);

const port = process.argv[2];
server.listen(port, () => {
    console.log(`Listening on localhost:${port}...`);
});

A big caveat is that this implementation assumes that messages are passed on a single line. You can test it by running it in a terminal, like:

$ node async_unidir.js 7777

Then, in a second terminal, connect to it:

$ telnet
> open localhost 7777
{"command":"subscribe", "topic":"foo"}

Then, in a third terminal:

$ telnet
> open localhost 7777
{"command":"send", "topic":"foo", "msg":"blah"}

The message will appear in the second terminal.

Note that in traditional multi-threaded programming, the shared objects would have to be protected by locks to avoid corrupting the data structure. With Node, this is not an issue, there is no way to corrupt the internal state of the objects, since the program is actually single-threaded.

Conclusion

In the next article, I will add some features to this simple skeleton.