AsyncPG and PostgreSQL integration for taskiq
Project description
TaskIQ - asyncpg
TaskIQ-pg is a plugin for taskiq that adds a new result backend and a new broker based on PostgreSQL and asyncpg.
The broker makes use of Postgres' built in LISTEN/NOTIFY functionality.
This is a fork of taskiq-psqlpy that adds a broker (because PSQLPy does not currently support LISTEN/NOTIFY).
Installation
To use this project you must have installed core taskiq library:
pip install taskiq
This project can be installed using pip:
pip install taskiq-pg
Or using poetry:
poetry add taskiq-pg
Usage
An example with the broker and result backend:
# example.py
import asyncio
from taskiq.serializers.json_serializer import JSONSerializer
from taskiq_pg import AsyncpgBroker, AsyncpgResultBackend
asyncpg_result_backend = AsyncpgResultBackend(
dsn="postgres://postgres:postgres@localhost:15432/postgres",
serializer=JSONSerializer(),
)
broker = AsyncpgBroker(
dsn="postgres://postgres:postgres@localhost:15432/postgres",
).with_result_backend(asyncpg_result_backend)
@broker.task()
async def best_task_ever() -> str:
"""Solve all problems in the world."""
await asyncio.sleep(1.0)
return "All problems are solved!"
async def main() -> None:
"""Main."""
await broker.startup()
task = await best_task_ever.kiq()
result = await task.wait_result(timeout=2)
print(result)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
Run example
shell 1: start a worker
$ taskiq worker example:broker
[2025-01-06 11:48:14,171][taskiq.worker][INFO ][MainProcess] Pid of a main process: 80434
[2025-01-06 11:48:14,171][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2025-01-06 11:48:14,175][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 80436
[2025-01-06 11:48:14,176][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 80437
shell 2: run the example script
$ python example.py
is_err=False log=None return_value='All problems are solved!' execution_time=1.0 labels={} error=None
Details
The result backend stores the data as raw bytes by default, you can decode them in SQL:
select convert_from(result, 'UTF8') from taskiq_results;
-- Example results:
-- - success:
-- {
-- "is_err": false,
-- "log": null,
-- "return_value": "All problems are solved!",
-- "execution_time": 1.0,
-- "labels": {},
-- "error": null
-- }
-- - failure:
-- {
-- "is_err": true,
-- "log": null,
-- "return_value": null,
-- "execution_time": 10.0,
-- "labels": {},
-- "error": {
-- "exc_type": "ValueError",
-- "exc_message": ["Borked"],
-- "exc_module": "builtins",
-- "exc_cause": null,
-- "exc_context": null,
-- "exc_suppress_context": false
-- }
-- }
AsyncpgResultBackend configuration
dsn: connection string to PostgreSQL.keep_results: flag to not remove results from Redis after reading.table_name: name of the table in PostgreSQL to store TaskIQ results.field_for_task_id: type of a field fortask_id, you may need it if you want to have length of task_id more than 255 symbols.**connect_kwargs: additional connection parameters, you can read more about it in asyncpg repository.
AsyncpgBroker configuration
dsn: Connection string to PostgreSQL.result_backend: Custom result backend.task_id_generator: Custom task_id generator.channel_name: Name of the channel to listen on.table_name: Name of the table to store messages.max_retry_attempts: Maximum number of message processing attempts.connection_kwargs: Additional arguments for asyncpg connection.pool_kwargs: Additional arguments for asyncpg pool creation.
Acknowledgements
Builds on work from pgmq.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taskiq_pg-0.2.0.tar.gz.
File metadata
- Download URL: taskiq_pg-0.2.0.tar.gz
- Upload date:
- Size: 9.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8106d089278c31660b9e8372bdb165354f2bb711595861b5bb3211ff0fd8401c
|
|
| MD5 |
85e22919afcf786fbb6d4f176c88e2f3
|
|
| BLAKE2b-256 |
c051865dea9b32c27ee09c2567cffeec12efec9702fe0a0db0abe886cba9e3ac
|
Provenance
The following attestation bundles were made for taskiq_pg-0.2.0.tar.gz:
Publisher:
publish.yaml on karoo-ca/taskiq-pg
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taskiq_pg-0.2.0.tar.gz -
Subject digest:
8106d089278c31660b9e8372bdb165354f2bb711595861b5bb3211ff0fd8401c - Sigstore transparency entry: 178142399
- Sigstore integration time:
-
Permalink:
karoo-ca/taskiq-pg@2a1cbb80f7cef2aa21ab5aa37ddf032591b8a0bc -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/karoo-ca
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@2a1cbb80f7cef2aa21ab5aa37ddf032591b8a0bc -
Trigger Event:
release
-
Statement type:
File details
Details for the file taskiq_pg-0.2.0-py3-none-any.whl.
File metadata
- Download URL: taskiq_pg-0.2.0-py3-none-any.whl
- Upload date:
- Size: 9.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
14fa3d38a21c07e21594d4e47b1bc40103e5f96c7c6f349d9b30cd929139ca5b
|
|
| MD5 |
f1f3735ba1909f3ee27899d4c9b24298
|
|
| BLAKE2b-256 |
c03a9c456b86975a19f2a8ca5c9fdca74029a98d10c231c54099519012346af8
|
Provenance
The following attestation bundles were made for taskiq_pg-0.2.0-py3-none-any.whl:
Publisher:
publish.yaml on karoo-ca/taskiq-pg
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taskiq_pg-0.2.0-py3-none-any.whl -
Subject digest:
14fa3d38a21c07e21594d4e47b1bc40103e5f96c7c6f349d9b30cd929139ca5b - Sigstore transparency entry: 178142409
- Sigstore integration time:
-
Permalink:
karoo-ca/taskiq-pg@2a1cbb80f7cef2aa21ab5aa37ddf032591b8a0bc -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/karoo-ca
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@2a1cbb80f7cef2aa21ab5aa37ddf032591b8a0bc -
Trigger Event:
release
-
Statement type: