AsyncPG and PostgreSQL integration for taskiq

TaskIQ - asyncpg

TaskIQ-pg is a plugin for taskiq that adds a new result backend and a new broker for PostgreSQL, built on asyncpg.

The broker makes use of Postgres' built-in LISTEN/NOTIFY functionality.

This is a fork of taskiq-psqlpy that adds a broker (because PSQLPy does not currently support LISTEN/NOTIFY).
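The LISTEN/NOTIFY pattern the broker relies on can be sketched directly with asyncpg. This is a minimal sketch, not the broker's actual implementation; the channel name `"taskiq"` is an illustrative assumption, the DSN matches the usage example below, and a running PostgreSQL instance is required:

```python
import asyncio

messages: list[str] = []


def on_notify(connection, pid, channel, payload) -> None:
    # asyncpg invokes this callback for every NOTIFY on the subscribed channel.
    messages.append(payload)


async def main() -> None:
    import asyncpg  # imported here so the sketch stays importable without a DB driver

    conn = await asyncpg.connect("postgres://postgres:postgres@localhost:15432/postgres")
    await conn.add_listener("taskiq", on_notify)  # issues LISTEN "taskiq"
    await conn.execute("SELECT pg_notify('taskiq', 'hello')")  # same as NOTIFY taskiq, 'hello'
    await asyncio.sleep(0.1)  # give the notification a moment to arrive
    await conn.close()


if __name__ == "__main__":
    asyncio.run(main())
```

A worker process listens on a channel and is woken by a notification the instant a message is enqueued, instead of polling the table.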

Installation

To use this project you must have the core taskiq library installed:

pip install taskiq

This project can be installed using pip:

pip install taskiq-pg

Or using poetry:

poetry add taskiq-pg

Usage

An example with the broker and result backend:

# example.py
import asyncio

from taskiq.serializers.json_serializer import JSONSerializer
from taskiq_pg import AsyncpgBroker, AsyncpgResultBackend

asyncpg_result_backend = AsyncpgResultBackend(
    dsn="postgres://postgres:postgres@localhost:15432/postgres",
    serializer=JSONSerializer(),
)

broker = AsyncpgBroker(
    dsn="postgres://postgres:postgres@localhost:15432/postgres",
).with_result_backend(asyncpg_result_backend)


@broker.task()
async def best_task_ever() -> str:
    """Solve all problems in the world."""
    await asyncio.sleep(1.0)
    return "All problems are solved!"


async def main() -> None:
    """Main."""
    await broker.startup()
    task = await best_task_ever.kiq()
    result = await task.wait_result(timeout=2)
    print(result)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Run example

shell 1: start a worker

$ taskiq worker example:broker
[2025-01-06 11:48:14,171][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 80434
[2025-01-06 11:48:14,171][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-01-06 11:48:14,175][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 80436
[2025-01-06 11:48:14,176][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 80437

shell 2: run the example script

$ python example.py
is_err=False log=None return_value='All problems are solved!' execution_time=1.0 labels={} error=None

Details

The result backend stores the data as raw bytes by default; you can decode them in SQL:

select convert_from(result, 'UTF8') from taskiq_results;
-- Example results:
-- - success:
--   {
--     "is_err": false,
--     "log": null,
--     "return_value": "All problems are solved!",
--     "execution_time": 1.0,
--     "labels": {},
--     "error": null
--   }
-- - failure:
--   {
--     "is_err": true,
--     "log": null,
--     "return_value": null,
--     "execution_time": 10.0,
--     "labels": {},
--     "error": {
--       "exc_type": "ValueError",
--       "exc_message": ["Borked"],
--       "exc_module": "builtins",
--       "exc_cause": null,
--       "exc_context": null,
--       "exc_suppress_context": false
--     }
--   }
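The same decoding can be done client-side with the standard library. A minimal sketch; the byte string mirrors the success example above, as stored when `JSONSerializer` is configured:

```python
import json

# Bytes as stored in the `result` column for the success case above.
raw = b'{"is_err": false, "log": null, "return_value": "All problems are solved!", "execution_time": 1.0, "labels": {}, "error": null}'

decoded = json.loads(raw.decode("utf-8"))
print(decoded["is_err"], decoded["return_value"])
```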

AsyncpgResultBackend configuration

  • dsn: connection string to PostgreSQL.
  • keep_results: flag to keep results in the table after reading (instead of deleting them).
  • table_name: name of the PostgreSQL table used to store TaskIQ results.
  • field_for_task_id: column type for task_id; override it if you need task IDs longer than 255 characters.
  • **connect_kwargs: additional connection parameters; see the asyncpg repository for details.
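Putting those options together (a sketch; every value other than the parameter names listed above is an illustrative assumption, not a documented default):

```python
from taskiq.serializers.json_serializer import JSONSerializer
from taskiq_pg import AsyncpgResultBackend

result_backend = AsyncpgResultBackend(
    dsn="postgres://postgres:postgres@localhost:15432/postgres",
    keep_results=True,            # keep rows after they are read
    table_name="taskiq_results",  # assumed name, matching the SQL example above
    serializer=JSONSerializer(),
)
```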

AsyncpgBroker configuration

  • dsn: Connection string to PostgreSQL.
  • result_backend: Custom result backend.
  • task_id_generator: Custom task_id generator.
  • channel_name: Name of the channel to listen on.
  • table_name: Name of the table to store messages.
  • max_retry_attempts: Maximum number of message processing attempts.
  • connection_kwargs: Additional arguments for asyncpg connection.
  • pool_kwargs: Additional arguments for asyncpg pool creation.
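A configuration sketch using several of the parameters above; the channel and table names and the retry count are illustrative assumptions, not documented defaults:

```python
from taskiq_pg import AsyncpgBroker

broker = AsyncpgBroker(
    dsn="postgres://postgres:postgres@localhost:15432/postgres",
    channel_name="taskiq",         # channel workers LISTEN on
    table_name="taskiq_messages",  # table where messages are stored
    max_retry_attempts=5,          # give up on a message after 5 attempts
)
```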

Acknowledgements

Builds on work from pgmq.
