Writing a simple C++20 async message queue server - Part III

Marton Trencseni - Fri 20 June 2025 - C++

Introduction

In previous posts, I wrote a simple async message queue server in Python, C++ and Javascript.

In this final post on the toy C++ async message queue server implementation, I rewrite it to resemble the Python and Javascript versions, and make it wire-compatible with them.

The code is up on GitHub.

Improvements

I want all 3 versions (Python, C++ and Javascript) to be wire-compatible, so, as in the other versions, I standardized on JSON messages. Overall, I made the following changes in this final version:

  • flat, single-file C++ code, similar to the Python and Javascript versions
    • continue to use boost::asio for asynchronous, event-based architecture
  • JSON wire protocol
    • use boost::json for serializing and deserializing JSON
  • support the following commands:
    • subscribe
      • last_seen (optional, int, default 0): if supplied, only send messages not yet seen
      • cache (optional, bool, default true): if supplied and false, don't send cached messages on subscribe
    • unsubscribe
    • send
      • delivery: all or one
      • cache (optional, bool, default true): if supplied and false, do not cache this message, only send it to currently connected subscribers, overriding delivery semantics
  • server always sends JSON responses, including error messages
  • add reasonable error handling to the code, but avoid cascading exceptions where possible
  • refactor code for small wins
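
To make the protocol more concrete, here is a rough sketch of what the three commands might look like as JSON payloads. The field names come from the validation templates shown later in this post; the topic and message values and the constant names are made up for illustration, and how messages are framed on the wire is not shown here.

```cpp
#include <string>

// Illustrative command payloads; "news" and "hello" are placeholder values.
// Optional fields (last_seen, cache) may be omitted entirely.
const std::string example_subscribe =
    R"({"command": "subscribe", "topic": "news", "last_seen": 41, "cache": true})";

const std::string example_unsubscribe =
    R"({"command": "unsubscribe", "topic": "news"})";

// delivery must be either "all" or "one"
const std::string example_send =
    R"({"command": "send", "topic": "news", "msg": "hello", "delivery": "all"})";
```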

Code structure

Similar to the Python and Javascript versions, there are now global variables storing the message queue state:

constexpr int DEFAULT_PORT       = 7000;
constexpr int DEFAULT_CACHE_SIZE = 100;
static    int cache_size         = DEFAULT_CACHE_SIZE; // set at runtime

// global state
unordered_map<string, set<SessionPtr>>   topics;     // topic   -> subscribers
unordered_map<SessionPtr, set<string>>   topics_rev; // session -> subscribed topics
unordered_map<string, deque<js::object>> caches;     // topic   -> ring buffer
unordered_map<string, int>               indexs;     // topic   -> next index
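
As a rough illustration of how the per-topic cache and index might work together, the sketch below appends a message to a topic's deque, evicts the oldest entry once the deque grows past the configured size, and filters cached messages by last_seen. It is a simplified stand-in, not the code from the repository: TopicCache, CachedMsg, cache_message() and cached_since() are hypothetical names, messages are plain strings rather than js::object, and the rule "index greater than last_seen" is an assumption about the intended semantics.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a cached js::object: an index plus a payload.
struct CachedMsg {
    int         index;
    std::string msg;
};

struct TopicCache {
    std::size_t capacity; // plays the role of cache_size
    std::unordered_map<std::string, std::deque<CachedMsg>> caches; // topic -> ring buffer
    std::unordered_map<std::string, int>                   indexs; // topic -> next index

    explicit TopicCache(std::size_t cap) : capacity(cap) {}

    // Append a message under the topic's next index; evict the oldest if over capacity.
    int cache_message(const std::string& topic, const std::string& msg)
    {
        int index = indexs[topic]++;
        auto& q = caches[topic];
        q.push_back({index, msg});
        while (q.size() > capacity)
            q.pop_front();
        return index;
    }

    // Return cached messages with index > last_seen (assumed semantics).
    std::vector<CachedMsg> cached_since(const std::string& topic, int last_seen) const
    {
        std::vector<CachedMsg> out;
        auto it = caches.find(topic);
        if (it == caches.end())
            return out;
        for (const auto& m : it->second)
            if (m.index > last_seen)
                out.push_back(m);
        return out;
    }
};
```

With a capacity of 3 and five messages sent to one topic, only the messages with indexes 2, 3 and 4 remain in the deque, and a subscriber passing last_seen = 3 would receive just the last one.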

The main() function creates a Server object and starts the async event loop:

int main(int argc, char* argv[])
{
    if (argc != 1 && argc != 2 && argc != 3) {
        cerr << "Usage: aiomemq <port> <cache_size>\n";
        cerr << "  <port>       - optional, default " << DEFAULT_PORT << '\n';
        cerr << "  <cache_size> - optional, default " << DEFAULT_CACHE_SIZE << '\n';
        return 1;
    }

    int port = DEFAULT_PORT;
    if (argc >= 2) port = stoi(argv[1]);
    if (argc == 3) cache_size = stoi(argv[2]);

    asio::io_context io;
    Server srv(io, static_cast<uint16_t>(port));

    cerr << "Listening on 127.0.0.1:" << port << "\n";
    io.run();
}
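
One thing to be aware of: stoi() throws std::invalid_argument or std::out_of_range on bad input, so a non-numeric port would end the program with an uncaught exception. A possible way to validate the arguments up front is sketched below using std::from_chars. parse_int_arg() is a hypothetical helper, not part of the original code, and the range limits in the usage note are assumptions.

```cpp
#include <charconv>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper: parse the whole string as a decimal int within [lo, hi].
// Returns std::nullopt on empty input, trailing garbage, overflow or out-of-range values.
std::optional<int> parse_int_arg(std::string_view s, int lo, int hi)
{
    int value = 0;
    const char* first = s.data();
    const char* last  = s.data() + s.size();
    auto [ptr, ec] = std::from_chars(first, last, value);
    if (ec != std::errc{} || ptr != last)
        return std::nullopt;
    if (value < lo || value > hi)
        return std::nullopt;
    return value;
}
```

In main(), this could be called as parse_int_arg(argv[1], 1, 65535) for the port, printing the usage text and returning 1 when the result is empty.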

The Server object creates a Session object for each accepted connection and hands execution over to it:

class Server
{
    asio::io_context& io_;
    tcp::acceptor     acc_;

public:
    Server(asio::io_context& io, uint16_t port)
        : io_(io),
          acc_(io, tcp::endpoint(asio::ip::make_address("127.0.0.1"), port))
    {   do_accept(); }

private:
    void do_accept()
    {
        acc_.async_accept([this](boost::system::error_code err, tcp::socket s) {
            if (!err)
                make_shared<Session>(std::move(s))->start();
            do_accept();
        });
    }
};

The Session class has handlers similar to the Python handlers:

class Session : public enable_shared_from_this<Session>
{

// ...

    void handle_subscribe(const js::object& cmd)
    {
        string topic = string(cmd.at("topic").as_string());
        SessionPtr self = shared_from_this();

        topics[topic].insert(self);
        topics_rev[self].insert(topic);

        int last_seen = -1;
        if (auto it = cmd.find("last_seen"); it != cmd.end())
            last_seen = static_cast<int>(it->value().as_int64());

        send_success(sock_);

        bool want_cache = cmd.contains("cache") ? cmd.at("cache").as_bool() : true;
        if (want_cache)
            send_cached(sock_, topic, last_seen);
    }

// ...

};
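
The reverse map topics_rev presumably exists so that a session's subscriptions can be removed without scanning every topic, for example when a client disconnects. A minimal sketch of that bookkeeping is shown below. It uses an int as a stand-in for SessionPtr, and the Registry type with its subscribe(), unsubscribe() and drop_session() functions are hypothetical names rather than the ones in the repository.

```cpp
#include <set>
#include <string>
#include <unordered_map>

using SessionId = int; // stand-in for SessionPtr in this sketch

struct Registry {
    std::unordered_map<std::string, std::set<SessionId>> topics;     // topic   -> subscribers
    std::unordered_map<SessionId, std::set<std::string>> topics_rev; // session -> subscribed topics

    void subscribe(SessionId s, const std::string& topic)
    {
        topics[topic].insert(s);
        topics_rev[s].insert(topic);
    }

    // Remove one subscription from both maps, dropping entries that become empty.
    void unsubscribe(SessionId s, const std::string& topic)
    {
        if (auto it = topics.find(topic); it != topics.end()) {
            it->second.erase(s);
            if (it->second.empty())
                topics.erase(it);
        }
        if (auto it = topics_rev.find(s); it != topics_rev.end()) {
            it->second.erase(topic);
            if (it->second.empty())
                topics_rev.erase(it);
        }
    }

    // Remove a session from every topic it subscribed to, e.g. on disconnect.
    void drop_session(SessionId s)
    {
        auto it = topics_rev.find(s);
        if (it == topics_rev.end())
            return;
        for (const auto& topic : it->second) {
            auto t = topics.find(topic);
            if (t == topics.end())
                continue;
            t->second.erase(s);
            if (t->second.empty())
                topics.erase(t);
        }
        topics_rev.erase(it);
    }
};
```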

Another noteworthy detail is that the templates for verifying commands are remarkably similar to the implementations in the other languages:

const Template template_subscribe {
    {"command",   {js::kind::string, true , {}}},
    {"topic",     {js::kind::string, true , {}}},
    {"last_seen", {js::kind::int64 , false, {}}},
    {"cache",     {js::kind::bool_ , false, {}}}
};

const Template template_unsubscribe {
    {"command",   {js::kind::string, true , {}}},
    {"topic",     {js::kind::string, true , {}}}
};

const Template template_send {
    {"command",   {js::kind::string, true , {}}},
    {"topic",     {js::kind::string, true , {}}},
    {"msg",       {js::kind::string, true , {}}},
    {"delivery",  {js::kind::string, true , {js::value("all"), js::value("one")}}},
    {"cache",     {js::kind::bool_ , false, {}}}
};

The associated template matching code:

bool template_match(const Template& t, const js::object& o)
{
    for (auto& [k, _] : o)
        if (!t.contains(k))
            return false;

    for (auto& [k, spec] : t) {
        if (spec.required && !o.contains(k))
            return false;
        if (!o.contains(k))
            continue;

        const js::value& v = o.at(k);
        if (v.kind() != spec.kind)
            return false;

        if (spec.values.empty())
            continue;
        if (find(spec.values.begin(), spec.values.end(), v) == spec.values.end())
            return false;
    }
    return true;
}

bool verify_command(const js::object& o)
{
    auto it = o.find("command");
    if (it == o.end() || it->value().kind() != js::kind::string)
        return false;

    string cmd = string(it->value().as_string());
    if (cmd == "subscribe")
        return template_match(template_subscribe, o);
    if (cmd == "unsubscribe")
        return template_match(template_unsubscribe, o);
    if (cmd == "send")
        return template_match(template_send, o);

    return false;
}
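
To see which commands the matcher accepts, here is a small self-contained approximation that compiles without Boost. It replaces js::value with a std::variant, so Value, Object, Kind and the trick of comparing the kind against the variant index are assumptions made for this sketch; the matching logic mirrors template_match() above.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <variant>
#include <vector>

// Stand-ins for js::value / js::object (assumptions for this sketch).
using Value  = std::variant<std::string, std::int64_t, bool>;
using Object = std::map<std::string, Value>;

// Enumerator values match the variant alternative index.
enum Kind : std::size_t { KindString = 0, KindInt64 = 1, KindBool = 2 };

struct Spec {
    Kind               kind;
    bool               required;
    std::vector<Value> values; // allowed values; empty means "any"
};
using Template = std::map<std::string, Spec>;

bool template_match(const Template& t, const Object& o)
{
    // reject keys the template does not know about
    for (const auto& [k, v] : o)
        if (t.find(k) == t.end())
            return false;

    for (const auto& [k, spec] : t) {
        auto it = o.find(k);
        if (it == o.end()) {
            if (spec.required)
                return false; // required key missing
            continue;         // optional key absent
        }
        if (it->second.index() != spec.kind)
            return false;     // wrong type
        if (!spec.values.empty() &&
            std::find(spec.values.begin(), spec.values.end(), it->second) == spec.values.end())
            return false;     // value not in the allowed set
    }
    return true;
}

const Template template_send {
    {"command",  {KindString, true,  {}}},
    {"topic",    {KindString, true,  {}}},
    {"msg",      {KindString, true,  {}}},
    {"delivery", {KindString, true,  {Value(std::string("all")), Value(std::string("one"))}}},
    {"cache",    {KindBool,   false, {}}}
};
```

Under these assumptions, a send command with delivery set to "one" matches, while one with an unknown key, a missing msg, a delivery value other than "all" or "one", or a non-boolean cache is rejected.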

Makefile

Primarily to document the build command, I wrote a Makefile:

BOOST_DIR       := boost
BOOST_LIB_DIR   := $(BOOST_DIR)/stage/lib

CXX             := g++
CXXFLAGS        := -w -std=c++20 -I$(BOOST_DIR)
LD_STATIC       := -Wl,-Bstatic
LD_DYNAMIC      := -Wl,-Bdynamic
LDLIBS          := -lboost_json

TARGET          := aiomemq
SRC             := aiomemq.cpp

# ------------------------------------------------------------
# Sanity checks
# ------------------------------------------------------------
ifeq ($(wildcard $(BOOST_DIR)),)
$(error Boost directory '$(BOOST_DIR)' not found. Clone, symlink or point BOOST_DIR to your local Boost tree.)
endif

# Optional: warn if Boost.JSON archive not present (non-fatal)
ifneq ($(wildcard $(BOOST_LIB_DIR)/libboost_json*.a),)
  JSON_LIB_OK := yes
else
  $(warning ** libboost_json*.a not found in $(BOOST_LIB_DIR). Make sure you've built/staged Boost with the JSON library.)
endif

# ------------------------------------------------------------
# Build rules
# ------------------------------------------------------------
.PHONY: all clean
all: $(TARGET)

$(TARGET): $(SRC)
	$(CXX) $(CXXFLAGS) -L$(BOOST_LIB_DIR) -o $@ $< $(LD_STATIC) $(LDLIBS) $(LD_DYNAMIC)

clean:
	$(RM) $(TARGET)

Unit tests

Having written unit tests for the Python and Javascript versions paid off handsomely here. I just added 2 lines for the C++ target, and was able to verify correctness (to the degree that these unit tests do) and wire compatibility!

    if target == 'python':
        server_process = subprocess.Popen(['python3', '../python/aiomemq.py', str(SERVER_PORT), str(CACHE_SIZE)])
    elif target == 'javascript':
        server_process = subprocess.Popen(['node', '../javascript/aiomemq.js', str(SERVER_PORT), str(CACHE_SIZE)])
    elif target == 'cpp':
        server_process = subprocess.Popen(['../cpp/aiomemq', str(SERVER_PORT), str(CACHE_SIZE)])

Only the last 2 lines were added; no other changes were required. The C++ implementation passes all tests, like the Python and Javascript versions.

Note that the unit tests themselves are written in Python.

Conclusion

Writing the asynchronous memory queue implementations in 3 different languages has been instructive and fun. I plan to write a final summary article about lessons learned.