Writing a simple Javascript async message queue server - Part II

Marton Trencseni - Sat 11 May 2024 - Javascript

Introduction

In the past year, I wrote Python and C++ versions of a simple message queue:

In the previous post, I started to implement the same message queue logic in Javascript. I will now continue adding features to this Node.js server.

Adding caching

Now, let's add a feature where the server will cache 100 messages per topic, while honoring the delivery semantics, ie. messages with delivery of one will never be sent to more than one client.

The implementation is still simple: if an incoming message has all delivery, just push it into the topic's cache. If it has one delivery, only push it if there are no current subscribers. On the other hand, if a new subscriber connects, send them all cached messages for that topic, and then drop any messages that had one delivery. The code follows the structure of the Python code from above:

const net = require('net');

// Server-wide state, keyed by topic name. Topic names come straight from
// client messages, so the dictionaries are created without a prototype:
// with a plain {} the `in` operator is also true for inherited names such as
// "constructor" or "toString", which would be mistaken for existing topics.
const topics = Object.create(null);   // topic -> Set of subscribed sockets
const caches = Object.create(null);   // topic -> array of cached raw message lines
// Upper bound on the number of messages cached per topic.
const MAX_CACHE_LENGTH = 100;

/**
 * Per-connection handler: wires up the message protocol for one client socket.
 *
 * Each 'data' chunk is treated as one message: either the literal `quit`, or a
 * JSON object with `command` ('subscribe' | 'send'), `topic` and, for 'send',
 * `delivery` ('all' | 'one'). Outgoing messages are terminated with '\n\r'.
 *
 * Reads and updates the file-level `topics` (topic -> Set of sockets) and
 * `caches` (topic -> array of raw message lines, capped at MAX_CACHE_LENGTH).
 *
 * NOTE(review): a TCP chunk is not guaranteed to hold exactly one message;
 * coalesced or split messages are reported to the sender as errors.
 *
 * @param {net.Socket} socket - the newly accepted client connection
 */
function handleClient(socket) {
    console.log('New client connected...');

    // Own-property test: the `in` operator is also true for names inherited
    // from Object.prototype (e.g. a topic called "constructor").
    function hasKey(dict, key) {
        return Object.prototype.hasOwnProperty.call(dict, key);
    }

    // Remove this client from every topic it is subscribed to.
    function unsubscribeAll() {
        Object.keys(topics).forEach(topic => {
            topics[topic].delete(socket);
        });
    }

    // Send every cached message of `topic` to a new subscriber, then keep only
    // the delivery=all messages (delivery=one messages are now consumed).
    function sendCached(socket, topic) {
        if (!hasKey(caches, topic))
            return;
        const kept = [];
        caches[topic].forEach(line => {
            socket.write(line + '\n\r');
            if (JSON.parse(line).delivery === 'all')
                kept.push(line);
        });
        caches[topic] = kept;
    }

    // Parse and execute one message; throws on malformed input.
    function handleCommand(line) {
        const cmd = JSON.parse(line);
        if (!hasKey(topics, cmd.topic))
            topics[cmd.topic] = new Set();
        if (cmd.command === 'subscribe') {
            topics[cmd.topic].add(socket);
            sendCached(socket, cmd.topic);
        } else if (cmd.command === 'send') {
            let recipients = Array.from(topics[cmd.topic]);
            // delivery=one goes to a single subscriber only
            if (recipients.length > 0 && cmd.delivery === 'one')
                recipients = recipients.slice(0, 1);
            recipients.forEach(recipient => {
                recipient.write(line + '\n\r');
            });
            // delivery=all is always cached; anything else only if nobody received it
            if (cmd.delivery === 'all' || recipients.length === 0) {
                if (!hasKey(caches, cmd.topic))
                    caches[cmd.topic] = [];
                caches[cmd.topic].push(line);
                if (caches[cmd.topic].length > MAX_CACHE_LENGTH)
                    caches[cmd.topic].shift();
            }
        }
    }

    socket.on('data', (data) => {
        const line = data.toString().trim();
        if (line === '')
            return;
        if (line === 'quit') {
            // stop delivering to this client before closing our side
            unsubscribeAll();
            socket.end();
            return;
        }
        console.log(`Received: ${line}`);
        try {
            handleCommand(line);
        } catch (err) {
            // A bad message from one client must not take the whole server down.
            console.log(`Error processing command: ${err.message}`);
            socket.write('Error processing command\n\r');
        }
    });

    // 'end' is not emitted when a connection is torn down by an error, while
    // 'close' always is; listen to both and make the cleanup run only once.
    let disconnected = false;
    function handleDisconnect() {
        if (disconnected)
            return;
        disconnected = true;
        console.log('Client disconnected...');
        unsubscribeAll();
    }
    socket.on('end', handleDisconnect);
    socket.on('close', handleDisconnect);

    // Without a listener, a socket 'error' (e.g. ECONNRESET) is thrown as an
    // uncaught exception and terminates the process.
    socket.on('error', (err) => {
        console.log(`Socket error: ${err.message}`);
    });
}

