Writing a simple Python async message queue server - Part II
Marton Trencseni - Thu 02 March 2023 - Python
Introduction
In the previous article, I wrote a simple async message queue server in Python. Let's add features to it!
Delivery
Let's add a feature so the sender can specify whether messages get delivered to all subscribers or just one. So we extend the message with a delivery field, which can be all or one:
{'command':'send', 'topic':'foo', 'msg':'blah', 'delivery':'all'}
{'command':'send', 'topic':'foo', 'msg':'blah', 'delivery':'one'}
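Remember that the server parses each incoming line with ast.literal_eval(), so the message has to be a valid Python dict literal, with the delivery key quoted like the rest. A quick check in the REPL (just an illustration, not part of the server code):

import ast
# a message line, exactly as the client would send it
line = "{'command':'send', 'topic':'foo', 'msg':'blah', 'delivery':'one'}\n"
cmd = ast.literal_eval(line)
print(cmd['delivery'])  # prints: one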
The implementation for this is simple: we just have to pick a random writer from topics to receive the message in the case of one, and send it to all writers in the case of all, while watching out for the empty-list case:
import sys, asyncio, ast, random
from collections import defaultdict

topics = defaultdict(lambda: [])

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            topics[cmd['topic']].append(writer)
        elif cmd['command'] == 'send':
            if cmd['delivery'] == 'all':
                writers = topics[cmd['topic']]
            else: # if delivery == 'one':
                if len(topics[cmd['topic']]) == 0:
                    writers = []
                else:
                    which = random.randint(0, len(topics[cmd['topic']])-1)
                    writers = [topics[cmd['topic']][which]]
            for w in writers:
                w.write(line.encode('utf8'))
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))
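To try this out, here is a minimal sketch of a test client (my own snippet, not from the original articles; the flow and the port argument are assumptions): it subscribes on one connection, sends with delivery one on a second connection, and prints what the subscriber receives.

import sys, asyncio, ast

async def main(port):
    # sketch of a test client, not part of the server code
    # first connection: subscribe to topic 'foo'
    sub_reader, sub_writer = await asyncio.open_connection('localhost', port)
    sub_writer.write(b"{'command':'subscribe', 'topic':'foo'}\n")
    await sub_writer.drain()
    await asyncio.sleep(0.1)  # give the server a moment to register the subscription
    # second connection: send one message with delivery 'one'
    _, send_writer = await asyncio.open_connection('localhost', port)
    send_writer.write(b"{'command':'send', 'topic':'foo', 'msg':'blah', 'delivery':'one'}\n")
    await send_writer.drain()
    # the single subscriber is the only candidate, so it receives the message
    line = (await sub_reader.readline()).decode('utf8')
    print('Subscriber got:', ast.literal_eval(line)['msg'])

asyncio.run(main(port=int(sys.argv[1])))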
Caching
Now, let's add a feature where the server caches the last 100 messages per topic, while honoring the delivery semantics, i.e. messages with delivery of one will never be sent to more than one client.
The implementation is still simple: if an incoming message has all delivery, just push it into the topic's cache. If it has one delivery, only push it if there are no current subscribers. On the other hand, if a new subscriber connects, send them all cached messages for that topic, and then drop any cached messages that had one delivery:
import sys, asyncio, ast, random
from collections import defaultdict, deque

CACHE_LENGTH = 100
topics = defaultdict(lambda: [])
caches = defaultdict(lambda: deque(maxlen=CACHE_LENGTH))

def send_cached(writer, topic):
    for line in caches[topic]:
        writer.write(line.encode('utf8'))
    # construct new cache, removing elements which had delivery of one
    # since we just delivered those
    new_cache = deque(maxlen=CACHE_LENGTH)
    for line in caches[topic]:
        cmd = ast.literal_eval(line)
        if cmd['delivery'] == 'all':
            new_cache.append(line)
    caches[topic] = new_cache

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            topics[cmd['topic']].append(writer)
            send_cached(writer, cmd['topic']) # if there are any cached messages to send, send them
        elif cmd['command'] == 'send':
            if cmd['delivery'] == 'all':
                writers = topics[cmd['topic']]
                caches[cmd['topic']].append(line) # cache it
            else: # if delivery == 'one':
                if len(topics[cmd['topic']]) == 0:
                    writers = []
                    caches[cmd['topic']].append(line) # cache it
                else:
                    which = random.randint(0, len(topics[cmd['topic']])-1)
                    writers = [topics[cmd['topic']][which]]
                    # no need to cache it
            for w in writers:
                w.write(line.encode('utf8'))
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))
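To see the cache in action, a sketch along these lines (again my own test snippet, not the author's) sends to a topic with no subscribers, then subscribes on the same connection and gets the cached message replayed:

import sys, asyncio

async def main(port):
    # sketch of a test client, not part of the server code
    reader, writer = await asyncio.open_connection('localhost', port)
    # nobody is subscribed to 'foo' yet, so this message goes into the cache
    writer.write(b"{'command':'send', 'topic':'foo', 'msg':'hello', 'delivery':'one'}\n")
    # now subscribe on the same connection: the server replays the cached message,
    # then drops it from the cache since its delivery was 'one'
    writer.write(b"{'command':'subscribe', 'topic':'foo'}\n")
    await writer.drain()
    line = (await reader.readline()).decode('utf8')
    print('Replayed from cache:', line.strip())

asyncio.run(main(port=int(sys.argv[1])))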
Last seen semantics
Let's add support for clients (re)connecting and starting to read from a topic where they left off, by specifying a last_seen index in the subscribe message, like:
{"command":"subscribe", "topic":"foo", "last_seen": 123}
To support this, we have to:
- add an index to each message (starting at 0), which increases sequentially
- since the index is stored in the command itself, it now makes more sense to cache the parsed cmd instead of the flat line
- add this index to the cmd structure and send it out to subscribers
- when a new client connects and specifies last_seen (default -1), only send out messages which have a bigger index
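For example, assuming the sender's message is the send example from above, the first message on topic foo is cached and delivered to subscribers as (note the added index):
{'command': 'send', 'topic': 'foo', 'msg': 'blah', 'delivery': 'all', 'index': 0}
The full implementation: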
import sys, asyncio, ast, random
from collections import defaultdict, deque

CACHE_LENGTH = 100
topics = defaultdict(lambda: [])
caches = defaultdict(lambda: deque(maxlen=CACHE_LENGTH))
indexs = defaultdict(lambda: 0)

def handle_subscribe(cmd, writer):
    topics[cmd['topic']].append(writer)
    last_seen = int(cmd['last_seen']) if 'last_seen' in cmd else -1
    send_cached(writer, cmd['topic'], last_seen) # if there are any cached messages to send, send them

def send_cached(writer, topic, last_seen):
    for cmd in caches[topic]:
        if cmd['index'] > last_seen:
            writer.write((str(cmd)+'\n').encode('utf8'))
    new_cache = deque(maxlen=CACHE_LENGTH)
    for cmd in caches[topic]:
        if cmd['index'] <= last_seen:
            new_cache.append(cmd)
        else:
            if cmd['delivery'] == 'all':
                new_cache.append(cmd)
    caches[topic] = new_cache

def handle_send(cmd):
    cmd['index'] = indexs[cmd['topic']]
    indexs[cmd['topic']] += 1
    if cmd['delivery'] == 'all':
        writers = topics[cmd['topic']]
        caches[cmd['topic']].append(cmd) # cache it
    else: # if delivery == 'one':
        if len(topics[cmd['topic']]) == 0:
            writers = []
            caches[cmd['topic']].append(cmd) # cache it
        else:
            which = random.randint(0, len(topics[cmd['topic']])-1)
            writers = [topics[cmd['topic']][which]]
            # no need to cache it
    for w in writers:
        w.write((str(cmd)+'\n').encode('utf8'))

async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        line = (await reader.readline()).decode('utf8')
        if line.strip() == '': continue
        print(f'Received: {line.strip()}')
        cmd = ast.literal_eval(line)
        if cmd['command'] == 'subscribe':
            handle_subscribe(cmd, writer)
        elif cmd['command'] == 'send':
            handle_send(cmd)
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))
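A quick way to exercise last_seen (again my own sketch, not the author's client): send a few messages to an empty topic so they get cached with indexes 0, 1 and 2, then subscribe claiming last_seen of 0, and only the messages with index 1 and 2 are replayed.

import sys, asyncio

async def main(port):
    # sketch of a test client, not part of the server code
    reader, writer = await asyncio.open_connection('localhost', port)
    # cache three messages (indexes 0, 1, 2) on a topic with no subscribers yet
    for i in range(3):
        msg = "{'command':'send', 'topic':'foo', 'msg':'msg %d', 'delivery':'all'}\n" % i
        writer.write(msg.encode('utf8'))
    # subscribe, telling the server we have already seen index 0
    writer.write(b"{'command':'subscribe', 'topic':'foo', 'last_seen': 0}\n")
    await writer.drain()
    # only the messages with index 1 and 2 come back
    for _ in range(2):
        print((await reader.readline()).decode('utf8').strip())

asyncio.run(main(port=int(sys.argv[1])))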
Handling client disconnects
Up until this point, the code did not handle client disconnects. So writers were added to lists, but never removed, which means that if a subscriber disconnects, the next time that topic receives a message there will be a stale writer in the topic list, and the Python runtime will throw an exception. Handling this is easy:
- when a client disconnects, an exception is thrown
- we maintain a reverse lookup dictionary so we know which writer was added to which topic list
I will show the basic logic on the simplest message queue implementation:
import sys, asyncio, ast, random
from collections import defaultdict

topics = defaultdict(lambda: [])
topics_reverse = defaultdict(lambda: [])

async def handle_client(reader, writer):
    print('New client connected...')
    try:
        line = str()
        while line.strip() != 'quit':
            line = (await reader.readline()).decode('utf8')
            print(f'Received: {line.strip()}')
            cmd = ast.literal_eval(line)
            if cmd['command'] == 'subscribe':
                topics[cmd['topic']].append(writer)
                topics_reverse[writer].append(cmd['topic'])
            elif cmd['command'] == 'send':
                if cmd['delivery'] == 'all':
                    writers = topics[cmd['topic']]
                else: # if delivery == 'one':
                    if len(topics[cmd['topic']]) == 0:
                        writers = []
                    else:
                        which = random.randint(0, len(topics[cmd['topic']])-1)
                        writers = [topics[cmd['topic']][which]]
                for w in writers:
                    w.write(line.encode('utf8'))
    except Exception as e:
        print(e)
    if writer in topics_reverse:
        for topic in topics_reverse[writer]:
            topics[topic].remove(writer)
            print(f'Removing writer from topic {topic}')
        del topics_reverse[writer]
    writer.close()
    print('Client disconnected...')

async def run_server(host, port):
    server = await asyncio.start_server(handle_client, host, port)
    print(f'Listening on {host}:{port}...')
    async with server:
        await server.serve_forever()

asyncio.run(run_server(host='localhost', port=int(sys.argv[1])))
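To check the cleanup, a sketch like the following (my own snippet; the sleep interval is just an assumption) subscribes, disconnects without sending quit, and then sends to the same topic from a fresh connection. The server prints 'Removing writer from topic foo', and the send no longer trips over a stale writer:

import sys, asyncio

async def main(port):
    # sketch of a test client, not part of the server code
    # subscribe, then drop the connection without sending 'quit'
    _, sub_writer = await asyncio.open_connection('localhost', port)
    sub_writer.write(b"{'command':'subscribe', 'topic':'foo'}\n")
    await sub_writer.drain()
    sub_writer.close()
    await sub_writer.wait_closed()
    await asyncio.sleep(0.1)  # give the server a moment to notice the disconnect
    # send to the topic from a new connection: the stale writer has been removed
    _, send_writer = await asyncio.open_connection('localhost', port)
    send_writer.write(b"{'command':'send', 'topic':'foo', 'msg':'blah', 'delivery':'all'}\n")
    await send_writer.drain()

asyncio.run(main(port=int(sys.argv[1])))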
Conclusion
Of course this is not ready for production use, but it's quite surprising how much functionality fits into a 67-line async message queue implementation that is bi-directional and supports caching and indexing.