Skip to main content

Distributed lock implementation based on SQLAlchemy with support for MySQL, PostgreSQL, MSSQL, and Oracle

Project description

sqlalchemy-dlock

CI GitHub Release PyPI version Documentation Status codecov License Code style: ruff

A distributed lock library based on databases and SQLAlchemy.

sqlalchemy-dlock provides distributed locking capabilities using your existing database infrastructure—no additional services like Redis or ZooKeeper required. It currently supports:

Database Lock Mechanism
MySQL Named Lock (GET_LOCK / RELEASE_LOCK)
MariaDB Named Lock (compatible with MySQL)
MSSQL Application Lock (sp_getapplock)
Oracle User Lock (DBMS_LOCK)
PostgreSQL Advisory Lock

⚠️ Oracle Not Tested: Oracle Database Free (23c/23ai) does NOT support DBMS_LOCK.REQUEST. We do NOT test Oracle in CI or integration tests. Use with Oracle Enterprise/Standard Edition at your own risk.


Why sqlalchemy-dlock?

Distributed locks coordinate access to shared resources across multiple processes or servers. Here's how database-based locking compares to other approaches:

Solution Pros Cons Best For
Redis High performance Additional infrastructure, consistency complexities High-throughput scenarios
ZooKeeper Strong consistency Complex deployment, high operational cost Financial/mission-critical systems
Database Lock Zero additional dependencies, ACID guarantees Lower performance than in-memory solutions Applications with existing databases

sqlalchemy-dlock is ideal for:

  • Projects already using MySQL, MariaDB, MSSQL, Oracle, or PostgreSQL
  • Teams wanting zero additional infrastructure
  • Low to medium concurrency distributed synchronization
  • Applications requiring strong consistency guarantees

Not recommended for:

  • High-concurrency scenarios (consider Redis instead)
  • Situations sensitive to database load

Quick Start

Installation

pip install sqlalchemy-dlock

Requirements:

  • Python 3.9+
  • SQLAlchemy 1.4.3+ or 2.x
  • Appropriate database driver for your database (see below)

Database Drivers

This library requires a database driver to be installed separately. Since you're already using SQLAlchemy, you likely have the appropriate driver installed. For a complete list of SQLAlchemy-supported drivers, see the SQLAlchemy Dialects documentation.

ℹ️ Notes:

  • MSSQL: The pyodbc driver requires the Microsoft ODBC driver to be installed on your system. On Ubuntu/Debian:
    sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18
    
  • Oracle: Oracle Database Free (23c/23ai) does NOT support DBMS_LOCK.REQUEST which is required for distributed lock functionality. For production use with Oracle, a full Oracle Database (Enterprise/Standard Edition) installation is required.

Basic Usage

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('postgresql://user:pass@localhost/db')

with engine.connect() as conn:
    # Create a lock
    lock = create_sadlock(conn, 'my-resource-key')

    # Acquire the lock
    lock.acquire()
    assert lock.locked

    # Release the lock
    lock.release()
    assert not lock.locked

Using Context Managers

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('postgresql://user:pass@localhost/db')

with engine.connect() as conn:
    # Automatically acquires and releases the lock
    with create_sadlock(conn, 'my-resource-key') as lock:
        assert lock.locked
        # Your critical section here

    # Lock is automatically released
    assert not lock.locked

With Timeout

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('postgresql://user:pass@localhost/db')

with engine.connect() as conn:
    try:
        # Raises TimeoutError if lock cannot be acquired within 5 seconds
        with create_sadlock(conn, 'my-resource-key', contextual_timeout=5) as lock:
            pass
    except TimeoutError:
        print("Could not acquire lock - resource is busy")

Common Use Cases

Use Case 1: Preventing Duplicate Task Execution

Prevent multiple workers from processing the same task simultaneously:

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

def process_monthly_billing(user_id: int):
    engine = create_engine('postgresql://user:pass@localhost/db')
    with engine.connect() as conn:
        # Ensure billing for a user is only processed once at a time
        lock_key = f'billing:user:{user_id}'
        with create_sadlock(conn, lock_key, contextual_timeout=0):
            # If another worker is already processing this user's billing,
            # this will fail immediately (timeout=0)
            perform_billing_calculation(user_id)
            send_bill(user_id)

Use Case 2: API Rate Limiting & Debouncing

Prevent simultaneous expensive operations on the same resource:

from fastapi import FastAPI, HTTPException
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy_dlock import create_async_sadlock

app = FastAPI()
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

@app.post("/api/resources/{resource_id}/export")
async def export_resource(resource_id: str):
    async with AsyncSessionLocal() as session:
        # Try to acquire lock without blocking
        lock = create_async_sadlock(session, f'export:{resource_id}')
        acquired = await lock.acquire(block=False)
        if not acquired:
            raise HTTPException(status_code=409, detail="Export already in progress")

        try:
            return await perform_export(resource_id)
        finally:
            await lock.release()

Use Case 3: Scheduled Job Coordination

Ensure scheduled jobs don't overlap across multiple servers:

from apscheduler.schedulers.background import BackgroundScheduler
from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

def data_sync_job():
    engine = create_engine('mysql://user:pass@localhost/db')
    with engine.connect() as conn:
        lock_key = 'scheduled-job:data-sync'

        # Only proceed if no other server is running this job
        lock = create_sadlock(conn, lock_key, contextual_timeout=60)
        with lock:
            perform_data_sync()

scheduler = BackgroundScheduler()
scheduler.add_job(data_sync_job, 'interval', minutes=30)
scheduler.start()

Use Case 4: Decorator Pattern for Clean Code

Create a reusable decorator for locking functions:

from functools import wraps
from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