// Create the TCP server; handleClient runs once for every accepted connection.
const server = net.createServer(handleClient);

// The port to listen on is taken from the first command-line argument.
const port = process.argv[2];
server.listen(port, () => {
    // Report the port that was actually bound: when no port argument is given
    // the operating system picks one, and echoing the raw argument would
    // print "undefined".
    const address = server.address();
    const boundPort = address !== null && typeof address === 'object' ? address.port : port;
    console.log(`Listening on localhost:${boundPort}...`);
});

Last seen semantics

Let's add support for clients (re)connecting and starting to read from a topic where they left off, by specifying a last_seen index in the subscribe message, like:

{"command":"subscribe", "topic":"foo", "last_seen": 123}

To support this, we have to:

  • add an index to each message (starting at 0), which increases sequentially
  • the index is stored in the command itself, so it now makes more sense to cache the parsed cmd instead of the flat line
  • this index is added to the cmd structure and sent out to subscribers
  • when a new client connects and specifies last_seen (default -1), only send out messages which have a bigger index

To make the code cleaner, let's introduce a DefaultDict class similar to Python's defaultdict (copied from Stackoverflow):

/**
 * Dictionary whose missing string keys are created with a default value the
 * first time they are read, similar to Python's collections.defaultdict.
 *
 * The constructor returns a Proxy, so instances are used with plain bracket /
 * dot access (`dict[key]`, `key in dict`, `Object.keys(dict)`).
 */
class DefaultDict {
    /**
     * @param {*} defaultInit - if a function, it is invoked with `new` for
     *     every missing key and `.valueOf()` of the result is stored (so
     *     `Number` yields 0, `Array` yields [], `Set` yields an empty Set);
     *     any other value is stored as-is (and therefore shared by all keys).
     */
    constructor(defaultInit) {
        // Null-prototype backing store: with a plain {} the `in` check below is
        // also true for inherited names ("constructor", "toString", ...), so
        // those keys would return Object.prototype members instead of a default.
        const store = Object.create(null);
        return new Proxy(store, {
            get: (target, name) => {
                // Symbol lookups are made by the runtime (iteration, coercion,
                // inspection), not by dictionary users; never materialize them.
                if (typeof name === 'symbol')
                    return target[name];
                if (!(name in target)) {
                    target[name] = typeof defaultInit === 'function' ?
                        new defaultInit().valueOf() :
                        defaultInit;
                }
                return target[name];
            }
        });
    }
}

This will allow the main global objects to be defined as:

// Per-topic server state; a topic key that does not exist yet is created with
// its default value the first time it is read.
const topics = new DefaultDict(Set);    // topic -> Set of subscribed sockets (default: empty Set)
const caches = new DefaultDict(Array);  // topic -> array of cached messages (default: [])
const indexs = new DefaultDict(Number); // topic -> index to assign to the next message (default: 0)

Also, let's break out the code for handling subscribe messages into a handleSubscribe() function and the code for handling send messages into a handleSend() function. With this, the code becomes:

const net = require('net');

/**
 * Dictionary whose missing string keys are created with a default value the
 * first time they are read, similar to Python's collections.defaultdict.
 *
 * The constructor returns a Proxy, so instances are used with plain bracket /
 * dot access (`dict[key]`, `key in dict`, `Object.keys(dict)`).
 */
class DefaultDict {
    /**
     * @param {*} defaultInit - if a function, it is invoked with `new` for
     *     every missing key and `.valueOf()` of the result is stored (so
     *     `Number` yields 0, `Array` yields [], `Set` yields an empty Set);
     *     any other value is stored as-is (and therefore shared by all keys).
     */
    constructor(defaultInit) {
        // Null-prototype backing store: with a plain {} the `in` check below is
        // also true for inherited names ("constructor", "toString", ...), so
        // those keys would return Object.prototype members instead of a default.
        const store = Object.create(null);
        return new Proxy(store, {
            get: (target, name) => {
                // Symbol lookups are made by the runtime (iteration, coercion,
                // inspection), not by dictionary users; never materialize them.
                if (typeof name === 'symbol')
                    return target[name];
                if (!(name in target)) {
                    target[name] = typeof defaultInit === 'function' ?
                        new defaultInit().valueOf() :
                        defaultInit;
                }
                return target[name];
            }
        });
    }
}

// Per-topic server state; a topic key that does not exist yet is created with
// its default value the first time it is read.
const topics = new DefaultDict(Set);    // topic -> Set of subscribed sockets
const caches = new DefaultDict(Array);  // topic -> array of cached (parsed) messages
const indexs = new DefaultDict(Number); // topic -> index to assign to the next message
// Upper bound on the number of messages cached per topic.
const MAX_CACHE_LENGTH = 100;

/**
 * Serialize a command object as JSON and write it to a socket, followed by
 * the '\n\r' message terminator.
 *
 * @param {object} socket - anything with a write(string) method
 * @param {*} cmd - the value to serialize
 */
