Writing a simple Python async message queue server - Part III
Marton Trencseni - Wed 22 May 2024 - Python
Introduction
In previous posts, I wrote a simple async message queue server in Python, C++ and Javascript:
- Writing a simple Python async message queue server - Part I
- Writing a simple Python async message queue server - Part II
- Writing a simple C++ async message queue server - Part I
- Writing a simple C++ async message queue server - Part II
- Writing a simple Javascript async message queue server - Part I
- Writing a simple Javascript async message queue server - Part II
In this final post on the toy Python async message queue server implementation, I make further feature and code improvements on this small program.
The code is up on Github.
Improvements
I want to make all 3 versions (Python, C++ and Javascript) wire-compatible, so I decided to standardize on JSON messages. The previous Python versions used serialized Python objects, which are very similar to JSON, but not exactly the same. Overall, the changes I wanted to make in this final version:
- JSON wire protocol
- support the following commands:
subscribe
last_seen
(optional,int
, default0
): if supplied, only send messages not yet seencache
(optional,bool
, defaulttrue
): if supplied andtrue
, don't send caches messages on subscribe
unsubscribe
send
delivery
:all
orone
cache
(optional,bool
, defaulttrue
): if supplied andfalse
, do not cache this messages, only send to currently connected subscribers, overriding delivery semantics
- server always sends JSON responses, including error messages
- add reasonable error handling to the code, but avoid cascading exceptions where possible
- add Python type hints
- refactor code for small wins
Command validation
The most interesting part of this refactor was command validation. I broke this into two parts:
- did the client send valid JSON?
- is the JSON a valid message to this server?
The first part is easy by catching json.JSONDecodeError
from the json.loads()
call. The second part is tricky, as there is no standard way to validate JSON. I came up with a mini-DSL for it, best explained by showing an actual template from the code:
template_send = {
"command": {"type": str, "required": True},
"topic": {"type": str, "required": True},
"msg": {"type": str, "required": True},
"delivery": {"type": str, "required": True, "values": ["all", "one"]},
"cache": {"type": bool, "required": False},
}
Validation is simple and takes just ~10 lines of code:
def template_match(template: Dict[str, Dict[str, Any]], obj: Dict[str, Any]) -> bool:
if not set(obj.keys()).issubset(template.keys()):
return False
for key, props in template.items():
if props["required"] and key not in obj:
return False
if key in obj and not isinstance(obj[key], props["type"]):
return False
if "values" in props and obj[key] not in props["values"]:
return False
return True
With these definitions for the 3 commands the server accepts, valid JSON commands sent by clients can be validated like:
def verify_command(cmd: Dict[str, Any]) -> bool:
templates: Dict[str, Dict[str, Dict[str, Any]]] = {
"subscribe": template_subscribe,
"unsubscribe": template_unsubscribe,
"send": template_send,
}
if "command" not in cmd or not isinstance(cmd["command"], str):
return False
template: Optional[Dict[str, Dict[str, Any]]] = templates.get(cmd["command"])
if template is None or not template_match(template, cmd):
return False
return True
The main message processing event loop is then:
while line.strip() != "quit":
line = (await reader.readline()).decode("utf8")
if line.strip() == "":
continue
logging.info(f"Received: {line.strip()}")
try:
cmd = json.loads(line)
except json.JSONDecodeError:
send_failure(writer, "Could not parse json")
else:
try:
if verify_command(cmd):
handle_command(cmd, writer)
else:
send_failure(writer, "Malformed json message")
except Exception as e:
logging.info(traceback.format_exc())
send_failure(writer, "Internal exception")
Exceptions are used in this main loop, but the verify_command()
and handle_command()
functions containing the actual application logic are free of try
blocks, which I find desirable and pleasing. Also, thanks to the separation of verification and handling, handle_command()
is free of conditionals related to validity checks!
Type checks
Adding type hints (and then checking with mypy
) was trivial because I used ChatGPT-4o: I gave it the original code, and told it to add type hints. The result was almost perfect, I only needed to do around 5-10 minutes of fixes to get to the final state. However, the code with types is significantly less readable in my opinion — in my opinion, part of the reason Python is so readable is that the code is not littered with type (and variable) declarations. This, coupled with the fact that at least for this small snippet of code, an LLM is able to infer my intent and add the type hints, led me think that maybe types shouldn't be a primary part of the language. Maybe it should be something that is hidden by default in IDEs, inferred by LLMs, and only "escalated" to the human programmer when the LLM can't get it right, or when the human is explicitly in review mode.
Conclusion
In the next step, I will write some unit tests, which I will be able to reuse for the C++ and Javascript implementations as well.