Friday, January 16, 2026

FOR UPDATE SKIP LOCKED in MongoDB


SELECT ... FOR UPDATE SKIP LOCKED is a vendor-specific SQL feature available in a number of relational databases (e.g., PostgreSQL, Oracle, MySQL). It helps parallel threads avoid waiting on locked rows. MongoDB’s concurrency model uses optimistic concurrency: reads don’t block writes, and writes don’t block reads. To coordinate parallel processing, you can reserve a document by writing a lock field so that other workers skip it.
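As a minimal sketch of that pattern (the collection and worker names here are hypothetical, just to illustrate the idea before the full example below):

# Minimal sketch (hypothetical names): reserve one unclaimed document with a
# single atomic update, so that other workers skip it.
from pymongo import MongoClient

jobs = MongoClient("mongodb://127.0.0.1:27017").test.jobs
claimed = jobs.update_one(
    {"lock": {"$exists": False}},            # only documents nobody has reserved
    {"$set": {"lock": {"by": "worker-1"}}},  # write the lock field to reserve it
)
if claimed.modified_count == 1:
    doc = jobs.find_one({"lock.by": "worker-1"})  # process the reserved document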

I will use an example discussed in the Reddit question “ACID read then write – Python“:

Client in python, multi process. Each process picks and reads one document, calls some public APIs, and adds data to the document and saves it. Then next document. What’s written can depend on the read data. Question is: in python, how can I create and configure transactions in the code to make sure no other process can read or write its current document from the moment a process starts reading it until done writing its additional data? This means concurrent reads should not happen…

In this example, I will process messages based on their originating IP address. Multiple threads will enrich them with location data fetched from the public API at https://ip-api.com/.

Here is an example of an initial document:

{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: null
}

Here is the document while it is being processed:

{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: null,
  lock: {
    by: 'franck',
    until: datetime.datetime(2026, 1, 1, 22, 33, 10, 833000)
  }
}

Here is the same document after processing:

{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: {
    status: 'success',
    country: 'Hong Kong',
    countryCode: 'HK',
    region: 'HCW',
    regionName: 'Central and Western District',
    city: 'Hong Kong',
    zip: '',
    lat: 22.3193,
    lon: 114.1693,
    timezone: 'Asia/Hong_Kong',
    isp: 'Cloudflare, Inc',
    org: 'APNIC and Cloudflare DNS Resolver project',
    as: 'AS13335 Cloudflare, Inc.',
    query: '1.1.1.1'
  }
}

Storing in-progress information avoids long transactions that hide the current status and make troubleshooting difficult when the public API is slow.
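For contrast, here is a hypothetical sketch of the long-transaction approach this design avoids (it reuses client, messages, and fetch_location from the script in the Code section, and assumes a replica set, since multi-document transactions require one):

# Hypothetical anti-pattern sketch: the transaction stays open while the
# public API call runs, so a slow response keeps the session open and
# nothing in the document shows that processing is in progress.
with client.start_session() as session:
    with session.start_transaction():
        doc = messages.find_one({"location": None}, session=session)
        location = fetch_location(doc["originatingIp"])  # unpredictable latency
        messages.update_one(
            {"_id": doc["_id"]},
            {"$set": {"location": location}},
            session=session,
        )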



Design

This script is designed as a complete, runnable demonstration of how to implement SELECT ... FOR UPDATE SKIP LOCKED-style parallel job claiming in MongoDB. The script generates everything it needs, processes it, and shows the end state.

  • insert_test_docs() inserts test data with random IP addresses into a new collection “message”, and creates a partial index to find the messages to process ({location: null}).
  • claim_document() updates a message to process, adding lock information so that another thread will not pick the same one, and fetches the document. The criteria are that it must not be processed yet ({location: null}) and must not be locked, or the lock must have expired (with a 1s grace period to account for clock skew).
  • fetch_location() is the call to the public API, here getting location information for an IP address.
  • process_document() calls claim_document() to get a message to process, with a lock. It calls fetch_location() and updates the document with the location. It verifies that the lock is still in place before the update, then unsets it. Each thread runs in a loop, claiming and processing documents until the timeout.
  • main() calls these in sequence and displays the final documents.

This solution avoids explicit transactions, which is preferable because they would include a call to a public API with unpredictable response time. It also avoids using findOneAndUpdate, whose higher overhead comes from storing full pre- and post-images of documents for retryable operations. For large documents (possible in real workloads, even if not shown in this demo) this can lead to significant write amplification. Finally, setting an expiration timestamp allows automatic re-processing if a message fails.
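For comparison, here is a sketch of the avoided findOneAndUpdate variant, which claims and returns the document in a single call (using WORKER_ID, lock_expiry, and token as defined in the script below):

# Sketch of the avoided alternative: claim and fetch in one server round trip.
# As noted above, it stores full pre-/post-images for retryable writes,
# which amplifies writes when documents are large.
from pymongo import ReturnDocument

doc = messages.find_one_and_update(
    {"location": None, "lock": {"$exists": False}},  # unprocessed and unlocked
    {"$set": {"lock": {"by": WORKER_ID, "until": lock_expiry, "token": token}}},
    return_document=ReturnDocument.AFTER,            # return the claimed document
)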



Code

Below is the complete Python program, which you can test using different numbers of documents and threads:

import os
import random
import socket
import threading
import time
from datetime import datetime, timedelta
import requests
from pymongo import MongoClient


# Mongo connection and collection
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.test
messages = db.message

# Test settings (the test inserts documents, then runs the processing threads for some duration)
DOCUMENTS = 10 # number of documents created initially
THREADS = 5    # number of threads that loop to claim a document
SECONDS = 15   # duration after which a thread stops looping to claim

# Worker identification (to identify the thread, and set an expiration on the lock)
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
LOCK_DURATION = timedelta(seconds=60) # assumes processing completes within that duration; if not, the document will be claimed by another worker, and this one will not update it

# Get the time
def utcnow(): return datetime.utcnow()
MAX_CLOCK_SKEW = timedelta(seconds=1) # used as a grace period when a lock is expired

# --- Prepare test messages (with randomly generated IPs) ---
def insert_test_docs():
    # Drop the collection entirely (removes data + indexes)
    messages.drop()
    # Create the partial index for unprocessed docs (they have location = null)
    messages.create_index([("lock.until", 1)], partialFilterExpression={"location": None})
    # Generate random IPs for the test
    ips = [
        ".".join(str(random.randint(1, 255)) for _ in range(4))
        for _ in range(DOCUMENTS)
    ]
    # Explicitly set location=None to match the partial index filter
    docs = [
        { "originatingIp": ip, "location": None }  # A null location is the marker to process it
        for ip in ips
    ]
    messages.insert_many(docs)
    print(f"[STARTUP] Inserted {DOCUMENTS} test docs into 'message'")
    for doc in messages.find({}, {"_id": 0, "originatingIp": 1, "location": 1}):
        print(doc)


# --- Claim a message ---
def claim_document():
    now = utcnow()
    lock_expiry = now + LOCK_DURATION
    token = random.randint(1000, 9999)  # unique lock token for extra safety
    # Atomic lock claim: match unlocked documents, or steal expired locks
    result = messages.update_one(
        {
          "$and": [
            # the location is not set
            { "location": None },
            # the document is not locked, or the lock expired, including grace period
            {  "$or": [  { "lock": { "$exists": False } },  { "lock.until": { "$lt": now - MAX_CLOCK_SKEW } }  ]  }
          ]
        },
        { "$set": {  "lock": {  "by": WORKER_ID,  "until": lock_expiry,  "token": token  }  }}
    )
    if result.modified_count == 0:
        return None
    # Fetch exactly the document we locked: match by worker, expiry, AND token
    doc = messages.find_one({  "lock.by": WORKER_ID,  "lock.until": lock_expiry,  "lock.token": token  })
    if doc:
        print(f"[{WORKER_ID}] {threading.current_thread().name} claimed IP {doc['originatingIp']} with token={token}")
    else:
        print(f"[{WORKER_ID}] {threading.current_thread().name} claim succeeded but fetch failed: possible race?")
    return doc

# --- Call the public API ---
def fetch_location(ip):
    url = f"http://ip-api.com/json/{ip}"
    try:
        resp = requests.get(url, timeout=30)
        if resp.status_code == 200:
            return resp.json()
        print(f"[API] Error: HTTP {resp.status_code} for {ip}")
        return None
    except Exception as e:
        print(f"[API] Exception for {ip}: {e}")
        return None

# --- Process messages in a loop ---
def process_document():
    start_time = time.time()
    timeout = SECONDS  # seconds
    thread_name = threading.current_thread().name
    while True:
        # Try to claim a document
        doc = claim_document()
        if doc:
            # We successfully claimed a document: process it
            ip = doc["originatingIp"]
            location_data = fetch_location(ip)
            if not location_data:
                print(f"[{WORKER_ID}] {thread_name} failed to fetch location for {ip}")
                return
            # Final update only if the lock is still valid
            now = utcnow()
            result = messages.update_one(
                {
                    "_id": doc["_id"],
                    "lock.by": WORKER_ID,
                    "lock.until": {"$gte": now},
                    "lock.token": doc["lock"]["token"]
                },
                {
                    "$set": {"location": location_data},
                    "$unset": {"lock": ""}
                }
            )
            if result.modified_count == 0:
                print(f"[{WORKER_ID}] {thread_name} lost the lock on {ip}; it will be re-processed")
            continue  # immediately try to claim the next document
        # No document claimed: check elapsed time before waiting and retrying
        elapsed = time.time() - start_time
        if elapsed >= timeout:
            print(f"[{WORKER_ID}] {thread_name} exiting after {elapsed:.2f}s")
            return
        time.sleep(5)  # avoid hammering the DB and the public API

# --- Initialize and run multiple processing threads ---
def main():
    print("\nInserting documents")
    insert_test_docs()
    print("\nStarting threads")
    threads = []
    for i in range(THREADS):
        tname = f"T{i}"
        t = threading.Thread(target=process_document, name=tname)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    print(f"\n[{WORKER_ID}] Check final documents:")
    for doc in messages.find({}, {"originatingIp": 1, "location.query": 1, "location.country": 1, "location.message": 1, "lock.by": 1, "lock.until": 1}):
        print(doc)

if __name__ == "__main__":
    main()




Technical Insights

MongoDB’s storage engine ensures atomicity for each update_one through its WriteUnitOfWork and RecoveryUnit mechanisms. However, maintaining read consistency across multiple operations requires application-level coordination. In this implementation, that coordination is provided by an atomic claim with conditional criteria, ensuring that only one worker can lock an unprocessed or expired document at a time.

Several safeguards mitigate race conditions. The claim step narrows matches using the worker ID, lock expiry, and a random token. The final update then re-verifies all these fields before committing changes, preventing stale or stolen locks from being applied. Lock expiration allows automatic recovery from failures, and a small grace window accounts for clock skew in distributed systems.

Write conflicts during concurrent updates are automatically resolved at the storage layer via optimistic concurrency control. This ensures correctness without blocking other operations. The result is a robust, non-blocking parallel processing workflow that preserves document-level ACID guarantees while scaling effectively in shared or cloud environments.

In this design, each thread processes one message at a time, in index order. Enforcing strict global message ordering would be more complex. The primary goal here is the scalability of the parallel processing.



Final Recommendation

When migrating from PostgreSQL to MongoDB, as between any two databases, avoid a direct feature-by-feature mapping, because the systems are fundamentally different. SKIP LOCKED works around blocking FOR UPDATE reads in PostgreSQL, whereas reads and writes don’t block in MongoDB. Instead of replicating another database’s behavior, clarify the business requirement and design the most appropriate solution. In this example, rather than relying on generic transaction control as in SQL, we modeled object states, such as claim acquisition and expiration, and stored that state directly in the documents.
