Writing a simple Python async message queue server - Part II

Marton Trencseni - Thu 02 March 2023 - Python

Introduction

In the previous article, I wrote a simple async message queue server in Python. Let's add features to it!

Delivery

Let's add a feature so the sender can specify whether messages get delivered to all subscribers or just one. So we extend the message with a delivery field, which can be all or one:

{'command':'send', 'topic':'foo', 'msg':'blah', delivery:'all'}
{'command':'send', 'topic':'foo', 'msg':'blah', delivery:'one'}

The implementation for this is simple. We just have to select a random writer in topics to get the message in case of one, and send it to all writers in the case of all, while watching for the empty list case:

import sys, asyncio, ast, random
from collections import defaultdict

topics = defaultdict(lambda: [])

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            topics[cmd['topic']].append(writer)
        elif cmd['command'] == 'send':
            if cmd['delivery'] == 'all':
                writers = topics[cmd['topic']]
            else: # if delivery == 'one':
                if len(topics[cmd['topic']]) == 0:
                    writers = []
                else:
                    which = random.randint(0, len(topics[cmd['topic']])-1)
                    writers = [topics[cmd['topic']][which]]
            for w in writers:
                w.write(line.encode('utf8'))
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))

Caching

Now, let's add a feature where the server will cache 100 messages per topic, while honoring the delivery semantics, ie. messages with delivery of one will never be sent to more than one client.

The implementation is still simple: if an incoming message has all delivery, just push it into the topic's cache. If if has one delivery, only push it if there are no current subscribers. On the other hand, if a new subscriber connects, send them all cached messages for that topic, and then drop any messages that had one delivery:

import sys, asyncio, ast, random
from collections import defaultdict, deque

CACHE_LENGTH = 100

topics = defaultdict(lambda: [])
caches = defaultdict(lambda: deque(maxlen=CACHE_LENGTH))

def send_cached(writer, topic):
    for line in caches[topic]:
        writer.write(line.encode('utf8'))
    # construct new cache, removing elements which had delivery of one
    # since we just delivered those
    new_cache = deque(maxlen=CACHE_LENGTH)
    for line in caches[topic]:
        cmd = ast.literal_eval(line)
        if cmd['delivery'] == 'all':
            new_cache.append(line)
    caches[topic] = new_cache

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            topics[cmd['topic']].append(writer)
            send_cached(writer, cmd['topic']) # if there are any cached messages to send, send it
        elif cmd['command'] == 'send':
            if cmd['delivery'] == 'all':
                writers = topics[cmd['topic']]
                caches[cmd['topic']].append(line) # cache it
            else: # if delivery == 'one':
                if len(topics[cmd['topic']]) == 0:
                    writers = []
                    caches[cmd['topic']].append(line) # cache it
                else:
                    which = random.randint(0, len(topics[cmd['topic']])-1)
                    writers = [topics[cmd['topic']][which]]
                    # no need to cache it
            for w in writers:
                w.write(line.encode('utf8'))

    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))

Last seen semantics

Let's add support for clients (re)connecting and starting to read from a topic where they left off, by specifying a last_seen index in the subscribe message, like:

{"command":"subscribe", "topic":"foo", "last_seen": 123}

To support this, we have to:

  • add an index to each message (starting at 0), which increases sequentially
  • the index is stored in the command itself, so it now makes more sense to cache the parsed cmd insteaf of the flat line
  • this index is added to the cmd structure and sent out to subscribers
  • when a new client connects and specifies last_seen (default -1), only send out messages which have a bigger index
import sys, asyncio, ast, random
from collections import defaultdict, deque

CACHE_LENGTH = 100

topics = defaultdict(lambda: [])
caches = defaultdict(lambda: deque(maxlen=CACHE_LENGTH))
indexs = defaultdict(lambda: 0)

def handle_subscribe(cmd, writer):
    topics[cmd['topic']].append(writer)
    last_seen = int(cmd['last_seen']) if 'last_seen' in cmd else -1
    send_cached(writer, cmd['topic'], last_seen) # if there are any cached messages to send, send it

def send_cached(writer, topic, last_seen):
    for cmd in caches[topic]:
        if cmd['index'] > last_seen:
            writer.write((str(cmd)+'\n').encode('utf8'))
    new_cache = deque(maxlen=CACHE_LENGTH)
    for cmd in caches[topic]:
        if cmd['index'] <= last_seen:
            new_cache.append(cmd)
        else:
            if cmd['delivery'] == 'all':
                new_cache.append(cmd)
    caches[topic] = new_cache

def handle_send(cmd):
    cmd['index'] = indexs[cmd['topic']]
    indexs[cmd['topic']] += 1
    if cmd['delivery'] == 'all':
        writers = topics[cmd['topic']]
        caches[cmd['topic']].append(cmd) # cache it
    else: # if delivery == 'one':
        if len(topics[cmd['topic']]) == 0:
            writers = []
            caches[cmd['topic']].append(cmd) # cache it
        else:
            which = random.randint(0, len(topics[cmd['topic']])-1)
            writers = [topics[cmd['topic']][which]]
            # no need to cache it
    for w in writers:
        w.write((str(cmd)+'\n').encode('utf8'))

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            handle_subscribe(cmd, writer)
        elif cmd['command'] == 'send':
            handle_send(cmd)
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))

Handling client disconnects

Up until this point, the code did not handle client disconnects. So writers were added to lists, but never removed, which means if a subscriber disconnects, the next time that topic receives a message, there will be a stale writer in the topic list, and the Python runtime will throw an exception. Handling this easy:

  • when a client disconnects, an exception is thrown
  • we maintain a reverse lookup dictionary so we know which writer was added to which topic list

I will show the basic logic on the simplest message queue implementation:

import sys, asyncio, ast, random
from collections import defaultdict

topics = defaultdict(lambda: [])
topics_reverse = defaultdict(lambda: [])

async def handle_client(reader, writer):
    print('New client connected...')
    try:
        line = str()
        while line.strip() != 'quit':
            line = (await reader.readline()).decode('utf8')
            print(f'Received: {line.strip()}')
            cmd = ast.literal_eval(line)
            if cmd['command'] == 'subscribe':
                topics[cmd['topic']].append(writer)
                topics_reverse[writer].append(cmd['topic'])
            elif cmd['command'] == 'send':
                if cmd['delivery'] == 'all':
                    writers = topics[cmd['topic']]
                else: # if delivery == 'one':
                    if len(topics[cmd['topic']]) == 0:
                        writers = []
                    else:
                        which = random.randint(0, len(topics[cmd['topic']])-1)
                        writers = [topics[cmd['topic']][which]]
                for w in writers:
                    w.write(line.encode('utf8'))
    except Exception as e:
        print(e)
    if writer in topics_reverse:
        for topic in topics_reverse[writer]:
            topics[topic].remove(writer)
            print(f'Removing writer from topic {topic}')
        del topics_reverse[writer]
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))

Conclusion

Of course this is not ready for production use, but it's quite surprising how much functionality fits into a 67 line, async message queue implementation which is bi-directional, supports caching and indexing.