Can ChatGPT write the Python async message queue codes from the previous posts?

Marton Trencseni - Sun 05 March 2023 - Python

Introduction

In the previous articles I wrote a simple async message queue servers in Python:

In the first one, I wrote a very simple message queue using the asyncio library. In the second, I added a few features to it, such as delivery=one|all semantics, caching, message sequencing and last_seen specified by the subscriber. The whole thing remained very simple at less than 100 lines of code.

With the advent of ChatGPT, and it being able to write code, it's interesting to check whether the AI could write these simple codes. Personally I think anything ChatGPT can write is a linear combination of what's already out there on the public Internet, so it's not worth a blog post. Having said that, I do not claim the opposite, just because ChatGPT cannot reproduce something does not mean it's worthy or writing down.

I also tried using OpenAI's Codex, but it produced poorer results than ChatGPT, so I'm not showing that.

.

Note: the cover image is from DALL-E, the prompt was an ai writing python code.

Simple echo server

Prompt:

Write a simple TCP echo server in Python using the asyncio library. It should bind to localhost, but take the listening port as a command-line argument.

ChatGPT's code:

import asyncio
import argparse

async def handle_echo(reader, writer):
    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)
        await writer.drain()
    writer.close()

async def main():
    parser = argparse.ArgumentParser(description='TCP echo server')
    parser.add_argument('port', type=int, help='listening port')
    args = parser.parse_args()

    server = await asyncio.start_server(handle_echo, '127.0.0.1', args.port)
    async with server:
        await server.serve_forever()

asyncio.run(main())

This code runs and works! It's great start. It's showing what everybody knows by now, ChatGPT is super-useful for getting started with a topic.

Simple message queue

Prompt:

Write a simple TCP messsage queue server in Python using the asyncio library that accepts the listening port as a command-like argument. The messages are Python dicts followed by newlines. It should accept subscribe messages like {'command':'subscribe', 'topic':'foo'} and send messages like {'command':'send', 'topic':'foo', 'msg':'blah'}. There is no need to cache, just send messages to currentlly connected subscribers.

ChatGPT's code:

import asyncio
import json

class MessageQueueServer:
    def __init__(self):
        self.subscriptions = {}
        self.loop = asyncio.get_event_loop()

    async def handle_client(self, reader, writer):
        while True:
            data = await reader.readline()
            if not data:
                break
            message = json.loads(data.decode().strip())
            if message['command'] == 'subscribe':
                topic = message['topic']
                if topic not in self.subscriptions:
                    self.subscriptions[topic] = []
                self.subscriptions[topic].append(writer)
            elif message['command'] == 'send':
                topic = message['topic']
                msg = message['msg']
                if topic in self.subscriptions:
                    for subscriber in self.subscriptions[topic]:
                        subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                        await subscriber.drain()

    async def start_server(self, port):
        server = await asyncio.start_server(self.handle_client, 'localhost', port)
        async with server:
            await server.serve_forever()

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('port', type=int)
    args = parser.parse_args()
    server = MessageQueueServer()
    server.loop.run_until_complete(server.start_server(args.port))

It kindof works, but it commits some mistakes the way a novice programmer would: it assumes the structures are JSON, and uses json.loads(). This will not parse the messages as I specified it, as json requires double-quotes and not single-quotes. Otherwise, this is a working message queue with asyncio!

Delivery

Prompt:

Now, let's add a 'delivery' field to the send command, it can either be 'one' or 'all'. In the case of 'one', only one connected subscriber of that topic gets the message, in the case of 'all', all connected subscribers of that topic get the message.

ChatGPT's code:

import asyncio
import json

class MessageQueueServer:
    def __init__(self):
        self.subscriptions = {}
        self.loop = asyncio.get_event_loop()

    async def handle_client(self, reader, writer):
        while True:
            data = await reader.readline()
            if not data:
                break
            message = json.loads(data.decode().strip())
            if message['command'] == 'subscribe':
                topic = message['topic']
                if topic not in self.subscriptions:
                    self.subscriptions[topic] = []
                self.subscriptions[topic].append(writer)
            elif message['command'] == 'send':
                topic = message['topic']
                msg = message['msg']
                delivery = message.get('delivery', 'all')
                if topic in self.subscriptions:
                    subscribers = self.subscriptions[topic]
                    if delivery == 'one':
                        if subscribers:
                            subscriber = subscribers.pop(0)
                            subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                            await subscriber.drain()
                            subscribers.append(subscriber)
                    else:
                        for subscriber in subscribers:
                            subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                            await subscriber.drain()

    async def start_server(self, port):
        server = await asyncio.start_server(self.handle_client, 'localhost', port)
        async with server:
            await server.serve_forever()

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('port', type=int)
    args = parser.parse_args()
    server = MessageQueueServer()
    server.loop.run_until_complete(server.start_server(args.port))

