Writing a simple Javascript async message queue server - Part II
Marton Trencseni - Sat 11 May 2024 - Javascript
Introduction
In the past year, I wrote Python and C++ versions of a simple message queue:
- Writing a simple Python async message queue server - Part I
- Writing a simple Python async message queue server - Part II
- Writing a simple C++ async message queue server - Part I
- Writing a simple C++ async message queue server - Part II
In the previous post, I startes to implement the same message queue logic in Javascript. I will now continue adding features to this Node.js server.
Adding caching
Now, let's add a feature where the server will cache 100 messages per topic, while honoring the delivery
semantics, ie. messages with delivery
of one
will never be sent to more than one client.
The implementation is still simple: if an incoming message has all
delivery, just push it into the topic's cache. If if has one
delivery, only push it if there are no current subscribers. On the other hand, if a new subscriber connects, send them all cached messages for that topic, and then drop any messages that had one
delivery. The code follows the structure of the Python code from above:
const net = require('net');
const topics = {};
const caches = {};
const MAX_CACHE_LENGTH = 100
function handleClient(socket) {
console.log('New client connected...');
function sendCached(socket, topic) {
if (!(topic in caches))
return;
// construct a new array cache
let new_cache = new Array();
caches[topic].forEach(line => {
// write all cached messages
socket.write(line + '\n\r');
// only keep ones with delivery=all
let cmd = JSON.parse(line);
if (cmd.delivery === 'all')
new_cache.push(line);
});
caches[topic] = new_cache;
}
socket.on('data', (data) => {
const line = data.toString().trim();
if (line === '')
return;
if (line != 'quit') {
console.log(`Received: ${line}`);
let cmd = JSON.parse(line);
if (!(cmd.topic in topics))
topics[cmd.topic] = new Set();
if (cmd.command === 'subscribe') {
topics[cmd.topic].add(socket);
sendCached(socket, cmd.topic);
}
else if (cmd.command === 'send') {
var recipients = Array.from(topics[cmd.topic]);
if (recipients.length > 0 && cmd.delivery === 'one')
recipients = recipients.slice(0, 1);
recipients.forEach(recipient => {
recipient.write(line + '\n\r');
});
if (cmd.delivery === 'all' || recipients.length == 0) {
// add to cache
if (!(cmd.topic in caches))
caches[cmd.topic] = new Array();
caches[cmd.topic].push(line);
if (caches[cmd.topic].length > MAX_CACHE_LENGTH)
caches[cmd.topic].shift();
}
}
} else {
socket.end();
}
});
socket.on('end', () => {
console.log('Client disconnected...');
Object.keys(topics).forEach(topic => {
topics[topic].delete(socket);
});
});
}
const server = net.createServer(handleClient);
const port = process.argv[2];
server.listen(port, () => {
console.log(`Listening on localhost:${port}...`);
});
Last seen semantics
Let's add support for clients (re)connecting and starting to read from a topic where they left off, by specifying a last_seen
index in the subscribe message, like:
{'command':'subscribe', 'topic':'foo', 'last_seen': '123'}
To support this, we have to:
- add an index to each message (starting at 0), which increases sequentially
- the index is stored in the command itself, so it now makes more sense to cache the parsed
cmd
insteaf of the flatline
- this index is added to the
cmd
structure and sent out to subscribers - when a new client connects and specifies
last_seen
(default -1), only send out messages which have a bigger index
Make the code cleaner, let's introduce a DefaultDict
class similar to Python's defaultdict
(copied from Stackoverflow):
class DefaultDict {
constructor(defaultInit) {
return new Proxy({}, {
get: (target, name) => name in target ?
target[name] :
(target[name] = typeof defaultInit === 'function' ?
new defaultInit().valueOf() :
defaultInit)
})
}
}
This will allow the main global objects to be defined as:
const topics = new DefaultDict(Set); // default empty set
const caches = new DefaultDict(Array); // default []
const indexs = new DefaultDict(Number); // default 0
Also, let's break out the code for handing subscribe
messages into a handleSubscribe()
function and handling send
messages into handleSend()
function. With this, the code becomes:
const net = require('net');
class DefaultDict {
constructor(defaultInit) {
return new Proxy({}, {
get: (target, name) => name in target ?
target[name] :
(target[name] = typeof defaultInit === 'function' ?
new defaultInit().valueOf() :
defaultInit)
})
}
}
const topics = new DefaultDict(Set);
const caches = new DefaultDict(Array);
const indexs = new DefaultDict(Number);
const MAX_CACHE_LENGTH = 100
function writeCommand(socket, cmd) {
socket.write(JSON.stringify(cmd) + '\n\r');
}
function handleClient(socket) {
console.log('New client connected...');
function sendCached(socket, topic, last_seen) {
if (!(topic in caches))
return;
// construct a new array cache
let new_cache = new Array();
caches[topic].forEach(cmd => {
// write all cached messages the client has not seen yet
if (cmd.index > last_seen)
writeCommand(socket, cmd);
// only keep ones with delivery=all
if (cmd.index <= last_seen || cmd.delivery === 'all')
new_cache.push(cmd);
});
caches[topic] = new_cache;
}
function handleSubscribe(socket, cmd) {
topics[cmd.topic].add(socket);
const last_seen = 'last_seen' in cmd ? cmd.last_seen : -1;
sendCached(socket, cmd.topic, last_seen);
}
function handleSend(cmd) {
cmd.index = indexs[cmd.topic];
indexs[cmd.topic] += 1;
var recipients = Array.from(topics[cmd.topic]);
if (recipients.length > 0 && cmd.delivery === 'one')
recipients = recipients.slice(0, 1);
recipients.forEach(socket => {
writeCommand(socket, cmd);
});
if (cmd.delivery === 'all' || recipients.length == 0) {
// add to cache
caches[cmd.topic].push(cmd);
if (caches[cmd.topic].length > MAX_CACHE_LENGTH)
caches[cmd.topic].shift();
}
}
socket.on('data', (data) => {
const line = data.toString().trim();
if (line === '')
return;
if (line != 'quit') {
console.log(`Received: ${line}`);
try {
let cmd = JSON.parse(line);
if (cmd.command === 'subscribe')
handleSubscribe(socket, cmd);
else if (cmd.command === 'send')
handleSend(cmd);
} catch (e) {
socket.write('Error processing command\n\r');
}
} else {
socket.end();
}
});
socket.on('end', () => {
console.log('Client disconnected...');
Object.keys(topics).forEach(topic => {
topics[topic].delete(socket);
});
});
}
const server = net.createServer(handleClient);
const port = process.argv[2];
server.listen(port, () => {
console.log(`Listening on localhost:${port}...`);
});
Conclusion
In a follow-up post, I will reflect on the differences of the Python, C++ and Javascript implementation.