Litestar integration for SAQ

Litestar SAQ

Installation

pip install litestar-saq

For OpenTelemetry support:

pip install litestar-saq[otel]

Usage

Here is a basic application that demonstrates how to use the plugin.

from litestar import Litestar
from litestar_saq import QueueConfig, SAQConfig, SAQPlugin

saq = SAQPlugin(
    config=SAQConfig(
        use_server_lifespan=True,
        queue_configs=[
            QueueConfig(name="samples", dsn="redis://localhost:6379/0")
        ],
    )
)
app = Litestar(plugins=[saq])

You can now start a background worker with the following command:

litestar --app-dir=examples/ --app basic:app workers run
Using Litestar app from env: 'basic:app'
Starting SAQ Workers ──────────────────────────────────────────────────────────────────
INFO - 2023-10-04 17:39:03,255 - saq - worker - Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, name='samples'>
INFO - 2023-10-04 17:39:06,545 - saq - worker - Worker shutting down

You can also start the process for specific queues only. This is useful when you want separate processes to handle different queues rather than one process handling them all.

litestar --app-dir=examples/ --app basic:app workers run --queues samples
Using Litestar app from env: 'basic:app'
Starting SAQ Workers ──────────────────────────────────────────────────────────────────
INFO - 2023-10-04 17:39:03,255 - saq - worker - Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, name='samples'>
INFO - 2023-10-04 17:39:06,545 - saq - worker - Worker shutting down

If a worker process is started for specific queues only, you can still read from, or enqueue tasks into, a queue that the worker did not initialize (for example, one served by another process) by constructing the queue directly:

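import asyncio
import os

from saq import Queue


def get_queue_directly(queue_name: str, redis_url: str) -> Queue:
    """Build a queue handle that does not depend on the worker's configured queues."""
    return Queue.from_url(redis_url, name=queue_name)


async def main() -> None:
    # Fall back to the local Redis used in the basic example if REDIS_URL is unset
    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    queue = get_queue_directly("queue-in-other-process", redis_url)

    # Get queue info
    info = await queue.info(jobs=True)

    # Enqueue new task
    await queue.enqueue("task_name", arg1="value1")


# `await` is only valid inside a coroutine, so run the calls through asyncio
asyncio.run(main())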

Monitored Jobs

For long-running tasks, use the monitored_job decorator to automatically send heartbeats and prevent SAQ from marking jobs as stuck:

from litestar_saq import monitored_job

@monitored_job()  # Auto-calculates interval from job.heartbeat
async def long_running_task(ctx):
    await process_large_dataset()
    return {"status": "complete"}

@monitored_job(interval=30.0)  # Explicit 30-second interval
async def train_model(ctx, model_id: str):
    for epoch in range(100):
        await train_epoch(model_id)  # train_epoch is a placeholder for your own training step
    return {"model_id": model_id}
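To make the heartbeat idea concrete, here is a minimal standard-library sketch of the general technique: a decorator that runs a periodic callback alongside a long-running coroutine and stops it when the coroutine finishes. It is not the implementation of monitored_job, whose internals are not shown here; with_heartbeat, record_beat, and slow_task are hypothetical names used only for illustration.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict, List


def with_heartbeat(beat: Callable[[], Awaitable[None]], interval: float):
    """Hypothetical decorator: await `beat` every `interval` seconds while the task runs."""

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            async def pulse() -> None:
                # Runs until cancelled by the wrapper below.
                while True:
                    await asyncio.sleep(interval)
                    await beat()

            pulse_task = asyncio.create_task(pulse())
            try:
                return await func(*args, **kwargs)
            finally:
                # Stop the heartbeat loop whether the task succeeded or failed.
                pulse_task.cancel()
                await asyncio.gather(pulse_task, return_exceptions=True)

        return wrapper

    return decorator


beats: List[float] = []


async def record_beat() -> None:
    # Stand-in for a real heartbeat call; it only records a timestamp.
    beats.append(asyncio.get_running_loop().time())


@with_heartbeat(record_beat, interval=0.01)
async def slow_task() -> Dict[str, str]:
    await asyncio.sleep(0.1)  # simulated long-running work
    return {"status": "complete"}


result = asyncio.run(slow_task())
```

Running the heartbeat as a separate asyncio task means the decorated function needs no changes of its own, which is presumably why a decorator is a convenient shape for this feature.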

OpenTelemetry Integration

litestar-saq supports optional OpenTelemetry instrumentation for distributed tracing.

Configuration

from litestar_saq import SAQConfig, QueueConfig

config = SAQConfig(
    queue_configs=[QueueConfig(dsn="redis://localhost:6379/0")],
    enable_otel=None,  # Auto-detect (default) - enabled if OTEL installed AND Litestar OpenTelemetryPlugin is active
    # enable_otel=True,  # Force enable (raises error if not installed)
    # enable_otel=False,  # Force disable
)
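The three enable_otel settings described in the comments above can be read as a small decision function. The sketch below is only an illustration of that documented behaviour, not the plugin's code: resolve_enable_otel and otel_installed are hypothetical helpers, and RuntimeError is an assumed exception type, since the text only says that an error is raised.

```python
import importlib.util
from typing import Optional


def otel_installed() -> bool:
    # True when the top-level `opentelemetry` package can be found.
    return importlib.util.find_spec("opentelemetry") is not None


def resolve_enable_otel(
    enable_otel: Optional[bool],
    plugin_active: bool,
    installed: Optional[bool] = None,
) -> bool:
    """Hypothetical helper mirroring the three documented settings."""
    if installed is None:
        installed = otel_installed()
    if enable_otel is False:
        # Force disable.
        return False
    if enable_otel is True:
        # Force enable; fail loudly if the dependency is missing.
        if not installed:
            raise RuntimeError("enable_otel=True but OpenTelemetry is not installed")
        return True
    # None: auto-detect, which needs both the package and the active Litestar plugin.
    return installed and plugin_active
```

The installed parameter exists only so the logic can be exercised without depending on what happens to be installed in the current environment.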

When enabled, the plugin creates:

  • CONSUMER spans for job processing
  • PRODUCER spans for job enqueue operations
  • Automatic context propagation across process boundaries

Spans follow OpenTelemetry messaging semantic conventions with messaging.system = "saq".
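Context propagation across process boundaries generally means that the producer attaches its trace context to the job, and the consumer reads it back so that both spans share one trace. The following standard-library sketch illustrates that idea with a W3C traceparent value. It does not show how litestar-saq stores the context: the otel_carrier key and the enqueue and process functions are hypothetical, and a real setup would use the OpenTelemetry SDK rather than hand-built strings.

```python
import secrets
from typing import Any, Dict, Optional, Tuple


def new_traceparent(trace_id: Optional[str] = None) -> str:
    """Build a W3C `traceparent` value: version-traceid-spanid-flags."""
    trace_id = trace_id or secrets.token_hex(16)  # 32 hex characters
    span_id = secrets.token_hex(8)  # 16 hex characters
    return f"00-{trace_id}-{span_id}-01"


def parse_traceparent(value: str) -> Tuple[str, str]:
    """Return (trace_id, span_id) from a `traceparent` value."""
    _version, trace_id, span_id, _flags = value.split("-")
    return trace_id, span_id


def enqueue(job_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    # Producer side: attach the current trace context to the job payload.
    # The "otel_carrier" key is a made-up name for this sketch.
    return {"kwargs": job_kwargs, "otel_carrier": {"traceparent": new_traceparent()}}


def process(job: Dict[str, Any]) -> Dict[str, str]:
    # Consumer side: continue the producer's trace under a new span id.
    trace_id, parent_span_id = parse_traceparent(job["otel_carrier"]["traceparent"])
    return {
        "trace_id": trace_id,
        "parent_span_id": parent_span_id,
        "traceparent": new_traceparent(trace_id),
    }


job = enqueue({"arg1": "value1"})
consumer_span = process(job)
```

Because the carrier travels inside the job payload, the consumer can run in a different process or on a different host and still report spans under the producer's trace id.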