function writeCommand(socket, cmd) {
    const payload = JSON.stringify(cmd);
    socket.write(`${payload}\n\r`);
}

/**
 * Per-connection handler: wires up the message protocol for one client socket.
 *
 * Each 'data' chunk is treated as one message: either the literal `quit`, or a
 * JSON object with `command` ('subscribe' | 'send'), `topic`, and optionally
 * `delivery` ('all' | 'one') for 'send' or `last_seen` for 'subscribe'.
 *
 * Reads and updates the file-level `topics`, `caches` and `indexs`
 * dictionaries and writes outgoing messages through `writeCommand`.
 *
 * NOTE(review): a TCP chunk is not guaranteed to hold exactly one message;
 * coalesced or split messages are reported to the sender as errors.
 *
 * @param {net.Socket} socket - the newly accepted client connection
 */
function handleClient(socket) {
    console.log('New client connected...');

    // Remove this client from every topic it is subscribed to.
    function unsubscribeAll() {
        Object.keys(topics).forEach(topic => {
            topics[topic].delete(socket);
        });
    }

    // Send the cached messages of `topic` with index > last_seen to a new
    // subscriber. Messages that were sent are kept only if delivery=all;
    // messages that were not sent stay in the cache untouched.
    function sendCached(socket, topic, last_seen) {
        if (!(topic in caches))
            return;
        const kept = [];
        caches[topic].forEach(cmd => {
            // write all cached messages the client has not seen yet
            if (cmd.index > last_seen)
                writeCommand(socket, cmd);
            // keep what was not sent, plus everything with delivery=all
            if (cmd.index <= last_seen || cmd.delivery === 'all')
                kept.push(cmd);
        });
        caches[topic] = kept;
    }

    // Register the client on the topic and replay what it has not seen.
    function handleSubscribe(socket, cmd) {
        topics[cmd.topic].add(socket);
        const requested = 'last_seen' in cmd ? Number(cmd.last_seen) : -1;
        // A non-numeric last_seen would make both comparisons in sendCached
        // false, silently dropping delivery=one messages without sending
        // them; treat it as "nothing seen yet" instead.
        const last_seen = Number.isNaN(requested) ? -1 : requested;
        sendCached(socket, cmd.topic, last_seen);
    }

    // Stamp the message with the topic's next index, deliver it, and cache it
    // when required.
    function handleSend(cmd) {
        cmd.index = indexs[cmd.topic];
        indexs[cmd.topic] += 1;
        let recipients = Array.from(topics[cmd.topic]);
        // delivery=one goes to a single subscriber only
        if (recipients.length > 0 && cmd.delivery === 'one')
            recipients = recipients.slice(0, 1);
        recipients.forEach(recipient => {
            writeCommand(recipient, cmd);
        });
        // delivery=all is always cached; anything else only if nobody received it
        if (cmd.delivery === 'all' || recipients.length === 0) {
            caches[cmd.topic].push(cmd);
            if (caches[cmd.topic].length > MAX_CACHE_LENGTH)
                caches[cmd.topic].shift();
        }
    }

    socket.on('data', (data) => {
        const line = data.toString().trim();
        if (line === '')
            return;
        if (line === 'quit') {
            // stop delivering to this client before closing our side
            unsubscribeAll();
            socket.end();
            return;
        }
        console.log(`Received: ${line}`);
        try {
            const cmd = JSON.parse(line);
            if (cmd.command === 'subscribe')
                handleSubscribe(socket, cmd);
            else if (cmd.command === 'send')
                handleSend(cmd);
        } catch (e) {
            // keep a server-side trace instead of swallowing the cause
            console.log(`Error processing command: ${e.message}`);
            socket.write('Error processing command\n\r');
        }
    });

    // 'end' is not emitted when a connection is torn down by an error, while
    // 'close' always is; listen to both and make the cleanup run only once.
    let disconnected = false;
    function handleDisconnect() {
        if (disconnected)
            return;
        disconnected = true;
        console.log('Client disconnected...');
        unsubscribeAll();
    }
    socket.on('end', handleDisconnect);
    socket.on('close', handleDisconnect);

    // Without a listener, a socket 'error' (e.g. ECONNRESET) is thrown as an
    // uncaught exception and terminates the process.
    socket.on('error', (err) => {
        console.log(`Socket error: ${err.message}`);
    });
}

// Create the TCP server; handleClient runs once for every accepted connection.
const server = net.createServer(handleClient);

// The port to listen on is taken from the first command-line argument.
const port = process.argv[2];
server.listen(port, () => {
    // Report the port that was actually bound: when no port argument is given
    // the operating system picks one, and echoing the raw argument would
    // print "undefined".
    const address = server.address();
    const boundPort = address !== null && typeof address === 'object' ? address.port : port;
    console.log(`Listening on localhost:${boundPort}...`);
});

Conclusion

In a follow-up post, I will reflect on the differences between the Python, C++ and Javascript implementations.