Writing a simple C++20 async message queue server - Part II

Marton Trencseni - Sat 08 April 2023 - C++

Introduction

In 2023, I want to get back into playful systems programming. My plan is to write a simple message queue (MQ), and for my own education write the same toy implementation in modern Python, modern C++ and Rust. To make things interesting, I will use async / event-driven programming and avoid multi-threading.

In the first 3 posts, I wrote Python implementations and experimented with ChatGPT-3.5 to see whether it can write code of this (modest) complexity.

In the previous post, I wrote a simple version of the message queue in C++.

In this article, similar to Part II of the Python series, I will add the following features:

  • cache messages
  • specify delivery semantics (all or one)
  • specify last_seen in the subscribe message to pick up where the client left off

Additionally, even short C++ code like this is better factored into multiple files. Unlike in Python, where types can be omitted, in C++ types must be declared, which makes code that mixes different levels of abstraction noisy and hard to read. In this case, the networking code (tcp::socket, coroutines and other asio:: usage) is best kept separate. To achieve this, the code is split into 3 files: Utils.hpp contains utility functions, MessageQueue.hpp contains the MQ logic, and the main program in Main.cpp handles networking.

A good (but old) book that also covers physical design (breaking programs into files) is Large-Scale C++ Software Design by John Lakos; recommended reading.

The code shown in this article is up on GitHub.

Utils

I will omit the function bodies to save space, as they are very similar or identical to the functions in the previous post, with a few additions:

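#ifndef UTILS_HPP  // no leading underscores: such identifiers are reserved
#define UTILS_HPP

#include <map>
...
using namespace std;

// All functions are defined in the header, so they are marked inline to avoid
// duplicate-definition link errors if Utils.hpp is included from several .cpp files.

// Remove leading and trailing whitespace in place.
inline void trim(string& str)
{
    ...
}

// True if s begins with prefix.
inline bool startswith(string_view s, string_view prefix)
{
    ...
}

// Split s into fields separated by delim.
inline vector<string> split(const string& s, char delim)
{
    ...
}

// A message is a flat string-to-string dictionary.
using Dict = map<string, string>;

// Parse a line like {'topic':'foo', 'msg':'0'} into a Dict.
inline Dict parse_dict(const string& dict_str)
{
    ...
}

// Inverse of parse_dict().
inline string serialize_dict(const Dict& d)
{
    ...
}

#endif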

The most interesting thing about these functions is that they are simple enough for ChatGPT-4 to write flawless implementations in modern C++20!

MessageQueue

The MessageQueue implementation starts by declaring a Client class to contain the specifics of our TCP/IP networking. The main MessageQueue class will just use this Client, ignoring the specifics of I/O:

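#include <deque>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <asio.hpp>
#include "Utils.hpp"

using asio::ip::tcp;

// Wraps a connected socket; this is all the MQ logic knows about I/O.
class Client
{
    tcp::socket& socket;  // owned by the session coroutine

public:
    explicit Client(tcp::socket& s) : socket(s) {}

    // Send one message, serialized as a single "\r\n"-terminated line.
    void Write(const Dict& msg)
    {
        auto s = serialize_dict(msg) + "\r\n";
        asio::write(socket, asio::buffer(s));  // synchronous write
    }
};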

One thing to note here is that, unlike in the previous post, we're using the synchronous asio::write(). In practice this call rarely blocks: it returns once the data has been copied into the operating system's socket send buffer, and the OS transmits it later. It can block if that buffer fills up, for example when a client stops reading; since the server is single-threaded, that would stall all clients. The bonus is that all co_awaits stay out of the MessageQueue class, so it doesn't know that this is an asynchronous program with coroutines! Now the main MessageQueue class:

class MessageQueue
{
    const size_t cache_length = 100;                              // max cached messages per topic
    unordered_map<string, unordered_set<Client*>> subscribers;    // topic -> subscribed clients
    unordered_map<Client*, unordered_set<string>> subscriptions;  // client -> its topics
    unordered_map<string, deque<Dict>> cache;                     // topic -> recent messages
    unordered_map<string, long> max_index;                        // topic -> next message index

public:
    void OnConnect(Client& client)
    {
    }

    void OnDisconnect(Client& client)
    {
        for (const string& topic : subscriptions[&client])
            subscribers[topic].erase(&client);
        subscriptions.erase(&client);  // forget the client entirely, no dangling pointer is kept
    }

    void OnMessage(Client& client, const string& line)
    {
        Dict msg = parse_dict(line);

        if (msg["command"] == "subscribe")
            OnSubscribe(client, msg);
        else
            OnSend(client, msg);
    }

    void OnSubscribe(Client& client, Dict& msg)
    {
        subscribers[msg["topic"]].insert(&client);
        subscriptions[&client].insert(msg["topic"]);
        // without last_seen, replay the whole cache (indexes start at 0)
        long last_seen = msg["last_seen"].empty() ? -1 : stol(msg["last_seen"]);
        CachePlayback(client, msg, last_seen);
    }

    void OnSend(Client& client, Dict& msg)
    {
        AddIndex(msg);
        if (msg["delivery"] == "")
            msg["delivery"] = "all";
        for (Client* subscriber : subscribers[msg["topic"]])
        {
            subscriber->Write(msg);
            if (msg["delivery"] == "one")
                break;  // "one": deliver to a single subscriber only
        }
        // cache "all" messages for later subscribers, and "one" messages nobody received
        if (subscribers[msg["topic"]].empty() || msg["delivery"] == "all")
            CachePush(msg);
    }

private:
    // stamp the message with a per-topic, increasing index
    void AddIndex(Dict& msg)
    {
        msg["index"] = to_string(max_index[msg["topic"]]);
        max_index[msg["topic"]] += 1;
    }

    // append to the topic's cache, keeping at most cache_length messages
    void CachePush(Dict& msg)
    {
        auto& topic_cache = cache[msg["topic"]];
        topic_cache.push_back(msg);
        if (topic_cache.size() > cache_length)
            topic_cache.pop_front();
    }

    // replay cached messages newer than last_seen; "one" messages are delivered
    // only once, so they are removed from the cache after playback
    void CachePlayback(Client& client, Dict& msg, long last_seen)
    {
        auto& topic_cache = cache[msg["topic"]];
        bool reCache = false;
        for (auto& cached_msg : topic_cache)
        {
            if (stol(cached_msg["index"]) > last_seen)
            {
                client.Write(cached_msg);
                if (cached_msg["delivery"] == "one")
                    reCache = true;
            }
        }
        if (reCache)
        {
            deque<Dict> newCache;
            for (auto& cached_msg : topic_cache)
                if ((stol(cached_msg["index"]) <= last_seen) || (cached_msg["delivery"] == "all"))
                    newCache.push_back(cached_msg);
            topic_cache = newCache;
        }
    }
};

The code is very similar to the previous Python code, except it's nicely factored into functions.
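Because MessageQueue only reaches its clients through Write(), its logic can be exercised without any sockets. As an illustration, here is a minimal sketch of the fan-out rule from OnSend(), driven by fake clients. It assumes a hypothetical abstract Sink interface and a RecordingSink test double that stores what it receives; the article's Client is a concrete class, so using this for real would mean making MessageQueue depend on such an interface (or a template parameter) instead.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <unordered_set>
#include <vector>

using Dict = std::map<std::string, std::string>;

// Hypothetical interface standing in for Client: the queue logic only needs Write().
struct Sink
{
    virtual ~Sink() = default;
    virtual void Write(const Dict& msg) = 0;
};

// Test double: records every message instead of sending it over TCP.
struct RecordingSink : Sink
{
    std::vector<Dict> received;
    void Write(const Dict& msg) override { received.push_back(msg); }
};

// Same fan-out rule as OnSend(): missing delivery means "all",
// and "one" stops after the first subscriber.
// Returns how many subscribers the message was written to.
inline std::size_t Deliver(const std::unordered_set<Sink*>& subscribers, Dict msg)
{
    if (msg["delivery"].empty())
        msg["delivery"] = "all";
    std::size_t delivered = 0;
    for (Sink* subscriber : subscribers)
    {
        subscriber->Write(msg);
        ++delivered;
        if (msg["delivery"] == "one")
            break;
    }
    return delivered;
}
```

With delivery "one", which subscriber gets the message depends on the unordered_set iteration order, just as in OnSend().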

Main

The code in the main file is very similar to the previous version, so I will only show the function that uses the new MessageQueue class. In this setup, the only things the main code knows are that each line is a message (no further formatting is assumed here), and that it needs to call MessageQueue::OnConnect() (which currently does nothing), MessageQueue::OnMessage() and MessageQueue::OnDisconnect(). As before, the code does not handle all errors, to keep things clean and readable.

static MessageQueue mq;

awaitable<void> session(tcp::socket socket)
{
    Client client(socket);
    mq.OnConnect(client);
    try
    {
        string data;
        cout << "Client connected: " << socket.remote_endpoint() << '\n';
        for (;;)
        {
            // line-by-line reading: n counts the bytes up to and including '\n',
            // but data may already hold bytes of the next line, so keep those
            size_t n = co_await asio::async_read_until(socket, asio::dynamic_buffer(data), '\n', use_awaitable);
            string line = data.substr(0, n);
            data.erase(0, n);
            trim(line);
            if (line == "quit")
                break;  // don't pass "quit" to the queue as a message
            if (!line.empty())
            {
                cout << "Received: " << line << '\n';
                mq.OnMessage(client, line);
            }
        }
        cout << "Client disconnected: " << socket.remote_endpoint() << '\n';
    }
    catch (const std::exception& e)
    {
        cout << "Exception in session: " << e.what() << '\n';
    }
    // also runs when the client drops the connection (the read throws),
    // so MessageQueue never keeps a pointer to a destroyed Client
    mq.OnDisconnect(client);
}

awaitable<void> listener(io_context& ctx, unsigned short port)
{
    ...
}

int main(int argc, char* argv[])
{
    ...
}
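Since the main code only assumes that each line is one message, the line framing can be isolated and tested on its own. Below is a minimal sketch of a hypothetical ExtractLines() helper, not part of the article's code: it appends a chunk of received bytes to a buffer, returns every complete line with surrounding whitespace (including the \r that telnet sends) trimmed, skips blank lines, and leaves an incomplete trailing line in the buffer for the next read. This matters because asio::async_read_until() may read bytes past the first '\n' into the dynamic buffer.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: append a chunk of received bytes to buffer and extract
// every complete '\n'-terminated line. A partial last line stays in buffer.
inline std::vector<std::string> ExtractLines(std::string& buffer, const std::string& chunk)
{
    buffer += chunk;
    std::vector<std::string> lines;
    std::string::size_type pos;
    while ((pos = buffer.find('\n')) != std::string::npos)
    {
        std::string line = buffer.substr(0, pos);
        buffer.erase(0, pos + 1);
        // Strip the '\r' of "\r\n" endings plus surrounding spaces/tabs.
        const auto first = line.find_first_not_of(" \t\r");
        const auto last = line.find_last_not_of(" \t\r");
        if (first != std::string::npos)  // blank lines are skipped
            lines.push_back(line.substr(first, last - first + 1));
    }
    return lines;
}
```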

To compile the server, you will need the Asio library. To test it, run it in a terminal:

$ ./AsyncEchoServer 7777

Then, in a second terminal, connect to it:

$ telnet
> open localhost 7777
{'command':'send', 'topic':'foo', 'msg':'0', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'1', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'2', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'3', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'4', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'5', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'6', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'7', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'8', 'delivery':'all'}

Then, in a third terminal:

$ telnet
> open localhost 7777
{'command':'subscribe', 'topic':'foo'}
quit
> open localhost 7777
{'command':'subscribe', 'topic':'foo', 'last_seen': '5'}
quit

The messages will appear in the third terminal, where the subscriber is connected!
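The cache logic behind this session can be replayed without a server. The sketch below mirrors CachePlayback() for the single topic foo; the function names Playback() and DemoCache() are hypothetical, the cache is a plain std::deque, and the subscriber is a std::vector that collects the messages it would be sent. It assumes, as in the session above, that all nine messages are sent before anyone subscribes, so every message is cached with indexes 0 to 8.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <vector>

using Dict = std::map<std::string, std::string>;

// Mirrors CachePlayback(): send every cached message newer than last_seen,
// then drop the delivered "one" messages so no later subscriber gets them.
inline void Playback(std::deque<Dict>& cache, std::vector<Dict>& out, long last_seen)
{
    bool reCache = false;
    for (auto& cached : cache)
    {
        if (std::stol(cached["index"]) > last_seen)
        {
            out.push_back(cached);
            if (cached["delivery"] == "one")
                reCache = true;
        }
    }
    if (reCache)
    {
        std::deque<Dict> kept;
        for (auto& cached : cache)
            if (std::stol(cached["index"]) <= last_seen || cached["delivery"] == "all")
                kept.push_back(cached);
        cache = kept;
    }
}

// Builds the cache produced by the nine "send" commands of the demo
// (no subscribers yet, so every message is cached, indexes 0..8).
inline std::deque<Dict> DemoCache()
{
    const std::vector<std::string> delivery = {"one", "all", "all", "one", "one",
                                               "all", "all", "one", "all"};
    std::deque<Dict> cache;
    for (std::size_t i = 0; i < delivery.size(); ++i)
        cache.push_back({{"topic", "foo"}, {"msg", std::to_string(i)},
                         {"index", std::to_string(i)}, {"delivery", delivery[i]}});
    return cache;
}
```

The first subscriber receives all nine messages, after which the four delivery "one" messages are removed from the cache; a subscriber reconnecting with last_seen 5 then receives only messages 6 and 8.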

Conclusion

With C++20, the code ended up significantly more verbose than the Python version. In terms of LOC it's about 5x, due to the utility functions, #includes, namespaces and class declarations, and because the code is factored into files and classes, which is a must in C++. The actual logic is identical and equally readable. In the third part of this series, I will write the message queue in Rust.