Lamport's logical clocks and the Ricart–Agrawala Distributed Mutual Exclusion algorithm in Python

Marton Trencseni - Sun 13 July 2025 - Programming

Introduction

In the previous article I gave an implementation of Lamport's Bakery algorithm in Python using HTTP Flask servers. Here, I replace the worker processes from that implementation with a version that uses the Ricart–Agrawala Distributed Mutual Exclusion algorithm, using Lamport's logical clocks.

The code is on GitHub: mtrencseni/dca/dme/python.

Lamport's logical clocks

Distributed systems have no shared wall clock, so we need a way to talk about before and after without assuming some shared time that everybody agrees on. Lamport’s logical clock design is simple:

  1. Each process carries a counter. It’s just an int called clock.
  2. Every local event (including sending a message) increments the counter by 1 and stamps the event with the new value.
  3. Every received message brings its own timestamp called msg_ts. On receive, set clock = max(clock, msg_ts) + 1.
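The three rules above can be sketched as a tiny class. This is a minimal illustration, not code from the repository: the class name LamportClock and the method names tick and receive are assumptions made for this example.

```python
class LamportClock:
    """Minimal Lamport logical clock (illustrative sketch)."""

    def __init__(self):
        self.clock = 0  # rule 1: each process carries an integer counter

    def tick(self):
        # Rule 2: a local event (including a send) increments the counter
        # and is stamped with the new value.
        self.clock += 1
        return self.clock

    def receive(self, msg_ts):
        # Rule 3: move past both our own clock and the sender's timestamp.
        self.clock = max(self.clock, msg_ts) + 1
        return self.clock


# Two processes exchanging one message.
p, q = LamportClock(), LamportClock()
p.tick()                      # p: local event, clock becomes 1
send_ts = p.tick()            # p: send event, clock becomes 2
q.tick()                      # q: unrelated local event, clock becomes 1
recv_ts = q.receive(send_ts)  # q: max(1, 2) + 1 = 3
print(send_ts, recv_ts)       # 2 3
```

The receive is stamped later than the send even though q's own clock was behind p's.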

That's it! What these stamps guarantee:

  • If event A could have influenced event B—i.e., A → B in Lamport’s happens-before relation—then timestamp(A) < timestamp(B). Causality never flows “backwards.”
  • The reverse is not true: a smaller number doesn’t prove causality; two independent events may just get different numbers.

Timestamps alone only give a partial order, because two processes can stamp different events with the same number. Since the timestamps are integers, you can always break such ties deterministically (i.e., if timestamps are equal, compare process_id or node_id), giving a single global order everyone agrees on. That total order is what mutual-exclusion queues, totally ordered logging, and snapshot algorithms rely on.
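As a small illustration of the tie-break, Python tuples already compare element by element, so sorting by (timestamp, process_id) yields the agreed order. The event list below is made up for the example.

```python
# Events as (timestamp, process_id, label); the labels are only for display.
events = [
    (3, 2, "q receives"),
    (2, 1, "p sends"),
    (2, 3, "r local event"),
    (1, 2, "q local event"),
]

# Equal timestamps fall back to process_id, so every node that sorts
# the same set of events ends up with the same sequence.
total_order = sorted(events, key=lambda e: (e[0], e[1]))
labels = [label for _, _, label in total_order]
print(labels)
# ['q local event', 'p sends', 'r local event', 'q receives']
```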

Lamport's Distributed Mutual Exclusion algorithm

Lamport in 1978 showed that you can build a lock for N independent machines using only message passing and his logical clocks:

  1. When a node wants the lock it sends a REQUEST stamped with its (timestamp, process_id) to everyone, then puts the pair into its own priority queue.
  2. Each receiver adds the pair to its queue and always sends back a REPLY right away.
  3. A node enters the critical section after (a) its own request is at the head of its queue and (b) it has received a REPLY from every peer.
  4. On exit it broadcasts a RELEASE so peers can remove its request and possibly unblock.

This works, but every entry costs three messages per peer: REQUEST, REPLY, and RELEASE.
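The entry test in step 3 can be sketched with a heap as the priority queue. This is only an illustration of the condition, not a full implementation of Lamport's algorithm; the function name can_enter and its parameters are hypothetical.

```python
import heapq

def can_enter(queue, replies_from, me, peers):
    """Entry test: my request heads the queue and every peer has replied."""
    if not queue:
        return False
    _, head_pid = queue[0]  # a heap keeps the smallest (timestamp, pid) first
    return head_pid == me and replies_from >= set(peers)

queue = []
heapq.heappush(queue, (5, 2))  # request from node 2 at timestamp 5
heapq.heappush(queue, (4, 1))  # my own request (node 1) at timestamp 4
heapq.heappush(queue, (5, 3))  # request from node 3 at timestamp 5

print(can_enter(queue, {2}, me=1, peers=[2, 3]))     # False: node 3 has not replied
print(can_enter(queue, {2, 3}, me=1, peers=[2, 3]))  # True: head of queue, all replies in
```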

Ricart–Agrawala’s improvement

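Ricart and Agrawala in 1981 observed that the RELEASE broadcast is unnecessary: a node that wants to go first can simply hold back its REPLY until it has left the critical section, so the delayed REPLY doubles as the release. Here is the entire protocol, self-contained and still powered by Lamport clocks.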

Local state per node

  • clock (int) – Lamport logical clock.
  • my_request (pair of ints) – the (timestamp, pid) pair of the current request, or None.
  • deferred (set of ints) – set of peers whose requests you must answer later.

Requesting the critical section

  1. Increment clock; set my_request = (clock, pid).
  2. Send REQUEST(my_request) to every other node.
  3. Wait until a REPLY has arrived from every peer, then enter the critical section.

Handling a received REQUEST(ts, j)

  1. Set clock = max(clock, ts) + 1.
  2. If you are not requesting the section, or the incoming (ts, j) is earlier than your own my_request, send a REPLY immediately.
  3. Otherwise add j to deferred; you will reply later.

Leaving the critical section

  1. Clear my_request.
  2. Send a REPLY to every process in deferred, then empty deferred.
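The three handlers above can be exercised in a single-threaded, in-memory sketch. This is not the Flask implementation: the Node class, the wire queue standing in for the network, and the deliver_all helper are all assumptions made for this example.

```python
from collections import deque

class Node:
    """One Ricart-Agrawala participant; `wire` is a shared in-memory message queue."""

    def __init__(self, pid, nodes, wire):
        self.pid, self.nodes, self.wire = pid, nodes, wire
        self.clock = 0
        self.my_request = None  # (timestamp, pid) while requesting or inside the section
        self.deferred = set()   # peers we owe a REPLY
        self.replies = set()    # peers that answered the current request
        nodes[pid] = self

    def send(self, to, kind):
        self.wire.append((to, kind, self.clock, self.pid))

    def request(self):
        self.clock += 1
        self.my_request = (self.clock, self.pid)
        self.replies = set()
        for peer in self.nodes:
            if peer != self.pid:
                self.send(peer, "REQUEST")

    def receive(self, kind, ts, sender):
        self.clock = max(self.clock, ts) + 1
        if kind == "REPLY":
            self.replies.add(sender)
        elif self.my_request is None or (ts, sender) < self.my_request:
            self.send(sender, "REPLY")  # incoming request goes first: answer now
        else:
            self.deferred.add(sender)   # our request goes first: answer on release

    def can_enter(self):
        others = set(self.nodes) - {self.pid}
        return self.my_request is not None and self.replies == others

    def release(self):
        self.my_request = None
        for peer in self.deferred:
            self.send(peer, "REPLY")
        self.deferred = set()

def deliver_all(nodes, wire):
    # Deliver queued messages in FIFO order until the wire is empty.
    while wire:
        to, kind, ts, sender = wire.popleft()
        nodes[to].receive(kind, ts, sender)

nodes, wire = {}, deque()
n1, n2, n3 = (Node(pid, nodes, wire) for pid in (1, 2, 3))

n1.request()  # both request before any message is delivered,
n2.request()  # so both carry timestamp 1 and the pid breaks the tie
deliver_all(nodes, wire)
first = (n1.can_enter(), n2.can_enter())

n1.release()  # node 1 leaves and sends the deferred REPLY to node 2
deliver_all(nodes, wire)
second = n2.can_enter()
print(first, second)  # (True, False) True
```

Node 1 wins the tie because (1, 1) < (1, 2); node 2 only collects its last REPLY once node 1 releases.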

The distributed mutual exclusion algorithm guarantees:

  • Safety. Two nodes cannot be inside together, because at least one sees the other’s timestamp as earlier and waits.
  • Fairness. Requests are served strictly by (timestamp, pid).
  • Cost. Exactly two messages per peer (REQUEST and REPLY), saving one-third of the traffic compared with Lamport’s original algorithm.
  • Delay. One round-trip time when the section is free; under contention a node enters as soon as the last deferred reply arrives.
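The cost bullet is simple arithmetic over the N - 1 peers. A quick sketch, where the function name messages_per_entry is made up for this example:

```python
def messages_per_entry(n, algorithm):
    """Messages needed for one critical-section entry among n nodes."""
    peers = n - 1
    # Lamport: REQUEST + REPLY + RELEASE per peer; Ricart-Agrawala: REQUEST + REPLY.
    per_peer = {"lamport": 3, "ricart-agrawala": 2}[algorithm]
    return per_peer * peers

for n in (3, 8):
    print(n, messages_per_entry(n, "lamport"), messages_per_entry(n, "ricart-agrawala"))
# 3 6 4
# 8 21 14
```

With 8 nodes, each entry therefore costs 14 messages instead of 21.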

For small to medium clusters where every machine can reach every other directly, Ricart–Agrawala delivers the same correctness as Lamport’s lock with fewer messages and no extra complexity.

Python implementation with HTTP Flask servers

The previous article used a simple counter increment, broken into a GET and a SET contained in a critical section to test mutual exclusion correctness. If mutual exclusion works, there is no interleaving, so each GET and SET pair is effectively atomic. If mutual exclusion is broken and some interleaving happens, the final counter value will not be as expected. The implementation was split into three parts:

  1. Increment server: an HTTP server that just stores a single integer. For the purposes of this demonstration, there is no /increment; instead each worker has to call /get to retrieve the current value and then /set to store the incremented value inside its critical section. Correctness assumes mutual exclusion, so that no two workers /get and /set the same value in interleaving calls.
  2. Workers: an HTTP server that implements the bakery logic. Each server exposes its read-only /choosing and /ticket to the other workers, and calls the increment server in its critical section.
  3. Driver: a simple Python program that first creates the increment server and N workers, kicks off the workers, and then compares the final count in the increment server to the expected count.
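To see why an unguarded GET/SET pair loses updates, here is a deterministic sketch that replays one bad interleaving by hand. A plain dict stands in for the increment server, and the helper names get and set_value are assumptions for this example, not the server's actual code.

```python
store = {"value": 0}  # stands in for the increment server

def get():
    return store["value"]

def set_value(v):
    store["value"] = v

# Interleaved: both workers read before either one writes.
a = get()         # worker A reads 0
b = get()         # worker B reads 0
set_value(a + 1)  # A writes 1
set_value(b + 1)  # B also writes 1, overwriting A's update
interleaved = store["value"]

# Serialized: each GET/SET pair completes before the next one starts.
store["value"] = 0
for _ in range(2):
    set_value(get() + 1)
serialized = store["value"]

print(interleaved, serialized)  # 1 2
```

Two increments produce 1 when interleaved and 2 when serialized, which is the gap the driver checks for.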

We can re-use the increment server and the driver as-is for the demonstration of the distributed mutual exclusion algorithm. Only the worker has to change:

import sys, time, threading, requests, json, logging
from flask import Flask, request, jsonify

num_loops   = int(sys.argv[1])  # e.g. 1_000
my_id       = int(sys.argv[2])  # 1 .. n
num_workers = int(sys.argv[3])  # e.g. 8

app = Flask(__name__)
clock            = 0                # Lamport logical clock
requesting       = False            # am I trying to get the lock to run my critical section?
request_ts       = 0                # timestamp of my current request
replies_needed   = 0                # remaining replies to receive
deferred_replies = set()            # worker IDs whose reply is deferred
done             = False            # set to True when loops finished
guard_lock       = threading.Lock() # guards all variables above

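@app.route("/request", methods=["POST"])
def endpoint_request():
    msg = request.get_json()
    sender_id, ts = int(msg["id"]), int(msg["ts"])  # avoid shadowing the built-in id()
    global clock
    with guard_lock:
        clock = max(clock, ts) + 1
        # grant if I am not competing, or if the sender's request is earlier than mine
        grant_request = (not requesting) or (ts, sender_id) < (request_ts, my_id)
        reply_ts = clock
        if not grant_request:
            deferred_replies.add(sender_id)
    if grant_request:
        # reply immediately, outside guard_lock so the worker thread is not blocked on network I/O
        requests.post(f"http://127.0.0.1:{7000+sender_id}/reply", json={"id": my_id, "ts": reply_ts})
    return jsonify(ok=True)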

@app.route("/reply", methods=["POST"])
def endpoint_reply():
    ts = int(request.get_json()["ts"])
    global clock, replies_needed
    with guard_lock:
        clock = max(clock, ts) + 1
        replies_needed -= 1 # we assume each worker only replies once
    return jsonify(ok=True)

@app.route("/start", methods=["POST"])
def endpoint_start():
    threading.Thread(target=run_worker, daemon=True).start()
    return jsonify(started=True)

@app.route("/status")
def endpoint_status():
    return jsonify(done=done)

def lock():
    global clock, requesting, request_ts, replies_needed
    with guard_lock:
        requesting = True
        clock += 1
        request_ts = clock
        replies_needed = num_workers - 1
    # broadcast request
    for i in range(1, num_workers + 1):
        if i != my_id:
            requests.post(f"http://127.0.0.1:{7000+i}/request", json={"ts": request_ts, "id": my_id})
    # wait for all replies
    while True:
        with guard_lock:
            if replies_needed == 0:
                break
        time.sleep(0.001)

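def unlock():
    global requesting
    with guard_lock:
        pending = list(deferred_replies)
        deferred_replies.clear()
        requesting = False
        reply_ts = clock  # read the clock while still holding guard_lock
    # send the replies we deferred while we were requesting or in the critical section
    for i in pending:
        requests.post(f"http://127.0.0.1:{7000+i}/reply", json={"id": my_id, "ts": reply_ts})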

def critical_section():
    curr = requests.get(f"http://127.0.0.1:7000/get").json()["value"]     # get
    requests.post(f"http://127.0.0.1:7000/set", json={"value": curr + 1}) # set

def run_worker():
    global done
    for i in range(num_loops):
        lock()
        critical_section()
        unlock()
        #print(f"Worker {my_id} at {i}")
    done = True
    print(f"Worker {my_id} Done.", flush=True)

if __name__ == "__main__":
    app.run(port=7000+my_id, threaded=False)

Note that each Flask worker is started with threaded=False, so incoming HTTP calls are handled one at a time inside the process. Meanwhile, the worker’s main loop runs in a separate Python thread, so the request handler and the main loop still touch the same clock, request_ts, and state flags. That's why we need guard_lock: it makes updates to the state variables atomic, preventing races between the Flask handler and the main worker loop.
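The same pattern can be shown in isolation with two plain threads sharing one clock. This sketch is a stand-in, not the worker code: the state dict and the two thread functions are assumptions for this example, and the handler uses a fixed stale timestamp of 0 so that the result is predictable.

```python
import threading

state = {"clock": 0}            # shared between both threads
guard_lock = threading.Lock()   # guards every read-modify-write on state

def handler_thread(n):
    # Plays the role of the Flask handler applying the receive rule.
    for _ in range(n):
        with guard_lock:
            state["clock"] = max(state["clock"], 0) + 1

def worker_thread(n):
    # Plays the role of the main loop stamping a new request.
    for _ in range(n):
        with guard_lock:
            state["clock"] += 1

threads = [
    threading.Thread(target=handler_thread, args=(10_000,)),
    threading.Thread(target=worker_thread, args=(10_000,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_clock = state["clock"]
print(final_clock)  # 20000
```

Because every update happens under the lock, each of the 20,000 steps advances the clock by exactly one.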

Running the demonstration

With lock() and unlock() commented out, so no mutual exclusion guarantee:

$ time python3 driver.py 8 100
Workers:    8
Iterations: 100
Expected:   800
Observed:   171
FAILED!

real    0m1.969s
user    0m5.314s
sys     0m0.555s

With the distributed mutual exclusion implementation:

$ time python3 driver.py 8 100
Workers:    8
Iterations: 100
Expected:   800
Observed:   800
Passed!

real    0m7.574s
user    0m22.182s
sys     0m2.498s

Conclusion

The Ricart–Agrawala algorithm shows that you can get mutual exclusion in a distributed system with nothing more than Lamport's logical clocks and two messages per peer: a REQUEST and a REPLY.