Writing a simple Rust async message queue server
Marton Trencseni - Fri 20 March 2026 - Programming
Introduction
After writing async message queue servers in Python, Javascript and C++, I wrote one more version in Rust. The main reason was to try Rust, the language that keeps coming up as the modern choice for systems programming. I wanted to understand what Rust looks and feels like for something small but non-trivial. An async message queue server is a good vehicle for this:
- it has sockets
- it has concurrency
- it has shared mutable state
- it has parsing and validation
This post walks through the Rust implementation, with special attention to the Rust-specific parts. The full code is on GitHub.

What the server does
The Rust version follows exactly the same protocol as the previous implementations, and passes exactly the same unit tests.
Clients connect over TCP and send JSON commands, one per line:
{"command":"subscribe","topic":"news"}
{"command":"send","topic":"news","msg":"hello","delivery":"all"}
{"command":"unsubscribe","topic":"news"}
The server supports:
- subscribe
- unsubscribe
- send
Each topic has:
- a set of subscribers
- a small cache of recent messages
- a monotonically increasing message index
Messages can be delivered in two modes:
- all: send to all subscribers
- one: send to one random subscriber
If a message cannot be delivered immediately, it may be cached.
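The two delivery modes can be sketched with std channels (a simplified stand-in for the Tokio channels used later; the index-based pick for one is an illustration, the real server picks a random subscriber):

```rust
use std::sync::mpsc;

// Deliver a message to every subscriber ("all") or to a single one ("one").
fn deliver(subscribers: &[mpsc::Sender<String>], msg: &str, mode: &str, pick: usize) {
    match mode {
        "all" => {
            for tx in subscribers {
                let _ = tx.send(msg.to_string());
            }
        }
        "one" => {
            // Pick one subscriber; the real server chooses randomly.
            if let Some(tx) = subscribers.get(pick % subscribers.len()) {
                let _ = tx.send(msg.to_string());
            }
        }
        _ => {}
    }
}

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    let subs = vec![tx1, tx2];

    deliver(&subs, "hello", "all", 0);
    assert_eq!(rx1.try_recv().unwrap(), "hello");
    assert_eq!(rx2.try_recv().unwrap(), "hello");

    deliver(&subs, "hi", "one", 1);
    assert!(rx1.try_recv().is_err()); // only subscriber 1 received it
    assert_eq!(rx2.try_recv().unwrap(), "hi");
}
```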
The data structures
Let’s start with the state.
use serde_json::{json, Value};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
struct Topic {
    subscribers: HashMap<usize, mpsc::UnboundedSender<Value>>,
    cache: VecDeque<Value>,
    next_index: i64,
}

impl Topic {
    fn new(cache_size: usize) -> Self {
        Self {
            subscribers: HashMap::new(),
            cache: VecDeque::with_capacity(cache_size),
            next_index: 0,
        }
    }
}

struct SharedState {
    topics: HashMap<String, Topic>,
    cache_size: usize,
    next_session_id: usize,
}

type State = Arc<Mutex<SharedState>>;
The datatypes used are:
- HashMap<K, V> is what you expect: a dictionary.
- VecDeque<T> is a double-ended queue. We use it for the cache so we can push_back new messages and pop_front old ones.
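The cache usage can be sketched with a bounded VecDeque (the evict-oldest-on-overflow policy here is an assumption for illustration):

```rust
use std::collections::VecDeque;

// Cache a message, evicting the oldest one when the cache is full.
fn cache_message(cache: &mut VecDeque<String>, cache_size: usize, msg: String) {
    if cache.len() == cache_size {
        cache.pop_front(); // drop the oldest message
    }
    cache.push_back(msg); // append the newest message
}

fn main() {
    let mut cache = VecDeque::with_capacity(2);
    cache_message(&mut cache, 2, "a".into());
    cache_message(&mut cache, 2, "b".into());
    cache_message(&mut cache, 2, "c".into()); // evicts "a"
    assert_eq!(cache, ["b", "c"]);
}
```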
The interesting part is this:
type State = Arc<Mutex<SharedState>>;
Arc means atomically reference counted. It is a shared smart pointer. Multiple tasks can own the same object, and the object is automatically freed when the last owner goes away. In Python or Javascript, shared ownership is mostly invisible. In C++, there is shared_ptr. In Rust, shared ownership has to be made explicit. Mutex means only one task can mutate the protected value at a time. Our SharedState contains all topics and subscriptions. Multiple client handler tasks need to access it, so we wrap it in a mutex. So:
Arc<Mutex<SharedState>>
means:
“A globally shared heap object, reference-counted, whose contents are protected by a lock.”
As with C++ templates, this results in noisy-looking code; Rust accepts the noise to force the code to be explicit about what is shared and what is mutable.
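The same pattern exists with std's blocking primitives; here is a minimal sketch of Arc<Mutex<_>> shared across threads (using std::thread instead of Tokio tasks to keep it self-contained):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // A shared, reference-counted, lock-protected counter.
    let state = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let state = Arc::clone(&state); // another owner of the same object
            thread::spawn(move || {
                // Lock, mutate, release (the lock drops with the guard).
                *state.lock().unwrap() += 1;
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(*state.lock().unwrap(), 4);
}
```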
Tokio: async runtime, sockets, and channels
The program uses Tokio, which is the dominant async runtime in Rust.
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter, WriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
Tokio is similar to Python's asyncio:
- event loop / runtime
- async TCP sockets
- async tasks
- async synchronization primitives
- channels
JSON values as dynamic objects
Like the previous implementations, this one uses dynamic JSON values:
use serde_json::{json, Value};
Value is basically “some JSON thing”:
- object
- string
- number
- boolean
- array
- null
For example:
let msg = json!({
    "command": "send",
    "topic": "news",
    "msg": "hello",
    "delivery": "all"
});
This is flexible and convenient for a toy server. If this were production Rust, a more idiomatic approach would be to define typed request enums and structs, deserialize into them, and let the compiler enforce the schema. But for parity with the earlier posts, keeping the protocol dynamic is fine.
Sending responses
Here are the helpers for writing JSON responses to the client:
async fn send_raw(writer: &mut BufWriter<WriteHalf<TcpStream>>, data: &str) -> tokio::io::Result<()> {
    writer.write_all(data.as_bytes()).await?;
    writer.write_all(b"\r\n").await?;
    writer.flush().await?;
    Ok(())
}

async fn send_cmd(writer: &mut BufWriter<WriteHalf<TcpStream>>, cmd: &Value) -> tokio::io::Result<()> {
    let s = serde_json::to_string(cmd).unwrap();
    send_raw(writer, &s).await
}

async fn send_success(writer: &mut BufWriter<WriteHalf<TcpStream>>) -> tokio::io::Result<()> {
    send_cmd(writer, &json!({"success": true})).await
}

async fn send_failure(writer: &mut BufWriter<WriteHalf<TcpStream>>, reason: &str) -> tokio::io::Result<()> {
    send_cmd(writer, &json!({"success": false, "reason": reason})).await
}
A few Rust notes here: &mut T means a mutable reference. So:
writer: &mut BufWriter<WriteHalf<TcpStream>>
means:
“Borrow the writer mutably for the duration of this function.”
Rust enforces that while something is mutably borrowed, nothing else can access it in conflicting ways. That sounds restrictive, but it eliminates a large class of bugs.
Rust does not have exceptions in the usual sense. Functions that can fail return Result<T, E>.
So:
tokio::io::Result<()>
means:
- success: (), the empty value
- failure: an I/O error
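A minimal sketch of how Result and the ? operator compose (using a hypothetical parse step rather than real I/O):

```rust
// The ? operator returns early with the error, mirroring how
// send_cmd propagates I/O errors coming from send_raw.
fn parse_port(s: &str) -> Result<u16, std::num::ParseIntError> {
    let port: u16 = s.parse()?; // on failure, hand the error to the caller
    Ok(port)
}

fn main() {
    assert_eq!(parse_port("8080"), Ok(8080));
    assert!(parse_port("not-a-port").is_err());
}
```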
Command dispatch
The validator dispatcher is straightforward:
fn verify_command(v: &Value) -> bool {
    let obj = match v.as_object() {
        Some(o) => o,
        None => return false,
    };
    let cmd = match obj.get("command").and_then(|c| c.as_str()) {
        Some(s) => s,
        None => return false,
    };
    match cmd {
        "subscribe" => validate_subscribe(obj),
        "unsubscribe" => validate_unsubscribe(obj),
        "send" => validate_send(obj),
        _ => false,
    }
}
Rust has match, which is more powerful than a C/C++ switch.
let obj = match v.as_object() {
    Some(o) => o,
    None => return false,
};
This means:
- if v is a JSON object, bind it to o
- otherwise return false
The same thing happens for extracting the command string.
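The same early-return shape works as a standalone sketch, validating a plain map instead of a serde_json::Value (the verify helper here is illustrative, not from the server):

```rust
use std::collections::HashMap;

// Mirror of verify_command's pattern: extract a field,
// bail out with false if it's missing, then dispatch on its value.
fn verify(fields: &HashMap<&str, &str>) -> bool {
    let cmd = match fields.get("command") {
        Some(c) => *c,
        None => return false,
    };
    matches!(cmd, "subscribe" | "unsubscribe" | "send")
}

fn main() {
    let mut ok = HashMap::new();
    ok.insert("command", "subscribe");
    assert!(verify(&ok));

    let bad = HashMap::new();
    assert!(!verify(&bad)); // no "command" field
}
```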
The session object
Each client connection has a Session.
struct Session {
    id: usize,
    state: State,
    subscribed_topics: Vec<String>,
    tx: mpsc::UnboundedSender<Value>,
}
The implementation:
impl Session {
    async fn new(state: State, tx: mpsc::UnboundedSender<Value>) -> Self {
        let mut s = state.lock().await;
        let id = s.next_session_id;
        s.next_session_id += 1;
        Self {
            id,
            state: Arc::clone(&state),
            subscribed_topics: Vec::new(),
            tx,
        }
    }
}
A few things to note.
let mut s = state.lock().await;
This waits asynchronously until the mutex is available. The return value is a guard object, also a common pattern in C++. While that guard exists, the mutex is locked. When the guard goes out of scope, the lock is released automatically. This scope-based resource management is another core Rust pattern. No explicit unlock() call is needed.
Arc::clone(&state) does not deep-copy the state. It only increments the reference count of the Arc. This is easy to misunderstand when coming from other languages. Cloning an Arc is cheap. You just get another pointer to the same shared object.
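This is easy to see with Arc::strong_count: cloning only bumps the reference count, and both handles point at the same allocation:

```rust
use std::sync::Arc;

fn main() {
    let state = Arc::new(vec![1, 2, 3]);
    assert_eq!(Arc::strong_count(&state), 1);

    let clone = Arc::clone(&state); // no deep copy, just a new owner
    assert_eq!(Arc::strong_count(&state), 2);
    assert!(std::ptr::eq(&*state, &*clone)); // same underlying data

    drop(clone);
    assert_eq!(Arc::strong_count(&state), 1); // freed when the last owner drops
}
```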
Conclusion
Rust forces programmers to answer certain questions explicitly:
- Who owns this value?
- Is this shared?
- Is it mutable?
- How long does this lock live?
- Could this value disappear while something still references it?
- Are we moving this object or borrowing it?
In Python and Javascript, we can often ignore those questions until runtime. In C++, most of them still have to be answered in the code, but the compiler does significantly less hand-holding, and gives more freedom. In Rust, the compiler insists that we answer them upfront, and that is both the pain and the point.
Overall, this was a good first Rust project as it exposes the language’s core ideas:
- ownership
- borrowing
- Option/Result
- shared state with Arc<Mutex<_>>
- async tasks with Tokio
- channels
- scoped locking
My main takeaway is that Rust is about making the invariants of the program explicit, not about terseness or convenience. It's a significant trade-off: the code ends up noticeably heavier than, say, Python or Javascript, and in some cases even heavier than C/C++.