Writing a simple Python async message queue server - Part I

Marton Trencseni - Mon 27 February 2023 - Python

Introduction

In 2023, I want to get back into playful systems programming. My plan is to write a simple message queue (MQ), and for my own education write the same toy implementation in modern Python, modern C++ and Rust. To make things interesting, I will use async / event-driven programming and avoid multi-threading. I have lots of experience in the first two, but this will be my first time writing Rust.

In this first article, I will explore how to write a simple Python MQ server using the Python asyncio library. asyncio allows programmers to write code as if it were multi-threaded, but the run-time actually uses co-routines and switches between them at special await points. The library also has special data structures that allow synchronization between co-routines without having to use synchronization primitives such as locks.
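
For example (a minimal standalone sketch, not part of the MQ server built below), two co-routines can hand items to each other through an asyncio.Queue without any locks:

import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(i)        # may suspend if the queue is full
        await asyncio.sleep(0.1)  # pretend to do some work

async def consumer(queue):
    for _ in range(3):
        item = await queue.get()  # suspends until an item is available
        print(f'Consumed {item}')

async def main():
    queue = asyncio.Queue(maxsize=1)
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())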

Simple async echo server

Let's start by writing the simplest possible skeleton server, which listens on a port, accepts TCP connections, reads incoming bytes, and echoes back whatever was sent:

import sys, asyncio

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line == '':  # EOF: the client disconnected without sending 'quit'
            break
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        writer.write(line.encode('utf8'))
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))

You can test it by running it in a terminal, like:

$ python async_echo.py 7777

Then, in another terminal, connect to it:

$ telnet
> open localhost 7777
foo
foo
bar
bar
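
If telnet is not handy, a short asyncio client can exercise the echo server just as well; a minimal sketch, assuming the server above is already running on localhost:7777:

import asyncio

async def echo_client():
    reader, writer = await asyncio.open_connection('localhost', 7777)
    for msg in ['foo', 'bar']:
        writer.write((msg + '\n').encode('utf8'))
        await writer.drain()  # wait until the buffer is flushed
        echoed = (await reader.readline()).decode('utf8').strip()
        print(f'Sent {msg}, received {echoed}')
    writer.write(b'quit\n')   # tell the server we are done
    await writer.drain()
    writer.close()
    await writer.wait_closed()

asyncio.run(echo_client())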

asyncio.run() is the top-level entry-point of an async Python program; it's like main(). We pass in our own run_server() function, which kicks off the main event loop. This in turn uses asyncio.start_server(), which sets up a TCP server listening on the specified port and fires off a new instance of handle_client() for every new incoming connection. We can pretend each handle_client() is a new thread, but in reality the run-time is single-threaded and uses co-routines. server.serve_forever() keeps the main event loop running; this is where the handle_client() calls come from.

Interacting with the client connection is simple. handle_client() receives a reader and a writer, and in these simple examples we use reader.readline() and writer.write() to receive and send data.
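
One thing these toy servers gloss over: writer.write() does not wait for the data to actually be sent, it only queues it. For larger payloads or slow clients, the usual pattern (already used in the test client sketch above) is to follow write() with await writer.drain() to apply backpressure, and to await writer.wait_closed() after close(). A sketch with hypothetical send_line() / close_client() helpers:

async def send_line(writer, text):
    # queue the line, then wait until the transport buffer has drained enough
    writer.write((text + '\n').encode('utf8'))
    await writer.drain()

async def close_client(writer):
    # close and wait until the connection is actually torn down
    writer.close()
    await writer.wait_closed()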

What about the funny async and await keywords? Essentially, in this example multiple copies of handle_client() will be running, but never at the same time. The Python runtime can only switch between these co-routines when one of them reaches an await statement. As an example, assume a client connects and handle_client() starts running. It gets to the line = (await reader.readline()).decode('utf8') line, which contains an await. At this point the co-routine is waiting for input, and the data may take arbitrarily long to arrive. Meanwhile, another client may connect; the waiting co-routine stays paused, and the main co-routine run_server() runs again to handle the new connection and spawn another handle_client(). Now we have 1 server listening/awaiting and 2 clients reading/awaiting; whichever event happens first, that co-routine resumes next. So, async tells Python that the function is a co-routine (only co-routines may contain await), and await marks a point where the calling co-routine may suspend until the awaited co-routine (or other blocking operation) completes.
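
The switching is easy to see in a tiny standalone example (again, not part of the server): two co-routines that print and then await asyncio.sleep() interleave their output, even though only one of them is ever running at any given moment:

import asyncio

async def worker(name, delay):
    for i in range(3):
        print(f'{name}: step {i}')
        await asyncio.sleep(delay)  # suspension point: the other worker can run here

async def main():
    # both workers are scheduled; the runtime switches between them at the awaits
    await asyncio.gather(worker('A', 0.10), worker('B', 0.15))

asyncio.run(main())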

Simple async message queue in Python

The above skeleton can be extended to a simple uni-directional message queue. It accepts two kinds of structured messages, subscribe and send:

{'command':'subscribe', 'topic':'foo'              }
{'command':'send',      'topic':'foo', 'msg':'blah'}

The semantics are simple: once a client subscribes to a topic, it receives every message that other clients subsequently send to that topic; messages sent before the subscription are not delivered. In this simple implementation, every subscriber of the topic gets the message.
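
The messages are Python dict literals (note the single quotes, which would not be valid JSON), so the server below parses them with ast.literal_eval() from the standard library. A quick standalone check of the parsing step:

import ast

line = "{'command':'send', 'topic':'foo', 'msg':'blah'}"
cmd = ast.literal_eval(line)  # evaluates literals only, unlike eval()
print(cmd['command'], cmd['topic'], cmd['msg'])  # send foo blah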

The implementation is barely longer than the echo server, since the problem is very similar: we just have to maintain a list of writers for each topic, and when we receive a message for a topic, we go through those writers and send the message.

import sys, asyncio, ast
from collections import defaultdict

topics = defaultdict(list)  # topic name -> list of subscribed StreamWriters

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line == '':  # EOF: the client disconnected without sending 'quit'
            break
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            topics[cmd['topic']].append(writer)
        elif cmd['command'] == 'send':
            writers = topics[cmd['topic']]
            for w in writers:
                w.write(line.encode('utf8'))
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))

You can test it by running it in a terminal, like:

$ python async_unidir.py 7777

Then, in a second terminal, connect to it:

$ telnet
> open localhost 7777
{'command':'subscribe', 'topic':'foo'}

Then, in a third terminal:

$ telnet
> open localhost 7777
{'command':'send', 'topic':'foo', 'msg':'blah'}

The message will appear in the second terminal!
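
The same exchange can also be scripted without telnet, using two asyncio clients; a minimal sketch, assuming the MQ server above is already running on localhost:7777:

import asyncio

async def subscriber():
    reader, writer = await asyncio.open_connection('localhost', 7777)
    writer.write(b"{'command':'subscribe', 'topic':'foo'}\n")
    await writer.drain()
    # block until a message is published to the topic
    print('Subscriber got:', (await reader.readline()).decode('utf8').strip())
    writer.close()
    await writer.wait_closed()

async def publisher():
    await asyncio.sleep(0.1)  # crude: give the subscriber time to subscribe first
    reader, writer = await asyncio.open_connection('localhost', 7777)
    writer.write(b"{'command':'send', 'topic':'foo', 'msg':'blah'}\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    await asyncio.gather(subscriber(), publisher())

asyncio.run(main())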

Note that in traditional multi-threaded programming, shared structures like the topics hashmap and its lists of writers would have to be protected by locks to avoid corruption. With Python async this is not an issue: there is no way to corrupt their internal state, since the program is actually single-threaded and only switches co-routines at await points.
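
The flip side is that this guarantee only holds between await points; if a critical section itself contains an await, asyncio provides its own primitives, such as asyncio.Lock. A minimal sketch, not needed in the server above, where mutations never await in the middle:

import asyncio

topics_lock = asyncio.Lock()

async def subscribe(topics, topic, writer):
    # the lock keeps the update atomic even though it spans an await
    async with topics_lock:
        subscribers = topics.setdefault(topic, [])
        await asyncio.sleep(0)  # imagine an await here, e.g. logging over the network
        subscribers.append(writer)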

Conclusion

In the next article, I will add some features to this simple skeleton.