Writing a simple C++20 async message queue server - Part II

Marton Trencseni - Sat 08 April 2023 - C++


In 2023, I want to get back into playful systems programming. My plan is to write a simple message queue (MQ), and for my own education write the same toy implementation in modern Python, modern C++ and Rust. To make things interesting, I will use async / event-driven programming and avoid multi-threading.

In the first 3 posts, I have written Python implementations, and played around with ChatGPT-3.5 to see if it can write code of this (not too great) complexity:


In the previous post, I wrote the simple version of the message queue in C++:

In this article, similar to Part II in the Python series, I will add features:

  • cache messages
  • specify delivery semantics (all or one)
  • specify last_seen in subscribe message to pick up where the client left off

Additionally, even short C++ code like this is better factored into multiple files. Unlike in Python, where types can be omitted, in C++ types must be declared, which makes code that mixes different levels of abstraction noisy and hard to read. In this case, the networking code (tcp::socket, co-routines and asio:: usage) is best kept separate. To achieve this, the code is split into 3 files: Utils.hpp has the utility functions, MessageQueue.hpp has the MQ logic, and the main program in Main.cpp handles networking.

A good (but old) book that also covers physical design (breaking programs into files) is Large-Scale C++ Software Design by John Lakos; it is recommended reading.

The code shown in this article is up on Github:


I will omit the function bodies to save space, as they are very similar or identical to the functions in the previous post, with a few additions:

#ifndef __UTILS_HPP__
#define __UTILS_HPP__

#include <map>
#include <string>
#include <string_view>
#include <vector>
using namespace std;

void trim(string& str)

bool startswith(string_view s, string_view prefix)

vector<string> split(const string& s, char delim)

typedef map<string, string> Dict;
Dict parse_dict(const string& dict_str)

string serialize_dict(const Dict& d)


The most interesting thing about these functions is that they are so simple that ChatGPT-4 can write flawless implementations using modern C++20!
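Since the bodies are omitted here, below is a minimal sketch of how these helpers could look. It is not the code from the repository: it assumes messages are flat dictionaries with single-quoted keys and values and no escaped quotes (as in the telnet examples later in this post), and it spells out std:: instead of relying on using namespace std.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>

using Dict = std::map<std::string, std::string>;

// Strip leading and trailing whitespace in place.
void trim(std::string& str) {
    const char* ws = " \t\r\n";
    str.erase(0, str.find_first_not_of(ws));  // npos here erases the whole string
    str.erase(str.find_last_not_of(ws) + 1);  // npos + 1 wraps to 0, also erases all
}

// True if s begins with prefix (C++20 also offers s.starts_with(prefix)).
bool startswith(std::string_view s, std::string_view prefix) {
    return s.substr(0, prefix.size()) == prefix;
}

// Split s on delim; a trailing delimiter does not produce an empty last element.
std::vector<std::string> split(const std::string& s, char delim) {
    std::vector<std::string> parts;
    std::stringstream ss(s);
    std::string item;
    while (std::getline(ss, item, delim)) parts.push_back(item);
    return parts;
}

// Assumed wire format: {'key':'value', 'key2':'value2'}.
// Collect every single-quoted token in order, then pair them up as key/value.
Dict parse_dict(const std::string& dict_str) {
    std::vector<std::string> tokens;
    std::size_t pos = 0;
    while (true) {
        const std::size_t open = dict_str.find('\'', pos);
        if (open == std::string::npos) break;
        const std::size_t close = dict_str.find('\'', open + 1);
        if (close == std::string::npos) break;  // unterminated quote: stop parsing
        tokens.push_back(dict_str.substr(open + 1, close - open - 1));
        pos = close + 1;
    }
    Dict d;
    for (std::size_t i = 0; i + 1 < tokens.size(); i += 2) d[tokens[i]] = tokens[i + 1];
    return d;
}

// Inverse of parse_dict; keys come out sorted because Dict is a std::map.
std::string serialize_dict(const Dict& d) {
    std::string out = "{";
    bool first = true;
    for (const auto& [key, value] : d) {
        if (!first) out += ", ";
        out += "'" + key + "':'" + value + "'";
        first = false;
    }
    return out + "}";
}
```

With these definitions, parse_dict("{'command':'send', 'topic':'foo'}") yields a two-entry map, and serialize_dict() writes it back with the keys in sorted order.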


The MessageQueue implementation starts by declaring a Client class to contain the specifics of our TCP/IP networking. The main MessageQueue class will just use this Client, ignoring the specifics of I/O:

class Client
    tcp::socket& socket;

    Client(tcp::socket& s) : socket(s) {}

    void Write(const Dict& msg)
        auto s = serialize_dict(msg) + "\r\n";
        asio::write(socket, asio::buffer(s));

One thing to note here is that unlike in the previous post, here we're using the synchronous asio::write(). In practice this call almost never blocks, since the operation usually just hands the buffer over to the OS, which performs the write at a later time (it can still block if the socket's send buffer is full, for example with a slow client). The bonus is that this way we keep all co_awaits out of the rest of the MessageQueue class, so it doesn't know that this is an asynchronous program with co-routines! Now the main MessageQueue class:

class MessageQueue
    const unsigned short cache_length = 100;
    unordered_map<string, unordered_set<Client*>> subscribers;
    unordered_map<Client*, unordered_set<string>> subscriptions;
    unordered_map<string, deque<Dict>> cache;
    unordered_map<string, long> max_index;

    void OnConnect(Client& client)

    void OnDisconnect(Client& client)
        for (const string& topic : subscriptions[&client])

    void OnMessage(Client& client, string& line)
        Dict msg = parse_dict(line);

        if (msg["command"] == "subscribe")
            OnSubscribe(client, msg);
        else if (msg["command"] == "send")
            OnSend(client, msg);

    void OnSubscribe(Client& client, Dict& msg)
        long last_seen;
        if (msg["last_seen"] == "")
            last_seen = -1;
        else
            last_seen = stol(msg["last_seen"]);
        CachePlayback(client, msg, last_seen);

    void OnSend(Client& client, Dict& msg)
        if (msg["delivery"] == "")
            msg["delivery"] = "all";
        for (auto& client : subscribers[msg["topic"]])
            if (msg["delivery"] == "one")
        if ((subscribers[msg["topic"]].size() == 0) || (msg["delivery"] == "all"))

    void AddIndex(Dict& msg)
        msg["index"] = to_string(max_index[msg["topic"]]);
        max_index[msg["topic"]] += 1;

    void CachePush(Dict& msg)
        if (cache[msg["topic"]].size() > cache_length)

    void CachePlayback(Client& client, Dict& msg, long last_seen)
        bool reCache = false;
        for (auto& cached_msg : cache[msg["topic"]])
            if (stoi(cached_msg["index"]) > last_seen)
                if (cached_msg["delivery"] == "one")
                    reCache = true;
        if (reCache)
            deque<Dict> newCache;
            for (auto& cached_msg : cache[msg["topic"]])
                if ((stoi(cached_msg["index"]) <= last_seen) || (cached_msg["delivery"] == "all"))
            cache[msg["topic"]] = newCache;

The code is very similar to the previous Python code, except it's nicely factored into functions.
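To make the send path concrete, here is a minimal, self-contained sketch of how indexing, delivery and caching could fit together. It is my reading of the listing above, not the repository code: FakeClient and SendSketch are hypothetical names, the client records messages instead of writing to a socket, and subscribers are kept in a std::vector (instead of an unordered_set) so that "one" delivery deterministically picks the first subscriber.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

using Dict = std::map<std::string, std::string>;

// Hypothetical stand-in for Client: records messages instead of doing I/O.
struct FakeClient {
    std::vector<Dict> received;
    void Write(const Dict& msg) { received.push_back(msg); }
};

class SendSketch {
public:
    explicit SendSketch(std::size_t cache_length) : cache_length_(cache_length) {}

    void Subscribe(FakeClient& client, const std::string& topic) {
        subscribers_[topic].push_back(&client);
    }

    // Index the message, deliver it, then cache it if it may still be needed.
    void Send(Dict msg) {
        const std::string topic = msg["topic"];
        if (msg["delivery"].empty()) msg["delivery"] = "all";  // default semantics
        msg["index"] = std::to_string(max_index_[topic]++);     // per-topic counter

        auto& subs = subscribers_[topic];
        for (FakeClient* client : subs) {
            client->Write(msg);
            if (msg["delivery"] == "one") break;                // first subscriber only
        }
        // Cache undelivered messages, and "all" messages for future subscribers.
        if (subs.empty() || msg["delivery"] == "all") {
            auto& queue = cache_[topic];
            queue.push_back(msg);
            if (queue.size() > cache_length_) queue.pop_front(); // drop the oldest
        }
    }

    const std::deque<Dict>& Cache(const std::string& topic) { return cache_[topic]; }

private:
    std::size_t cache_length_;
    std::unordered_map<std::string, std::vector<FakeClient*>> subscribers_;
    std::unordered_map<std::string, std::deque<Dict>> cache_;
    std::unordered_map<std::string, long> max_index_;
};
```

Because the client only needs a Write() method, logic like this can be exercised in a plain unit test without opening a socket, which is the main payoff of keeping the networking out of the queue class.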


The code in the main file is very similar to the previous version, so I will just show the function which utilizes the new MessageQueue class. In this setup, the only thing the main code knows is that each line is a message (but no further formatting is assumed here), and that it needs to call MessageQueue::OnConnect() (which is actually unused), MessageQueue::OnMessage() and MessageQueue::OnDisconnect(). As previously, the code is not handling all errors to keep things clean and readable.

static MessageQueue mq;

awaitable<void> session(tcp::socket socket)
    try
        Dict msg; 
        string data;
        Client client(socket);

        cout << "Client connected: " << socket.remote_endpoint() << '\n';
        while (data != "quit")
            co_await asio::async_read_until(socket, asio::dynamic_buffer(data), '\n', use_awaitable); // line-by-line reading
            auto lines = split(data, '\n');
            for (auto& line : lines)
                if (line != "")
                    cout << "Received: " << line << endl;
                    mq.OnMessage(client, line);
        cout << "Client disconnected: " << socket.remote_endpoint() << '\n';
    catch (const std::exception& e)
        cout << "Exception in session: " << e.what() << '\n';

awaitable<void> listener(io_context& ctx, unsigned short port)

int main(int argc, char* argv[])
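An aside on the line handling in session(): with line-based framing, a single read can return several lines at once or stop in the middle of one, and Asio's read-until operations may leave data beyond the delimiter in the buffer. A minimal sketch of a splitter that returns only complete lines and keeps the partial tail for the next read could look like this. LineBuffer is a hypothetical helper, not part of the server code, and it uses no networking so it can be tried on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: accumulates raw bytes and hands out complete lines only.
class LineBuffer {
public:
    // Append a chunk and return every complete, non-empty line available so far.
    std::vector<std::string> feed(const std::string& chunk) {
        pending_ += chunk;
        std::vector<std::string> lines;
        std::size_t nl;
        while ((nl = pending_.find('\n')) != std::string::npos) {
            std::string line = pending_.substr(0, nl);
            pending_.erase(0, nl + 1);  // consume the line and its '\n'
            if (!line.empty() && line.back() == '\r') line.pop_back();  // telnet sends \r\n
            if (!line.empty()) lines.push_back(line);
        }
        return lines;  // pending_ still holds any partial line
    }

private:
    std::string pending_;
};
```

Each string returned by feed() could then be passed to MessageQueue::OnMessage() as one message.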

To compile the server, you will need the Asio library. You can test it by running it in a terminal, like:

$ ./AsyncEchoServer 7777

Then, in a second terminal, connect to it:

$ telnet
> open localhost 7777
{'command':'send', 'topic':'foo', 'msg':'0', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'1', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'2', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'3', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'4', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'5', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'6', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'7', 'delivery':'one'}
{'command':'send', 'topic':'foo', 'msg':'8', 'delivery':'all'}

Then, in a third terminal:

$ telnet
> open localhost 7777
{'command':'subscribe', 'topic':'foo'}
> open localhost 7777
{'command':'subscribe', 'topic':'foo', 'last_seen': '5'}

The cached message(s) will appear in the third terminal!
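To see which messages each subscribe should receive in this session, here is a small simulation of the cache playback rule. It is a sketch under assumptions: playback() and session_cache() are hypothetical names, the client write is replaced by recording the index, no new messages arrive between the two subscribes, and the keep condition mirrors the one in CachePlayback() (keep a message if it is not newer than last_seen, or if its delivery is "all").

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <vector>

using Dict = std::map<std::string, std::string>;

// Replay cached messages newer than last_seen and return the indexes delivered.
// Messages with delivery "one" are dropped from the cache once replayed.
std::vector<long> playback(std::deque<Dict>& cache, long last_seen) {
    std::vector<long> delivered;
    std::deque<Dict> kept;
    for (const Dict& m : cache) {
        const long index = std::stol(m.at("index"));
        const bool replay = index > last_seen;
        if (replay) delivered.push_back(index);  // stands in for client.Write(m)
        if (!replay || m.at("delivery") == "all") kept.push_back(m);
    }
    cache = kept;
    return delivered;
}

// Build the cache left behind by the nine sends above; no subscriber was
// connected at the time, so every message was cached regardless of delivery.
std::deque<Dict> session_cache() {
    const std::vector<std::string> delivery =
        {"one", "all", "all", "one", "one", "all", "all", "one", "all"};
    std::deque<Dict> cache;
    for (std::size_t i = 0; i < delivery.size(); ++i) {
        cache.push_back(Dict{{"topic", "foo"},
                             {"msg", std::to_string(i)},
                             {"index", std::to_string(i)},
                             {"delivery", delivery[i]}});
    }
    return cache;
}
```

Under these assumptions, the first subscribe (no last_seen) replays all nine messages and leaves only the five "all" messages in the cache; the second subscribe with last_seen 5 then receives the messages with index 6 and 8.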


With C++20, the code ended up being significantly longer and more complicated than the Python version. In terms of LOC, it's about 5x, due to the utility functions, #includes, namespaces and class declarations, and because the code is factored into files and classes, which is a must in C++. The actual logic is identical and equally readable. In the third part of this series, I will write the message queue in Rust.