Pgqueuer is a Python library leveraging PostgreSQL for efficient job queuing.
Project description
🚀 PGQueuer – PostgreSQL‑powered job queues for Python
📚 Documentation · 💻 Source · 💬 Discord
PGQueuer turns your PostgreSQL database into a fast, reliable background job processor. Jobs live in the same database as your application data, so you scale without adding new infrastructure. PGQueuer uses PostgreSQL features like LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED to keep workers coordinated and throughput high.
Features
- 💡 Minimal integration: PGQueuer is a single Python package. Bring your existing PostgreSQL connection and start enqueueing jobs—no extra services to deploy.
- ⚛️ PostgreSQL-powered concurrency: Workers lease jobs with
FOR UPDATE SKIP LOCKED, allowing many processes to pull from the same queue without stepping on each other. - 🚧 Instant notifications:
LISTEN/NOTIFYwakes idle workers as soon as a job arrives. A periodic poll backs it up for robustness. - 👨🎓 Batch friendly: Designed for throughput; enqueue or acknowledge thousands of jobs per round trip.
- ⏳ Scheduling & graceful shutdown: Register cron-like recurring jobs and let PGQueuer stop workers cleanly when your service exits.
Installation
PGQueuer targets Python 3.11+ and any PostgreSQL 12+ server. Install the package and initialize the database schema with the CLI:
pip install pgqueuer
pgq install # create tables and functions in your database
The CLI reads PGHOST, PGUSER, PGDATABASE and friends to know where to install. Use pgq install --dry-run to preview SQL or pgq uninstall to remove the structure when you're done experimenting.
Quick Start
The API revolves around consumers that process jobs and producers that enqueue them. Follow the steps below to see both sides.
1. Create a consumer
The consumer declares entrypoints and scheduled tasks. PGQueuer wires these up to the worker process that polls for jobs.
from datetime import datetime
import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job, Schedule
async def main() -> PgQueuer:
conn = await asyncpg.connect()
driver = AsyncpgDriver(conn)
pgq = PgQueuer(driver)
@pgq.entrypoint("fetch")
async def process(job: Job) -> None:
print(f"Processed: {job!r}")
@pgq.schedule("every_minute", "* * * * *")
async def every_minute(schedule: Schedule) -> None:
print(f"Ran at {datetime.now():%H:%M:%S}")
return pgq
Run the consumer with the CLI so it begins listening for work:
pgq run examples.consumer:main
2. Enqueue jobs
In a separate process, create a Queries object and enqueue jobs targeted at your entrypoint:
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
async def main() -> None:
conn = await asyncpg.connect()
driver = AsyncpgDriver(conn)
q = Queries(driver)
await q.enqueue(["fetch"], [b"hello world"], [0])
Run the producer:
python examples/producer.py
Monitor your queues
pgq dashboard --interval 10 --tail 25 --table-format grid
The dashboard is an interactive terminal UI that refreshes periodically. Use it to watch queue depth, processing times and job statuses in real time.
Example output:
+---------------------------+-------+------------+--------------------------+------------+----------+
| Created | Count | Entrypoint | Time in Queue (HH:MM:SS) | Status | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 | 49 | sync | 0:00:01 | successful | 0 |
...
+---------------------------+-------+------------+--------------------------+------------+----------+
Drivers
PGQueuer works with both async and sync PostgreSQL drivers:
- AsyncpgDriver / AsyncpgPoolDriver – integrate with
asyncpgconnections or pools for asyncio applications. - PsycopgDriver – built on psycopg's async interface for apps already using psycopg 3.
- SyncPsycopgDriver – a thin wrapper around psycopg's blocking API so traditional scripts can enqueue work.
See the driver documentation for details.
Development and Testing
PGQueuer uses Testcontainers to launch an ephemeral PostgreSQL instance automatically for the integration test suite—no manual Docker Compose setup or pre‑provisioned database required. Just ensure Docker (or another supported container runtime) is running locally.
Typical development workflow:
- Install dependencies (including extras):
uv sync --all-extras - Run lint & type checks:
uv ruff check .anduv run mypy . - Run the test suite (will start/stop a disposable PostgreSQL container automatically):
uv run pytest - (Optional) Aggregate target (if you prefer the Makefile):
make check
The containerized database lifecycle is fully automatic; tests handle creation, migrations, and teardown. This keeps your local environment clean and ensures consistent, isolated runs.
License
PGQueuer is MIT licensed. See LICENSE for details.
Ready to supercharge your workflows? Install PGQueuer today and start processing jobs with the database you already trust.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pgqueuer-0.25.3.tar.gz.
File metadata
- Download URL: pgqueuer-0.25.3.tar.gz
- Upload date:
- Size: 322.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
542f109073ed7266dfca890dda0dc234b980d1ccd8e9a3bd18fd1a6580c1a7e4
|
|
| MD5 |
a24cea2cfc1903021b425423ea201721
|
|
| BLAKE2b-256 |
db160b7e68597d10110283bf98679aa72139ab43415215f1adc7fa03035373ab
|
Provenance
The following attestation bundles were made for pgqueuer-0.25.3.tar.gz:
Publisher:
release.yml on janbjorge/pgqueuer
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pgqueuer-0.25.3.tar.gz -
Subject digest:
542f109073ed7266dfca890dda0dc234b980d1ccd8e9a3bd18fd1a6580c1a7e4 - Sigstore transparency entry: 737850077
- Sigstore integration time:
-
Permalink:
janbjorge/pgqueuer@190f4763be25ce1f0d1601e6d1f41020dd0f8452 -
Branch / Tag:
refs/tags/v0.25.3 - Owner: https://github.com/janbjorge
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@190f4763be25ce1f0d1601e6d1f41020dd0f8452 -
Trigger Event:
release
-
Statement type:
File details
Details for the file pgqueuer-0.25.3-py3-none-any.whl.
File metadata
- Download URL: pgqueuer-0.25.3-py3-none-any.whl
- Upload date:
- Size: 69.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0dc29f95d6eec6809d54b954185239c9f9fd2af8567493beaae8245bf1f768c3
|
|
| MD5 |
e1f06dd824497d4c3ea490acb00d13bc
|
|
| BLAKE2b-256 |
357d450c3a013c47b517c99bd76ee05c0a5de2acae14065ffa40c3fa7c199ff3
|
Provenance
The following attestation bundles were made for pgqueuer-0.25.3-py3-none-any.whl:
Publisher:
release.yml on janbjorge/pgqueuer
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pgqueuer-0.25.3-py3-none-any.whl -
Subject digest:
0dc29f95d6eec6809d54b954185239c9f9fd2af8567493beaae8245bf1f768c3 - Sigstore transparency entry: 737850078
- Sigstore integration time:
-
Permalink:
janbjorge/pgqueuer@190f4763be25ce1f0d1601e6d1f41020dd0f8452 -
Branch / Tag:
refs/tags/v0.25.3 - Owner: https://github.com/janbjorge
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@190f4763be25ce1f0d1601e6d1f41020dd0f8452 -
Trigger Event:
release
-
Statement type: