
Python client for the PGMQ Postgres extension.

Python Client for PGMQ

Installation

Install with pip from pypi.org:

pip install pgmq

To use the async version, install with the optional dependencies:

pip install "pgmq[async]"

Dependencies:

Usage

Start a Postgres Instance with the PGMQ extension installed

docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 ghcr.io/pgmq/pg18-pgmq:v1.10.0

Using Environment Variables

Set environment variables:

export PG_HOST=127.0.0.1
export PG_PORT=5432
export PG_USERNAME=postgres
export PG_PASSWORD=postgres
export PG_DATABASE=test_db

Initialize a connection to Postgres using environment variables:

from pgmq import PGMQueue, Message

queue = PGMQueue()
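As a rough illustration of what environment-based configuration amounts to, the sketch below collects the five variables above into a dictionary using only the standard library. The helper name load_pg_settings and the fallback values are assumptions made for this example; they are not part of pgmq, and the client's own defaults may differ.

```python
import os

# Hypothetical helper, not part of pgmq. The fallback values are illustrative assumptions.
def load_pg_settings(env=None):
    """Collect Postgres connection settings from PG_* environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env.get("PG_HOST", "localhost"),
        "port": env.get("PG_PORT", "5432"),
        "username": env.get("PG_USERNAME", "postgres"),
        "password": env.get("PG_PASSWORD", "postgres"),
        "database": env.get("PG_DATABASE", "postgres"),
    }
```

The dictionary keys match the keyword arguments used when initializing PGMQueue without environment variables, so the same settings could be passed explicitly if preferred.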

Note on the async version

Initialization for the async version requires an explicit call of the initializer:

from pgmq.async_queue import PGMQueue

async def main():
    queue = PGMQueue()
    await queue.init()

After initialization, the async client exposes the same methods as the sync version; each call must be awaited.
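The explicit init() call follows a common asyncio pattern: a constructor cannot await, so asynchronous setup has to live in a separate coroutine. The sketch below shows that pattern with a stand-in class. FakeAsyncQueue is hypothetical and does no database work, and whether the real client raises an error when init() is skipped is an assumption here.

```python
import asyncio

class FakeAsyncQueue:
    """Stand-in for an async client; it performs no database work."""

    def __init__(self):
        # __init__ cannot await, so nothing asynchronous happens here.
        self.ready = False

    async def init(self):
        # A real client would open its connections here.
        await asyncio.sleep(0)
        self.ready = True

    async def send(self, queue_name, message):
        if not self.ready:
            raise RuntimeError("call 'await queue.init()' before using the queue")
        return 1  # pretend message id

async def main():
    queue = FakeAsyncQueue()
    await queue.init()
    return await queue.send("my_queue", {"hello": "world"})
```

Running asyncio.run(main()) drives the coroutine from synchronous code.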

Initialize a connection to Postgres without environment variables

from pgmq import PGMQueue, Message

queue = PGMQueue(
    host="0.0.0.0",
    port="5432",
    username="postgres",
    password="postgres",
    database="postgres"
)

Create a queue

queue.create_queue("my_queue")

Or create a partitioned queue

queue.create_partitioned_queue("my_partitioned_queue", partition_interval=10000)

List all queues

queues = queue.list_queues()
for q in queues:
    print(f"Queue name: {q}")

Send a message

msg_id: int = queue.send("my_queue", {"hello": "world"})

Send a batch of messages

msg_ids: list[int] = queue.send_batch("my_queue", [{"hello": "world"}, {"foo": "bar"}])

Read a message and make it invisible for 30 seconds

read_message: Message = queue.read("my_queue", vt=30)
print(read_message)
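To make the visibility timeout concrete, here is a small in-memory model of the behaviour: a message that has been read is hidden for vt seconds and becomes readable again if it was not deleted in the meantime. ToyQueue and its injectable clock are hypothetical and exist only for this illustration; this is not how pgmq stores or locks messages.

```python
import itertools

class ToyQueue:
    """In-memory model of visibility timeouts; not pgmq's implementation."""

    def __init__(self, clock):
        self._clock = clock              # callable returning the current time in seconds
        self._ids = itertools.count(1)
        self._messages = {}              # msg_id -> [payload, visible_at]

    def send(self, payload):
        msg_id = next(self._ids)
        self._messages[msg_id] = [payload, self._clock()]
        return msg_id

    def read(self, vt):
        now = self._clock()
        for msg_id, entry in self._messages.items():
            if entry[1] <= now:
                entry[1] = now + vt      # hide the message for vt seconds
                return msg_id, entry[0]
        return None                      # nothing is visible right now

    def delete(self, msg_id):
        return self._messages.pop(msg_id, None) is not None
```

With a clock fixed at 0, a first read(vt=30) returns the message and a second read returns None; once the clock passes 30 the message is returned again unless delete was called.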

Read a batch of messages

read_messages: list[Message] = queue.read_batch("my_queue", vt=30, batch_size=5)
for message in read_messages:
    print(message)

Read messages with polling

The read_with_poll method repeatedly checks the queue until either a message is found or the specified polling duration is exceeded. It is useful when you want to wait for new messages to arrive without querying the queue in a tight loop.

In the following example, the method will check for up to 5 messages in the queue my_queue, making the messages invisible for 30 seconds (vt), and will poll for a maximum of 5 seconds (max_poll_seconds) with intervals of 100 milliseconds (poll_interval_ms) between checks.

read_messages: list[Message] = queue.read_with_poll(
    "my_queue", vt=30, qty=5, max_poll_seconds=5, poll_interval_ms=100
)
for message in read_messages:
    print(message)

Polling stops as soon as at least one message is available, returning at most qty messages, or when max_poll_seconds elapses without any message arriving. The poll_interval_ms parameter sets the pause between successive polls, so the database is not hammered with continuous queries.
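The polling semantics can be sketched as a plain loop. The function below is a hypothetical illustration of the behaviour described above; poll_for_messages and its fetch callback are names invented for this example, and the real polling may well happen inside the database rather than in Python.

```python
import time

def poll_for_messages(fetch, qty, max_poll_seconds, poll_interval_ms):
    """Call fetch(qty) until it returns messages or the time budget runs out."""
    deadline = time.monotonic() + max_poll_seconds
    while True:
        messages = fetch(qty)
        if messages:
            return messages[:qty]        # never return more than qty messages
        if time.monotonic() >= deadline:
            return []                    # budget exhausted, nothing arrived
        time.sleep(poll_interval_ms / 1000)
```

Here fetch stands for any callable that returns a possibly empty list of messages; an empty list after the deadline signals that the caller should decide whether to poll again.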

Archive the message after we're done with it

Archived messages are moved to an archive table.

archived: bool = queue.archive("my_queue", read_message.msg_id)

Archive a batch of messages

# msg_id1 and msg_id2 are message IDs, e.g. values returned by send() or send_batch()
archived_ids: list[int] = queue.archive_batch("my_queue", [msg_id1, msg_id2])

Delete a message completely

read_message: Message = queue.read("my_queue")
deleted: bool = queue.delete("my_queue", read_message.msg_id)

Delete a batch of messages

# msg_id1 and msg_id2 are message IDs, e.g. values returned by send() or send_batch()
deleted_ids: list[int] = queue.delete_batch("my_queue", [msg_id1, msg_id2])

Set the visibility timeout (VT) for a specific message

updated_message: Message = queue.set_vt("my_queue", msg_id, 60)
print(updated_message)

Pop a message, reading and deleting it in one transaction

popped_message: Message = queue.pop("my_queue")
print(popped_message)

Purge all messages from a queue

purged_count: int = queue.purge("my_queue")
print(f"Purged {purged_count} messages from the queue.")

Detach an archive from a queue

queue.detach_archive("my_queue")

Drop a queue

dropped: bool = queue.drop_queue("my_queue")
print(f"Queue dropped: {dropped}")

Validate the length of a queue name

queue.validate_queue_name("my_queue")

Get queue metrics

The metrics method retrieves various statistics for a specific queue, such as the queue length, the age of the newest and oldest messages, the total number of messages, and the time of the metrics scrape.

metrics = queue.metrics("my_queue")
print(f"Metrics: {metrics}")

Access individual metrics

You can access individual metrics directly from the metrics method's return value:

metrics = queue.metrics("my_queue")
print(f"Queue name: {metrics.queue_name}")
print(f"Queue length: {metrics.queue_length}")
print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
print(f"Total messages: {metrics.total_messages}")
print(f"Scrape time: {metrics.scrape_time}")
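These attributes lend themselves to simple health checks. The sketch below flags a queue as backlogged when it is too long or its oldest message is too old. MetricsStandIn, is_backlogged, and both thresholds are hypothetical; the stand-in only mirrors two of the attribute names listed above, and the None guard assumes an empty queue may report no message age.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class MetricsStandIn:
    """Minimal stand-in exposing two of the attributes shown above."""
    queue_length: int
    oldest_msg_age_sec: Optional[int]

def is_backlogged(metrics, max_length=1000, max_age_sec=300):
    """Return True when the queue is too long or its oldest message is too old."""
    oldest = metrics.oldest_msg_age_sec
    too_old = oldest is not None and oldest > max_age_sec
    return metrics.queue_length > max_length or too_old
```

Because is_backlogged only reads attributes, it should accept any object that exposes queue_length and oldest_msg_age_sec, including the value returned by queue.metrics.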

Get metrics for all queues

The metrics_all method retrieves metrics for all queues, allowing you to iterate through each queue's metrics.

all_metrics = queue.metrics_all()
for metrics in all_metrics:
    print(f"Queue name: {metrics.queue_name}")
    print(f"Queue length: {metrics.queue_length}")
    print(f"Newest message age (seconds): {metrics.newest_msg_age_sec}")
    print(f"Oldest message age (seconds): {metrics.oldest_msg_age_sec}")
    print(f"Total messages: {metrics.total_messages}")
    print(f"Scrape time: {metrics.scrape_time}")

Optional Logging Configuration

You can enable verbose logging and specify a custom log filename.

queue = PGMQueue(
    host="0.0.0.0",
    port="5432",
    username="postgres",
    password="postgres",
    database="postgres",
    verbose=True,
    log_filename="my_custom_log.log"
)

Using Transactions

To perform multiple operations within a single transaction, use the @transaction decorator from the pgmq.decorators module. This ensures that all operations within the function are executed within the same transaction and are either committed together or rolled back if an error occurs.

First, import the transaction decorator:

from pgmq.decorators import transaction

Example: Transactional Operation

@transaction
def transactional_operation(queue: PGMQueue, conn=None):
    # Perform multiple queue operations within a transaction
    queue.create_queue("transactional_queue", conn=conn)
    queue.send("transactional_queue", {"message": "Hello, World!"}, conn=conn)

To execute the transaction:

try:
    transactional_operation(queue)
except Exception as e:
    print(f"Transaction failed: {e}")

In this example, the queue creation and the send inside transactional_operation belong to a single transaction: either both are committed, or, if an error occurs, both are rolled back automatically.
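For readers curious how a decorator of this kind can work, here is a minimal sketch built on the standard library's sqlite3 module. It is not pgmq's implementation: transaction_sketch, ToyStore, and send_two are hypothetical names, and the real decorator may obtain and manage its connection differently. The sketch passes a shared connection as conn=, commits on success, and rolls back on any exception.

```python
import functools
import sqlite3

class ToyStore:
    """Tiny message table in an in-memory SQLite database."""

    def __init__(self):
        self.connection = sqlite3.connect(":memory:")
        self.connection.execute("CREATE TABLE messages (body TEXT)")

    def send(self, body, conn):
        conn.execute("INSERT INTO messages (body) VALUES (?)", (body,))

    def count(self):
        return self.connection.execute("SELECT COUNT(*) FROM messages").fetchone()[0]

def transaction_sketch(func):
    """Pass the store's connection as conn=; commit on success, roll back on error."""
    @functools.wraps(func)
    def wrapper(store, *args, **kwargs):
        conn = store.connection
        try:
            result = func(store, *args, conn=conn, **kwargs)
        except Exception:
            conn.rollback()              # undo every statement run inside func
            raise
        conn.commit()
        return result
    return wrapper

@transaction_sketch
def send_two(store, fail=False, conn=None):
    store.send("first", conn=conn)
    if fail:
        raise RuntimeError("simulated failure")
    store.send("second", conn=conn)
```

Calling send_two(store) leaves two rows in the table, while send_two(store, fail=True) raises and leaves the row count unchanged, because the first insert is rolled back together with the failure.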


