Skip to main content

Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.

Project description

🚀 PGQueuer – PostgreSQL‑powered job queues for Python

CI pypi downloads versions

📚 Documentation · 💻 Source · 💬 Discord

PGQueuer turns your PostgreSQL database into a fast, reliable background job processor. Jobs live in the same database as your application data, so you scale without adding new infrastructure. PGQueuer uses PostgreSQL features like LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED to keep workers coordinated and throughput high.

Features

  • 💡 Minimal integration: PGQueuer is a single Python package. Bring your existing PostgreSQL connection and start enqueueing jobs—no extra services to deploy.
  • ⚛️ PostgreSQL-powered concurrency: Workers lease jobs with FOR UPDATE SKIP LOCKED, allowing many processes to pull from the same queue without stepping on each other.
  • 🚧 Instant notifications: LISTEN/NOTIFY wakes idle workers as soon as a job arrives. A periodic poll backs it up for robustness.
  • 👨‍🎓 Batch friendly: Designed for throughput; enqueue or acknowledge thousands of jobs per round trip.
  • Scheduling & graceful shutdown: Register cron-like recurring jobs and let PGQueuer stop workers cleanly when your service exits.

Installation

PGQueuer targets Python 3.11+ and any PostgreSQL 12+ server. Install the package and initialize the database schema with the CLI:

pip install pgqueuer
pgq install        # create tables and functions in your database

The CLI reads PGHOST, PGUSER, PGDATABASE and friends to know where to install. Use pgq install --dry-run to preview SQL or pgq uninstall to remove the structure when you're done experimenting.

Quick Start

The API revolves around consumers that process jobs and producers that enqueue them. Follow the steps below to see both sides.

1. Create a consumer

The consumer declares entrypoints and scheduled tasks. PGQueuer wires these up to the worker process that polls for jobs.

from datetime import datetime

import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job, Schedule

async def main() -> PgQueuer:
    conn = await asyncpg.connect()
    driver = AsyncpgDriver(conn)
    pgq = PgQueuer(driver)

    @pgq.entrypoint("fetch")
    async def process(job: Job) -> None:
        print(f"Processed: {job!r}")

    @pgq.schedule("every_minute", "* * * * *")
    async def every_minute(schedule: Schedule) -> None:
        print(f"Ran at {datetime.now():%H:%M:%S}")

    return pgq

Run the consumer with the CLI so it begins listening for work:

pgq run examples.consumer:main

2. Enqueue jobs

In a separate process, create a Queries object and enqueue jobs targeted at your entrypoint:

import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries

async def main() -> None:
    conn = await asyncpg.connect()
    driver = AsyncpgDriver(conn)
    q = Queries(driver)
    await q.enqueue(["fetch"], [b"hello world"], [0])

Run the producer:

python examples/producer.py

Monitor your queues

pgq dashboard --interval 10 --tail 25 --table-format grid

The dashboard is an interactive terminal UI that refreshes periodically. Use it to watch queue depth, processing times and job statuses in real time.

Example output:

+---------------------------+-------+------------+--------------------------+------------+----------+
|          Created          | Count | Entrypoint | Time in Queue (HH:MM:SS) |   Status   | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 |  49   |    sync    |         0:00:01          | successful |    0     |
...
+---------------------------+-------+------------+--------------------------+------------+----------+

Drivers

PGQueuer works with both async and sync PostgreSQL drivers:

  • AsyncpgDriver / AsyncpgPoolDriver – integrate with asyncpg connections or pools for asyncio applications.
  • PsycopgDriver – built on psycopg's async interface for apps already using psycopg 3.
  • SyncPsycopgDriver – a thin wrapper around psycopg's blocking API so traditional scripts can enqueue work.

See the driver documentation for details.

Development and Testing

PGQueuer uses Testcontainers to launch an ephemeral PostgreSQL instance automatically for the integration test suite—no manual Docker Compose setup or pre‑provisioned database required. Just ensure Docker (or another supported container runtime) is running locally.

Typical development workflow:

  1. Install dependencies (including extras): uv sync --all-extras
  2. Run lint & type checks: uv ruff check . and uv run mypy .
  3. Run the test suite (will start/stop a disposable PostgreSQL container automatically): uv run pytest
  4. (Optional) Aggregate target (if you prefer the Makefile): make check

The containerized database lifecycle is fully automatic; tests handle creation, migrations, and teardown. This keeps your local environment clean and ensures consistent, isolated runs.

License

PGQueuer is MIT licensed. See LICENSE for details.


Ready to supercharge your workflows? Install PGQueuer today and start processing jobs with the database you already trust.

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgqueuer-0.25.3.tar.gz (322.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pgqueuer-0.25.3-py3-none-any.whl (69.4 kB view details)

Uploaded Python 3

File details

Details for the file pgqueuer-0.25.3.tar.gz.

File metadata

  • Download URL: pgqueuer-0.25.3.tar.gz
  • Upload date:
  • Size: 322.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pgqueuer-0.25.3.tar.gz
Algorithm Hash digest
SHA256 542f109073ed7266dfca890dda0dc234b980d1ccd8e9a3bd18fd1a6580c1a7e4
MD5 a24cea2cfc1903021b425423ea201721
BLAKE2b-256 db160b7e68597d10110283bf98679aa72139ab43415215f1adc7fa03035373ab

See more details on using hashes here.

Provenance

The following attestation bundles were made for pgqueuer-0.25.3.tar.gz:

Publisher: release.yml on janbjorge/pgqueuer

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pgqueuer-0.25.3-py3-none-any.whl.

File metadata

  • Download URL: pgqueuer-0.25.3-py3-none-any.whl
  • Upload date:
  • Size: 69.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pgqueuer-0.25.3-py3-none-any.whl
Algorithm Hash digest
SHA256 0dc29f95d6eec6809d54b954185239c9f9fd2af8567493beaae8245bf1f768c3
MD5 e1f06dd824497d4c3ea490acb00d13bc
BLAKE2b-256 357d450c3a013c47b517c99bd76ee05c0a5de2acae14065ffa40c3fa7c199ff3

See more details on using hashes here.

Provenance

The following attestation bundles were made for pgqueuer-0.25.3-py3-none-any.whl:

Publisher: release.yml on janbjorge/pgqueuer

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page