Writing a simple Python async message queue server - Part III

Marton Trencseni - Wed 22 May 2024 - Python

Introduction

In previous posts, I wrote a simple async message queue server in Python, C++ and Javascript:

In this final post on the toy Python async message queue server implementation, I make further feature and code improvements on this small program.

The code is up on Github.

Improvements

I want to make all 3 versions (Python, C++ and Javascript) wire-compatible, so I decided to standardize on JSON messages. The previous Python versions used serialized Python objects, which are very similar to JSON, but not exactly the same. Overall, the changes I wanted to make in this final version:

  • JSON wire protocol
  • support the following commands:
    • subscribe
      • last_seen (optional, int, default 0): if supplied, only send messages not yet seen
      • cache (optional, bool, default true): if supplied and true, don't send caches messages on subscribe
    • unsubscribe
    • send
      • delivery: all or one
      • cache (optional, bool, default true): if supplied and false, do not cache this messages, only send to currently connected subscribers, overriding delivery semantics
  • server always sends JSON responses, including error messages
  • add reasonable error handling to the code, but avoid cascading exceptions where possible
  • add Python type hints
  • refactor code for small wins

Command validation

The most interesting part of this refactor was command validation. I broke this into two parts:

  1. did the client send valid JSON?
  2. is the JSON a valid message to this server?

The first part is easy by catching json.JSONDecodeError from the json.loads() call. The second part is tricky, as there is no standard way to validate JSON. I came up with a mini-DSL for it, best explained by showing an actual template from the code:

template_send = {
    "command": {"type": str, "required": True},
    "topic": {"type": str, "required": True},
    "msg": {"type": str, "required": True},
    "delivery": {"type": str, "required": True, "values": ["all", "one"]},
    "cache": {"type": bool, "required": False},
}

Validation is simple and takes just ~10 lines of code:

def template_match(template: Dict[str, Dict[str, Any]], obj: Dict[str, Any]) -> bool:
    if not set(obj.keys()).issubset(template.keys()):
        return False
    for key, props in template.items():
        if props["required"] and key not in obj:
            return False
        if key in obj and not isinstance(obj[key], props["type"]):
            return False
        if "values" in props and obj[key] not in props["values"]:
            return False
    return True

With these definitions for the 3 commands the server accepts, valid JSON commands sent by clients can be validated like:

def verify_command(cmd: Dict[str, Any]) -> bool:
    templates: Dict[str, Dict[str, Dict[str, Any]]] = {
        "subscribe": template_subscribe,
        "unsubscribe": template_unsubscribe,
        "send": template_send,
    }
    if "command" not in cmd or not isinstance(cmd["command"], str):
        return False
    template: Optional[Dict[str, Dict[str, Any]]] = templates.get(cmd["command"])
    if template is None or not template_match(template, cmd):
        return False
    return True

The main message processing event loop is then:

while line.strip() != "quit":
    line = (await reader.readline()).decode("utf8")
    if line.strip() == "":
        continue
    logging.info(f"Received: {line.strip()}")
    try:
        cmd = json.loads(line)
    except json.JSONDecodeError:
        send_failure(writer, "Could not parse json")
    else:
        try:
            if verify_command(cmd):
                handle_command(cmd, writer)
            else:
                send_failure(writer, "Malformed json message")
        except Exception as e:
            logging.info(traceback.format_exc())
            send_failure(writer, "Internal exception")

Exceptions are used in this main loop, but the verify_command() and handle_command() functions containing the actual application logic are free of try blocks, which I find desirable and pleasing. Also, thanks to the separation of verification and handling, handle_command() is free of conditionals related to validity checks!

Type checks

Adding type hints (and then checking with mypy) was trivial because I used ChatGPT-4o: I gave it the original code, and told it to add type hints. The result was almost perfect, I only needed to do around 5-10 minutes of fixes to get to the final state. However, the code with types is significantly less readable in my opinion — in my opinion, part of the reason Python is so readable is that the code is not littered with type (and variable) declarations. This, coupled with the fact that at least for this small snippet of code, an LLM is able to infer my intent and add the type hints, led me think that maybe types shouldn't be a primary part of the language. Maybe it should be something that is hidden by default in IDEs, inferred by LLMs, and only "escalated" to the human programmer when the LLM can't get it right, or when the human is explicitly in review mode.

Conclusion

In the next step, I will write some unit tests, which I will be able to reuse for the C++ and Javascript implementations as well.