Writing a simple C++20 async message queue server - Part I

Marton Trencseni - Sun 02 April 2023 - C++

Introduction

In 2023, I want to get back into playful systems programming. My plan is to write a simple message queue (MQ), and for my own education write the same toy implementation in modern Python, modern C++ and Rust. To make things interesting, I will use async / event-driven programming and avoid multi-threading.

In the previous 3 posts, I wrote Python implementations and played around with ChatGPT-3.5 to see if it can write code of this (not too great) complexity.

In this article, I will write code similar to the first article: first a simple TCP echo server, then an MQ server, but this time in modern C++. As in the Python code, I will use co-routines and event-driven async programming. With modern C++20, apart from helper functions, the main code looks very similar to the Python version; the implementations match almost line by line.

The code shown in this article is up on GitHub.

Simple async echo server

Let's start with writing the simplest possible skeleton server, which listens on a port, accepts TCP connections, reads incoming bytes, and echoes back whatever was sent:

#include <iostream>
#include <string>
#include <asio.hpp>
#include <asio/io_context.hpp>

using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using asio::use_awaitable;
using asio::io_context;
using namespace std;

awaitable<void> session(tcp::socket socket)
{
    string data;
    cout << "Client connected: " << socket.remote_endpoint() << '\n';
    while (true)
    {
        // line-by-line reading; n is the length of the line including the '\n'
        size_t n = co_await asio::async_read_until(socket, asio::dynamic_buffer(data), '\n', use_awaitable);
        string line = data.substr(0, n);
        data.erase(0, n); // keep any bytes that were read past the end of the line
        if (line == "quit\n" || line == "quit\r\n") // telnet terminates lines with \r\n
            break;
        co_await asio::async_write(socket, asio::buffer(line), use_awaitable);
    }
    cout << "Client disconnected: " << socket.remote_endpoint() << '\n';
}

awaitable<void> listener(io_context& ctx, unsigned short port)
{
    tcp::acceptor acceptor(ctx, { tcp::v4(), port });
    cout << "Server listening on port " << port << "..." << endl;
    while (true)
    {
        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
        co_spawn(ctx, session(move(socket)), detached);
    }
}

int main(int argc, char* argv[])
{
    if (argc < 2)
    {
        cerr << "Usage: " << argv[0] << " <port>" << endl;
        return 1;
    }

    io_context ctx;
    string arg = argv[1];
    size_t pos;
    unsigned short port = stoi(arg, &pos);

    asio::signal_set signals(ctx, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto) { ctx.stop(); });
    auto listen = listener(ctx, port);
    co_spawn(ctx, move(listen), asio::detached);
    ctx.run();
}

The code is straightforward, assuming one has read the previous Python code. It uses the ASIO library, which provides asynchronous I/O and supports the C++20 language feature of co-routines. main() schedules the listener() co-routine with co_spawn() and then runs the event loop with ctx.run(). listener() listens on the given port and spawns a session() co-routine for each newly connected client. For each client, we use async_read_until() to read a line of text, and then write it back with async_write(). The client can disconnect by typing quit. Note that I skipped proper error handling here (which could be achieved by using try/catch blocks) to make the code more readable.
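One piece of the skipped error handling is the port argument: stoi() throws on non-numeric input, and its result is narrowed to unsigned short without a range check. A minimal sketch of a stricter parser, using only the standard library, could look like the following. The name parse_port() is hypothetical and not part of the article's code, and rejecting port 0 is a choice made for this sketch.

```cpp
#include <charconv>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper: parse a TCP port from a command-line argument.
// Returns std::nullopt unless the whole string is a decimal number in 1..65535.
std::optional<unsigned short> parse_port(std::string_view arg)
{
    unsigned int value = 0;
    const char* first = arg.data();
    const char* last = arg.data() + arg.size();
    auto [ptr, ec] = std::from_chars(first, last, value);
    if (ec != std::errc{} || ptr != last) // not a number, or trailing junk like "77x"
        return std::nullopt;
    if (value == 0 || value > 65535)      // outside the port range accepted by this sketch
        return std::nullopt;
    return static_cast<unsigned short>(value);
}
```

In main(), this could stand in for the stoi() call: when parse_port(argv[1]) yields no value, print the usage message and return 1.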

Note that co-routines are a language feature in C++20; co_await is a language keyword. co_spawn() is not a language feature, it's a function within ASIO that internally constructs awaitable objects and then uses co_await to trigger the co-routines.

To compile this code, you will need the ASIO library and a compiler with C++20 support. You can test it by running it in a terminal, like:

$ ./TCPEchoServer 7777

Then, in another terminal, connect to it:

$ telnet
> open localhost 7777
foo
foo
bar
bar

Simple async message queue in C++20

The above skeleton can be extended to a simple bi-directional message queue. Bi-directional means that clients can both subscribe to topics and receive messages sent to that topic, as well as send messages to arbitrary topics. The server accepts two kinds of structured messages, subscribe and send.

{'command':'subscribe', 'topic':'foo'              }
{'command':'send',      'topic':'foo', 'msg':'blah'}

The semantics are simple: after subscribing, a client receives the messages that other clients send to that topic from then on. In this simple implementation, all subscribers of a topic get each message.

The implementation is conceptually barely more complex than the echo server, since the problem is very similar: we just have to maintain a list of tcp::socket pointers for each topic, and when we receive a message for a topic, we go through those sockets and send the message. Unlike the Python implementation, which came in just a bit longer than the echo server, the C++20 version is a fair amount longer, because we need to write some helper functions to keep the main code clean. For example, in Python, parsing a dictionary like the ones we use was "free" with ast.literal_eval(), which is part of the standard library; in C++ we need to write this function ourselves.
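The per-topic bookkeeping described above can be sketched on its own, without any networking. In this sketch, TopicRegistry and ClientId are hypothetical names, and an int stands in for the tcp::socket pointer that the article's code stores.

```cpp
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in for a connected client; the article's code stores tcp::socket pointers.
using ClientId = int;

class TopicRegistry
{
public:
    void subscribe(const std::string& topic, ClientId client)
    {
        by_topic_[topic].insert(client);
    }

    // Remove a client from every topic, e.g. when its connection closes.
    void unsubscribe_all(ClientId client)
    {
        for (auto it = by_topic_.begin(); it != by_topic_.end();)
        {
            it->second.erase(client);
            if (it->second.empty())
                it = by_topic_.erase(it); // drop topics nobody listens to
            else
                ++it;
        }
    }

    // Return a copy, so callers can iterate even if subscriptions change meanwhile.
    std::unordered_set<ClientId> subscribers_of(const std::string& topic) const
    {
        auto it = by_topic_.find(topic);
        if (it == by_topic_.end())
            return std::unordered_set<ClientId>{};
        return it->second;
    }

private:
    std::unordered_map<std::string, std::unordered_set<ClientId>> by_topic_;
};
```

Returning a copy from subscribers_of() is a deliberate trade-off in this sketch: it costs an allocation per lookup, but the caller never holds an iterator into a set that another session might modify.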

Let's see the boilerplate and the helper functions first:

#include <map>
#include <regex>
#include <iostream>
#include <sstream>
#include <vector>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <algorithm>
#include <functional>
#include <string_view>
#include <cctype>
#include <asio.hpp>
#include <asio/io_context.hpp>
#include <signal.h>

using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using asio::use_awaitable;
using asio::io_context;
using namespace std;

void trim(string& str)
{
    auto is_space = [](char c) { return std::isspace(static_cast<unsigned char>(c)); };
    str.erase(str.begin(), std::find_if(str.begin(), str.end(), std::not_fn(is_space)));
    str.erase(std::find_if(str.rbegin(), str.rend(), std::not_fn(is_space)).base(), str.end());
}

bool startswith(string_view s, string_view prefix)
{
    return s.find(prefix, 0) == 0;
}

vector<string> split(const string& s, char delim) {
    vector<string> result;
    stringstream ss(s);
    string item;

    while (getline(ss, item, delim))
        result.push_back(item);

    return result;
}

typedef map<string, string> Dict;
Dict parse_dict(const string& dict_str) {
    Dict result;
    regex dict_pattern(R"(\s*'([^']+)'\s*:\s*'([^']+)'\s*)");
    smatch matches;

    auto search_start = dict_str.begin();
    while (regex_search(search_start, dict_str.end(), matches, dict_pattern)) {
        result[matches[1].str()] = matches[2].str();
        search_start = matches[0].second;
    }

    return result;
}

We implement four helper functions, which help keep the main code clean and similar to the Python code. void trim(string& str) removes whitespace from both ends of a string. bool startswith(string_view s, string_view prefix) returns true if s starts with prefix. vector<string> split(const string& s, char delim) splits s into a vector of strings on delim. Dict parse_dict(const string& dict_str) returns a map from strings to strings based on what's in dict_str, assuming it's flat and well-formed; Dict is typedef'd to map<string, string>.
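To make the behaviour of parse_dict() concrete, the sketch below repeats it in self-contained form and adds a small validity check. The function is_valid_message() is hypothetical and not part of the article's code; it is one possible way to reject input that lacks the expected keys. This matters because operator[] on a std::map silently inserts an empty value for a missing key.

```cpp
#include <map>
#include <regex>
#include <string>

using Dict = std::map<std::string, std::string>;

// Same approach as parse_dict() above: collect every 'key':'value' pair in the string.
Dict parse_dict(const std::string& dict_str)
{
    Dict result;
    static const std::regex dict_pattern(R"(\s*'([^']+)'\s*:\s*'([^']+)'\s*)");
    std::smatch matches;
    auto search_start = dict_str.begin();
    while (std::regex_search(search_start, dict_str.end(), matches, dict_pattern))
    {
        result[matches[1].str()] = matches[2].str();
        search_start = matches[0].second; // continue after the pair just found
    }
    return result;
}

// Hypothetical check: does the parsed message carry the keys the server expects?
bool is_valid_message(const Dict& mesg)
{
    auto command = mesg.find("command");
    if (command == mesg.end() || mesg.find("topic") == mesg.end())
        return false;
    if (command->second == "subscribe")
        return true;
    return command->second == "send" && mesg.find("msg") != mesg.end();
}
```

Parsing the send message from the example above yields three entries (command, topic and msg), while a line without any quoted pairs yields an empty Dict, which is_valid_message() rejects.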

Now the main code:

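awaitable<void> session(tcp::socket socket)
{
    static unordered_map<string, unordered_set< tcp::socket* >> subscribers;
    unordered_set<string> subscriptions;
    try
    {
        string data;
        Dict mesg;
        cout << "Client connected: " << socket.remote_endpoint() << '\n';
        while (true)
        {
            // line-by-line reading; n is the length of the line including the '\n'
            size_t n = co_await asio::async_read_until(socket, asio::dynamic_buffer(data), '\n', use_awaitable);
            string line = data.substr(0, n);
            data.erase(0, n); // keep any bytes that were read past the end of the line
            string command = line;
            trim(command); // strip the trailing "\n" or "\r\n"
            if (command == "quit")
                break;
            mesg = parse_dict(line);
            if (mesg["command"] == "subscribe")
            {
                subscribers[mesg["topic"]].insert(&socket);
                subscriptions.insert(mesg["topic"]);
            }
            else if (mesg["command"] == "send")
            {
                // iterate over a copy: other sessions may change the set while we are suspended
                auto targets = subscribers[mesg["topic"]];
                for (auto* subscriber : targets)
                    if (subscribers[mesg["topic"]].count(subscriber)) // skip clients that left meanwhile
                        co_await asio::async_write(*subscriber, asio::buffer(line), use_awaitable);
            }
        }
        cout << "Client disconnected: " << socket.remote_endpoint() << '\n';
    }
    catch (const std::exception& e)
    {
        cout << "Exception in session: " << e.what() << '\n';
    }
    // unsubscribe on every exit path, so that no dangling socket pointers are left behind
    for (const string& topic : subscriptions)
        subscribers[topic].erase(&socket);
}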

awaitable<void> listener(io_context& ctx, unsigned short port)
{
    tcp::acceptor acceptor(ctx, { tcp::v4(), port });
    cout << "Server listening on port " << port << "..." << endl;
    while (true)
    {
        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
        co_spawn(ctx, session(move(socket)), detached);
    }
}

int main(int argc, char* argv[])
{
    if (argc < 2)
    {
        cerr << "Usage: " << argv[0] << " <port>" << endl;
        return 1;
    }

    io_context ctx;
    string arg = argv[1];
    size_t pos;
    unsigned short port = stoi(arg, &pos);

    asio::signal_set signals(ctx, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto) { ctx.stop(); });
    auto listen = listener(ctx, port);
    co_spawn(ctx, move(listen), asio::detached);
    ctx.run();
}

The only thing that changed compared to the echo server is the session() function, and the logic is identical to the Python version. First, the dictionary is parsed. If the client is subscribing, a pointer to the client's tcp::socket object is inserted into subscribers. If the client is sending a message, we iterate over the appropriate tcp::socket pointers in subscribers and forward the original message to each with async_write(). As before, I skipped proper error handling to keep the code more readable.
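The subscribe/send logic can also be looked at separately from the socket I/O, which makes it easy to exercise without a network. The following is a sketch under stated assumptions rather than the article's implementation: handle_message(), ClientId, Subscribers and Outbox are hypothetical names, an int stands in for a socket pointer, and an in-memory Outbox records what would be written to each client.

```cpp
#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

using Dict = std::map<std::string, std::string>;
using ClientId = int; // hypothetical stand-in for a tcp::socket pointer
using Subscribers = std::unordered_map<std::string, std::unordered_set<ClientId>>;
using Outbox = std::map<ClientId, std::vector<std::string>>; // lines that would be written per client

// Apply one already-parsed message; `line` is the raw text that gets forwarded.
void handle_message(const Dict& mesg, const std::string& line, ClientId sender,
                    Subscribers& subscribers, Outbox& outbox)
{
    auto command = mesg.find("command");
    auto topic = mesg.find("topic");
    if (command == mesg.end() || topic == mesg.end())
        return; // ignore malformed input
    if (command->second == "subscribe")
    {
        subscribers[topic->second].insert(sender);
    }
    else if (command->second == "send")
    {
        auto it = subscribers.find(topic->second);
        if (it == subscribers.end())
            return; // nobody listens to this topic
        for (ClientId subscriber : it->second)
            outbox[subscriber].push_back(line); // the server would call async_write() here
    }
}
```

With client 1 subscribed to foo, a send to foo from client 2 puts the raw line into client 1's outbox only, and a send to a topic without subscribers changes nothing.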

You can test it by running it in a terminal, like:

$ ./SimpleAsyncMessageQueue 7777

Then, in a second terminal, connect to it:

$ telnet
> open localhost 7777
{'command':'subscribe', 'topic':'foo'}

Then, in a third terminal:

$ telnet
> open localhost 7777
{'command':'send', 'topic':'foo', 'msg':'blah'}

The message will appear in the second terminal!

ChatGPT

Many years ago I was a full-time, professional C++ programmer. In the meantime, a lot has happened in the language, and I've become rusty, so I used a combination of Stack Overflow and ChatGPT-4 to get back up to speed. I had ChatGPT generate an initial version of the TCP server using co-routines and ASIO. This was 95% right, but it didn't compile, so I fixed it by hand. From then on, I wrote the MQ implementation without AI help, but my first version accepted plain text commands instead of the dictionary format, since I didn't want to write the parsing code. Then I had ChatGPT write the parse_dict() function, and it produced a correct version. Then I asked it to review my complete code, and it came back with 7 suggestions, of which 5-6 were good ones that I implemented. Mostly it suggested more idiomatic versions of helper functions like trim() and startswith() that use STL features to make them shorter and faster. Overall, ChatGPT-4 was a big help, and I recommend it to experts who can validate its responses.

Conclusion

In the next article, I will add some features to this simple skeleton.