It works, but it's very ugly. On the one branch, it removes a subscriber from the array and then appends it back for some reason.

Caching

Prompt:

Now, let's add simple in-memory caching to it. Let's cache 100 messages per topic. When a new subscriber connects, we can send them whatever we have cached for that topic. Be careful to maintain the delivery semantics, ie. messages with delivery=one should only be delivered to 1 subscriber.

ChatGPT's code:

import asyncio
import json
from collections import deque

class MessageQueueServer:
    def __init__(self):
        self.subscriptions = {}
        self.caches = {}
        self.loop = asyncio.get_event_loop()

    async def handle_client(self, reader, writer):
        while True:
            data = await reader.readline()
            if not data:
                break
            message = json.loads(data.decode().strip())
            if message['command'] == 'subscribe':
                topic = message['topic']
                if topic not in self.subscriptions:
                    self.subscriptions[topic] = []
                if topic in self.caches:
                    for cached_msg in self.caches[topic]:
                        writer.write(json.dumps(cached_msg).encode() + b'\n')
                        await writer.drain()
                self.subscriptions[topic].append(writer)
            elif message['command'] == 'send':
                topic = message['topic']
                msg = message['msg']
                delivery = message.get('delivery', 'all')
                if topic not in self.caches:
                    self.caches[topic] = deque(maxlen=100)
                self.caches[topic].append({'msg': msg})
                if topic in self.subscriptions:
                    subscribers = self.subscriptions[topic]
                    if delivery == 'one':
                        if subscribers:
                            subscriber = subscribers.pop(0)
                            subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                            await subscriber.drain()
                            subscribers.append(subscriber)
                    else:
                        for subscriber in subscribers:
                            subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                            await subscriber.drain()

    async def start_server(self, port):
        server = await asyncio.start_server(self.handle_client, 'localhost', port)
        async with server:
            await server.serve_forever()

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('port', type=int)
    args = parser.parse_args()
    server = MessageQueueServer()
    server.loop.run_until_complete(server.start_server(args.port))

ChatGPT is able to add the caching logic, but is not able to maintain the delivery=one semantics. Once a message is cached, it's sent to all subscribers who connect thereafter.

Prompt:

The way you wrote it, messages with delivery of one will be sent to multiple subscribers who connect after the message was cached. Fix it.

ChatGPT's code:

import asyncio
import json
from collections import deque

class MessageQueueServer:
    def __init__(self):
        self.subscriptions = {}
        self.caches = {}
        self.loop = asyncio.get_event_loop()

    async def handle_client(self, reader, writer):
        while True:
            data = await reader.readline()
            if not data:
                break
            message = json.loads(data.decode().strip())
            if message['command'] == 'subscribe':
                topic = message['topic']
                if topic not in self.subscriptions:
                    self.subscriptions[topic] = []
                if topic in self.caches:
                    for cached_msg in self.caches[topic]:
                        writer.write(json.dumps(cached_msg).encode() + b'\n')
                        await writer.drain()
                self.subscriptions[topic].append(writer)
            elif message['command'] == 'send':
                topic = message['topic']
                msg = message['msg']
                delivery = message.get('delivery', 'all')
                if topic not in self.caches:
                    self.caches[topic] = deque(maxlen=100)
                self.caches[topic].append({'msg': msg, 'delivery': delivery})
                if topic in self.subscriptions:
                    subscribers = self.subscriptions[topic]
                    if delivery == 'one':
                        for i, subscriber in enumerate(subscribers):
                            if i == 0:
                                subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                                await subscriber.drain()
                            else:
                                self.caches[topic][-1]['delivery'] = 'all'
                    else:
                        for subscriber in subscribers:
                            subscriber.write(json.dumps({'msg': msg}).encode() + b'\n')
                            await subscriber.drain()

    async def start_server(self, port):
        server = await asyncio.start_server(self.handle_client, 'localhost', port)
        async with server:
            await server.serve_forever()

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('port', type=int)
    args = parser.parse_args()
    server = MessageQueueServer()
    server.loop.run_until_complete(server.start_server(args.port))

It's not able to produce correct code, this problem statement is too complex for ChatGPT!

Conclusion

In this scenario, ChatGPT is like a very smart search engine. Before ChatGPT, to get a skeleton version of an asyncio server, you had to look at the documentation and/or Stackoverflow. Now, you still have to do that, but you can get a working skeleton from ChatGPT. As these examples show, ChatGPT won't get you much farther, but this is already impressive. And who knows how much better newer models will do that have 10x or 100x as many parameters and are trained on more code. Having said that, for the time being I suspect we programmers still don't have to fear for our jobs, and we still have to read the documentation.