Writing a simple C++20 async message queue server - Part III
Marton Trencseni - Fri 20 June 2025 - C++
Introduction
In previous posts, I wrote a simple async message queue server in Python, C++ and Javascript:
- Writing unit tests for the async message queue server
- Writing a simple Python async message queue server - Part I
- Writing a simple Python async message queue server - Part II
- Writing a simple Python async message queue server - Part III
- Writing a simple Javascript async message queue server - Part I
- Writing a simple Javascript async message queue server - Part II
- Writing a simple Javascript async message queue server - Part III
- Writing a simple C++ async message queue server - Part I
- Writing a simple C++ async message queue server - Part II
In this final post on the toy C++ async message queue server implementation, I re-write it to be similar to the Python and Javascript versions, and also make it wire compatible.
The code is up on Github.
Improvements
I want to make all 3 versions (Python, C++ and Javascript) wire compatible, so like in the other versions, I standardized on JSON messages. Overall, the changes I made in this final version:
- flat, single-file C++ code, similar to the Python and Javascript versions
- continue to use
boost::asio
for asynchronous, event-based architecture
- continue to use
- JSON wire protocol
- use
boost::json
for serializing and deserializing JSON
- use
- support the following commands:
subscribe
last_seen
(optional,int
, default0
): if supplied, only send messages not yet seencache
(optional,bool
, defaulttrue
): if supplied andfalse
, don't send caches messages on subscribe
unsubscribe
send
delivery
:all
orone
cache
(optional,bool
, defaulttrue
): if supplied andfalse
, do not cache this messages, only send to currently connected subscribers, overriding delivery semantics
- server always sends JSON responses, including error messages
- add reasonable error handling to the code, but avoid cascading exceptions where possible
- refactor code for small wins
Code structure
Similar to the Python and Javascript versions, there are now global variables storing the message queue state:
constexpr int DEFAULT_PORT = 7000;
constexpr int DEFAULT_CACHE_SIZE = 100;
static int cache_size = DEFAULT_CACHE_SIZE; // set at runtime
// global state
unordered_map<string, set<SessionPtr>> topics; // topic -> subscribers
unordered_map<SessionPtr, set<string>> topics_rev; // session -> subscribed topics
unordered_map<string, deque<js::object>> caches; // topic -> ring buffer
unordered_map<string, int> indexs; // topic -> next index
The main()
function creates a Server
object and starts the async event loop:
int main(int argc, char* argv[])
{
if (argc != 1 && argc != 2 && argc != 3) {
cerr << "Usage: aiomemq <port> <cache_size>\n";
cerr << " <port> - optional, default " << DEFAULT_PORT << '\n';
cerr << " <cache_size> - optional, default " << DEFAULT_CACHE_SIZE << '\n';
return 1;
}
int port = DEFAULT_PORT;
if (argc >= 2) port = stoi(argv[1]);
if (argc == 3) cache_size = stoi(argv[2]);
asio::io_context io;
Server srv(io, static_cast<uint16_t>(port));
cerr << "Listening on 127.0.0.1:" << port << "\n";
io.run();
}
The Server
object creates a Session
object and moves execution there:
class Server
{
asio::io_context& io_;
tcp::acceptor acc_;
public:
Server(asio::io_context& io, uint16_t port)
: io_(io),
acc_(io, tcp::endpoint(asio::ip::make_address("127.0.0.1"), port))
{ do_accept(); }
private:
void do_accept()
{
acc_.async_accept([this](boost::system::error_code err, tcp::socket s) {
if (!err)
make_shared<Session>(std::move(s))->start();
do_accept();
});
}
};
The Session
class has handlers similar to the Python handlers:
class Session : public enable_shared_from_this<Session>
{
// ...
void handle_subscribe(const js::object& cmd)
{
string topic = string(cmd.at("topic").as_string());
SessionPtr self = shared_from_this();
topics[topic].insert(self);
topics_rev[self].insert(topic);
int last_seen = -1;
if (auto it = cmd.find("last_seen"); it != cmd.end())
last_seen = static_cast<int>(it->value().as_int64());
send_success(sock_);
bool want_cache = cmd.contains("cache") ? cmd.at("cache").as_bool() : true;
if (want_cache)
send_cached(sock_, topic, last_seen);
}
// ...
}
Another noteworthy detail is that the templates for verifying commands are remarkably similar to the implementations in the other languages:
const Template template_subscribe {
{"command", {js::kind::string, true , {}}},
{"topic", {js::kind::string, true , {}}},
{"last_seen", {js::kind::int64 , false, {}}},
{"cache", {js::kind::bool_ , false, {}}}
};
const Template template_unsubscribe {
{"command", {js::kind::string, true , {}}},
{"topic", {js::kind::string, true , {}}}
};
const Template template_send {
{"command", {js::kind::string, true , {}}},
{"topic", {js::kind::string, true , {}}},
{"msg", {js::kind::string, true , {}}},
{"delivery", {js::kind::string, true , {js::value("all"), js::value("one")}}},
{"cache", {js::kind::bool_ , false, {}}}
};
The associated template matching code:
bool template_match(const Template& t, const js::object& o)
{
for (auto& [k, _] : o)
if (!t.contains(k))
return false;
for (auto& [k, spec] : t) {
if (spec.required && !o.contains(k))
return false;
if (!o.contains(k))
continue;
const js::value& v = o.at(k);
if (v.kind() != spec.kind)
return false;
if (spec.values.empty())
continue;
if (find(spec.values.begin(), spec.values.end(), v) == spec.values.end())
return false;
}
return true;
}
bool verify_command(const js::object& o)
{
auto it = o.find("command");
if (it == o.end() || it->value().kind() != js::kind::string)
return false;
string cmd = string(it->value().as_string());
if (cmd == "subscribe")
return template_match(template_subscribe, o);
if (cmd == "unsubscribe")
return template_match(template_unsubscribe, o);
if (cmd == "send")
return template_match(template_send, o);
return false;
}
Makefile
Primarily to document the build command I wrote a Makefile
:
BOOST_DIR := boost
BOOST_LIB_DIR := $(BOOST_DIR)/stage/lib
CXX := g++
CXXFLAGS := -w -std=c++20 -I$(BOOST_DIR)
LD_STATIC := -Wl,-Bstatic
LD_DYNAMIC := -Wl,-Bdynamic
LDLIBS := -lboost_json
TARGET := aiomemq
SRC := aiomemq.cpp
# ------------------------------------------------------------
# Sanity checks
# ------------------------------------------------------------
ifeq ($(wildcard $(BOOST_DIR)),)
$(error Boost directory '$(BOOST_DIR)' not found. Clone, symlink or point BOOST_DIR to your local Boost tree.)
endif
# Optional: warn if Boost.JSON archive not present (non-fatal)
ifneq ($(wildcard $(BOOST_LIB_DIR)/libboost_json*.a),)
JSON_LIB_OK := yes
else
$(warning ** libboost_json*.a not found in $(BOOST_LIB_DIR). Make sure you've built/staged Boost with the JSON library.)
endif
# ------------------------------------------------------------
# Build rules
# ------------------------------------------------------------
.PHONY: all clean
all: $(TARGET)
$(TARGET): $(SRC)
$(CXX) $(CXXFLAGS) -L$(BOOST_LIB_DIR) -o $@ $< $(LD_STATIC) $(LDLIBS) $(LD_DYNAMIC)
clean:
$(RM) $(TARGET)
Unit tests
Having written unit tests for the Python and Javascript versions paid off handsomely here. I just added 2 lines for the C++ target, and was able to verify correctness (to the degree that these unit tests do) and wire compatibility!
if target == 'python':
server_process = subprocess.Popen(['python3', '../python/aiomemq.py', str(SERVER_PORT), str(CACHE_SIZE)])
elif target == 'javascript':
server_process = subprocess.Popen(['node', '../javascript/aiomemq.js', str(SERVER_PORT), str(CACHE_SIZE)])
elif target == 'cpp':
server_process = subprocess.Popen(['../cpp/aiomemq', str(SERVER_PORT), str(CACHE_SIZE)])
Only the last 2 lines were added, no other changes were required. The C++ implementation passes all tests, similar to the Python and Javascript versions:
Note that the unit tests themselves are written in Python.
Conclusion
Writing the asynchronous memory queue implementations in 3 different languages has been instructional and fun. I plan to write a final summary article about lessons learned.