def with_db_lock(key_func, timeout=None):
    """Decorator that acquires a database lock before executing the function."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            engine = create_engine('postgresql://user:pass@localhost/db')
            lock_key = key_func(*args, **kwargs)

            with engine.connect() as conn:
                with create_sadlock(conn, lock_key, contextual_timeout=timeout):
                    return func(*args, **kwargs)
        return wrapper
    return decorator

# Usage
@with_db_lock(lambda user_id: f'user:update:{user_id}', timeout=10)
def update_user_profile(user_id: int, profile_data: dict):
    # This function is protected from concurrent execution
    # for the same user_id
    ...

Working with SQLAlchemy ORM

Using locks with SQLAlchemy ORM sessions:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy_dlock import create_sadlock

engine = create_engine('postgresql://user:pass@localhost/db')
Session = sessionmaker(bind=engine)

with Session() as session:
    with create_sadlock(session, 'my-resource-key') as lock:
        # Use the session within the locked context
        user = session.query(User).get(user_id)
        user.balance += 100
        session.commit()

Asynchronous I/O Support

Full async/await support for asynchronous applications:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy_dlock import create_async_sadlock

engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')

async def main():
    async with engine.connect() as conn:
        async with create_async_sadlock(conn, 'my-resource-key') as lock:
            assert lock.locked
            # Your async critical section here
        assert not lock.locked

Supported async drivers:


PostgreSQL Lock Types

PostgreSQL provides multiple advisory lock types. Choose based on your scenario:

Lock Type Parameters Description Use Case
Session-exclusive (default) Held until manually released or session ends Long-running tasks
Session-shared shared=True Multiple shared locks can coexist Multi-reader scenarios
Transaction-exclusive xact=True Automatically released when transaction ends Transaction-scoped operations
Transaction-shared shared=True, xact=True Shared locks within transaction Transactional read-heavy workloads

Example: Transaction-Level Lock

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('postgresql://user:pass@localhost/db')

with engine.connect() as conn:
    # Transaction-level lock - automatically released on commit/rollback
    with create_sadlock(conn, 'my-key', xact=True) as lock:
        conn.execute(text("INSERT INTO ..."))
        conn.commit()  # Lock is released here

Example: Shared Lock for Read-Heavy Workloads

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('postgresql://user:pass@localhost/db')

# Multiple readers can hold shared locks simultaneously
def read_resource(resource_id: str):
    with engine.connect() as conn:
        with create_sadlock(conn, f'resource:{resource_id}', shared=True):
            return conn.execute(text("SELECT * FROM resources WHERE id = :id"), {"id": resource_id})

# Writers need exclusive locks
def write_resource(resource_id: str, data: dict):
    with engine.connect() as conn:
        # This will wait for all shared locks to be released
        with create_sadlock(conn, f'resource:{resource_id}') as lock:
            conn.execute(text("UPDATE resources SET ..."))
            conn.commit()

MSSQL Lock Types

SQL Server's sp_getapplock supports multiple lock modes:

Lock Mode Parameters Description Use Case
Exclusive (default) (default) Full exclusive access Write operations, critical sections
Shared shared=True Multiple readers can hold lock concurrently Read-heavy workloads
Update update=True Intended for update operations; compatible with Shared locks Read-then-write patterns

Example: Exclusive Lock for Writing (Default)

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('mssql+pyodbc://user:pass@localhost/db')

with engine.connect() as conn:
    # Exclusive lock for writing (default)
    with create_sadlock(conn, 'my-resource') as lock:
        conn.execute(text("UPDATE resources SET ..."))

Example: Shared Lock for Reading

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('mssql+pyodbc://user:pass@localhost/db')

# Multiple readers can hold shared locks simultaneously
def read_resource(resource_id: str):
    with engine.connect() as conn:
        with create_sadlock(conn, f'resource:{resource_id}', shared=True):
            return conn.execute(text("SELECT * FROM resources WHERE id = :id"), {"id": resource_id})

Example: Update Lock for Read-Then-Write Patterns

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('mssql+pyodbc://user:pass@localhost/db')

with engine.connect() as conn:
    # Update lock - compatible with shared locks, used for read-then-write patterns
    with create_sadlock(conn, 'my-resource', update=True) as lock:
        data = conn.execute(text("SELECT * FROM resources WHERE id = :id"), {"id": resource_id})
        # Perform read operations
        # Then upgrade to exclusive lock for writing
        conn.execute(text("UPDATE resources SET ..."))

Oracle Lock Types

Oracle's DBMS_LOCK.REQUEST supports 6 lock modes with different compatibility:

Lock Mode Constant Description Use Case
X (default) X_MODE Exclusive - full exclusive access Write operations, critical sections
S S_MODE Shared - multiple readers Read-heavy workloads
SS SS_MODE Sub-Shared - share locks on subparts Aggregate object read
SX SX_MODE Sub-Exclusive (Row Exclusive) Row-level updates
SSX SSX_MODE Shared Sub-Exclusive Read with pending write
NL NL_MODE Null - no actual lock Testing/coordination only

Example: Exclusive Lock for Writing (Default)

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('oracle+oracledb://user:pass@localhost/db')

with engine.connect() as conn:
    # Exclusive lock for writing (default)
    with create_sadlock(conn, 'my-resource') as lock:
        conn.execute(text("UPDATE resources SET ..."))

Example: Shared Lock for Reading

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('oracle+oracledb://user:pass@localhost/db')

# Multiple readers can hold shared locks simultaneously
def read_resource(resource_id: str):
    with engine.connect() as conn:
        with create_sadlock(conn, f'resource:{resource_id}', lock_mode="S"):
            return conn.execute(text("SELECT * FROM resources WHERE id = :id"), {"id": resource_id})

Example: Transaction-Level Lock

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('oracle+oracledb://user:pass@localhost/db')

with engine.connect() as conn:
    # Transaction-level lock - automatically released on commit/rollback
    with create_sadlock(conn, 'my-resource', release_on_commit=True) as lock:
        conn.execute(text("INSERT INTO ..."))
        conn.commit()  # Lock is released here

Example: Using Integer Lock ID Directly

from sqlalchemy import create_engine
from sqlalchemy_dlock import create_sadlock

engine = create_engine('oracle+oracledb://user:pass@localhost/db')

with engine.connect() as conn:
    # Direct integer lock ID (no hashing needed)
    with create_sadlock(conn, 12345, lock_mode="X") as lock:
        # Direct lock ID usage
        pass

Note: String keys are converted to integer IDs using blake2b hash (similar to PostgreSQL).


Performance Considerations

  • Database lock operations require network round-trips and are slower than in-memory solutions like Redis
  • PostgreSQL timeout is implemented through polling and may have ~1 second variance
  • Consider your concurrency requirements before choosing database-based locks

Thread Safety

  • Lock objects are thread-local and cannot be safely passed between threads
  • Each thread must create its own lock instance
  • Cross-process/cross-server locking works normally

MySQL-Specific Behavior

⚠️ Warning: MySQL allows acquiring the same named lock multiple times on the same connection. This can lead to unexpected cascading locks:

# DANGER: On MySQL, the second acquisition succeeds immediately
with create_sadlock(conn, 'my-key') as lock1:
    # This immediately returns without waiting - no real mutual exclusion!
    with create_sadlock(conn, 'my-key') as lock2:
        pass

To avoid this, use separate connections or implement additional checking logic.

Lock Lifetime

  • Locks are tied to your database connection
  • Closing a connection releases all associated locks
  • Properly manage Connection/Session lifecycle to avoid accidental lock releases

FAQ

Q: What happens if the database goes down?

A: Locks are automatically released when the database connection is lost. This is intentional behavior to prevent deadlocks.

Q: Can I pass a lock object between threads?

A: No. Lock objects are thread-local for safety reasons. Each thread should create its own lock instance pointing to the same lock key.

Q: How do I choose between MySQL and PostgreSQL?

A: Both are fully supported. PostgreSQL offers more lock types (shared/transaction-level), while MySQL's implementation is simpler.

Q: Are locks inherited by child processes?

A: No. Child processes must establish their own database connections and create new lock objects.

Q: How can I debug lock status?

A:

  • MySQL: SELECT * FROM performance_schema.metadata_locks;
  • PostgreSQL: SELECT * FROM pg_locks WHERE locktype = 'advisory';

Q: What's the maximum lock key size?

A:

  • MySQL: 64 characters
  • MSSQL: 255 characters
  • PostgreSQL: Keys are converted to 64-bit integers via BLAKE2b hash
  • Oracle: Keys are converted to integers (0-1073741823) via BLAKE2b hash

Q: Can I use this with SQLite?

A: No. SQLite does not support the same named/advisory lock mechanisms as MySQL or PostgreSQL.


Testing

The following database drivers are tested:

MySQL:

PostgreSQL:

MSSQL:

Oracle:

Running Tests Locally

  1. Install the project with development dependencies:
uv sync --group test
uv pip install mysqlclient aiomysql psycopg2 asyncpg
  1. Start MySQL and PostgreSQL services using Docker:
docker compose -f db.docker-compose.yml up
  1. Set environment variables for database connections (or use defaults):
export TEST_URLS="mysql://test:test@127.0.0.1:3306/test postgresql://postgres:test@127.0.0.1:5432/"
export TEST_ASYNC_URLS="mysql+aiomysql://test:test@127.0.0.1:3306/test postgresql+asyncpg://postgres:test@127.0.0.1:5432/"

ℹ️ Note: Test cases also load environment variables from tests/.env.

  1. Run tests:
python -m unittest

Running Tests with Docker Compose

The project includes a comprehensive test matrix across Python and SQLAlchemy versions:

cd tests
docker compose up --abort-on-container-exit

Documentation

Full API documentation is available at https://sqlalchemy-dlock.readthedocs.io/


Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sqlalchemy_dlock-0.8.0.tar.gz (31.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sqlalchemy_dlock-0.8.0-py3-none-any.whl (29.0 kB view details)

Uploaded Python 3

File details

Details for the file sqlalchemy_dlock-0.8.0.tar.gz.

File metadata

  • Download URL: sqlalchemy_dlock-0.8.0.tar.gz
  • Upload date:
  • Size: 31.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for sqlalchemy_dlock-0.8.0.tar.gz
Algorithm Hash digest
SHA256 42d236ecb4d04f3f4cc4dfc88ceb3766f847cbeeea3d8709b644179f85a7d71c
MD5 79dbf9d62f6ab3190e1ffac75bd5a14a
BLAKE2b-256 c60816e7144ba63029ed7d5c2cb1dd40bdc581efecdf316458035ecedd26913f

See more details on using hashes here.

Provenance

The following attestation bundles were made for sqlalchemy_dlock-0.8.0.tar.gz:

Publisher: python-package.yml on tanbro/sqlalchemy-dlock

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sqlalchemy_dlock-0.8.0-py3-none-any.whl.

File metadata

File hashes

Hashes for sqlalchemy_dlock-0.8.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ebee4af423e60953f7dfcd60dbe6859f9d719c0bfa683afcecefb3471633a383
MD5 565a9d0fa42c8719938d7d736d8b73be
BLAKE2b-256 8f5b1d14d919b024a444db15e8d35eb49f7b673b770af62ea878f5b8470bcdda

See more details on using hashes here.

Provenance

The following attestation bundles were made for sqlalchemy_dlock-0.8.0-py3-none-any.whl:

Publisher: python-package.yml on tanbro/sqlalchemy-dlock

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page