Litestar integration for SAQ
Project description
Litestar SAQ
Installation
pip install litestar-saq
For OpenTelemetry support:
pip install litestar-saq[otel]
Usage
Here is a basic application that demonstrates how to use the plugin.
from litestar import Litestar
from litestar_saq import QueueConfig, SAQConfig, SAQPlugin
saq = SAQPlugin(
config=SAQConfig(
use_server_lifespan=True,
queue_configs=[
QueueConfig(name="samples", dsn="redis://localhost:6379/0")
],
)
)
app = Litestar(plugins=[saq])
You can start a background worker with the following command now:
litestar --app-dir=examples/ --app basic:app workers run
Using Litestar app from env: 'basic:app'
Starting SAQ Workers ──────────────────────────────────────────────────────────────────
INFO - 2023-10-04 17:39:03,255 - saq - worker - Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, name='samples'>
INFO - 2023-10-04 17:39:06,545 - saq - worker - Worker shutting down
You can also start the process for only specific queues. This is helpful if you want separated processes working on different queues instead of combining them.
litestar --app-dir=examples/ --app basic:app workers run --queues sample
Using Litestar app from env: 'basic:app'
Starting SAQ Workers ──────────────────────────────────────────────────────────────────
INFO - 2023-10-04 17:39:03,255 - saq - worker - Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, name='samples'>
INFO - 2023-10-04 17:39:06,545 - saq - worker - Worker shutting down
If you are starting the process for only specific queues and still want to read from the other queues or enqueue a task into another queue that was not initialized in your worker or is found somewhere else, you can do so like here
import os
from saq import Queue
def get_queue_directly(queue_name: str, redis_url: str) -> Queue:
return Queue.from_url(redis_url, name=queue_name)
redis_url = os.getenv("REDIS_URL")
queue = get_queue_directly("queue-in-other-process", redis_url)
# Get queue info
info = await queue.info(jobs=True)
# Enqueue new task
await queue.enqueue("task_name", arg1="value1")
Monitored Jobs
For long-running tasks, use the monitored_job decorator to automatically send heartbeats and prevent SAQ from marking jobs as stuck:
from litestar_saq import monitored_job
@monitored_job() # Auto-calculates interval from job.heartbeat
async def long_running_task(ctx):
await process_large_dataset()
return {"status": "complete"}
@monitored_job(interval=30.0) # Explicit 30-second interval
async def train_model(ctx, model_id: str):
for epoch in range(100):
await train_epoch(model)
return {"model_id": model_id}
OpenTelemetry Integration
litestar-saq supports optional OpenTelemetry instrumentation for distributed tracing.
Configuration
from litestar_saq import SAQConfig, QueueConfig
config = SAQConfig(
queue_configs=[QueueConfig(dsn="redis://localhost:6379/0")],
enable_otel=None, # Auto-detect (default) - enabled if OTEL installed AND Litestar OpenTelemetryPlugin is active
# enable_otel=True, # Force enable (raises error if not installed)
# enable_otel=False, # Force disable
)
When enabled, the plugin creates:
- CONSUMER spans for job processing
- PRODUCER spans for job enqueue operations
- Automatic context propagation across process boundaries
Spans follow OpenTelemetry messaging semantic conventions with messaging.system = "saq".
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file litestar_saq-0.7.1.tar.gz.
File metadata
- Download URL: litestar_saq-0.7.1.tar.gz
- Upload date:
- Size: 250.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
65a1acd4680671d566e40986a6521c85dab5619a3522ca17740862cc6e592e04
|
|
| MD5 |
8c728aad97d4f9f3e140c43112cdd216
|
|
| BLAKE2b-256 |
4a86cf24b0482ee4a44352fd9eeb8d14f0f908813ac5e30e52cc4f6f74ecdac6
|
Provenance
The following attestation bundles were made for litestar_saq-0.7.1.tar.gz:
Publisher:
publish.yaml on litestar-org/litestar-saq
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
litestar_saq-0.7.1.tar.gz -
Subject digest:
65a1acd4680671d566e40986a6521c85dab5619a3522ca17740862cc6e592e04 - Sigstore transparency entry: 1013978706
- Sigstore integration time:
-
Permalink:
litestar-org/litestar-saq@ae058a3e8eab61d27fe24b03420978e5e25ab110 -
Branch / Tag:
refs/tags/v0.7.1 - Owner: https://github.com/litestar-org
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@ae058a3e8eab61d27fe24b03420978e5e25ab110 -
Trigger Event:
release
-
Statement type:
File details
Details for the file litestar_saq-0.7.1-py3-none-any.whl.
File metadata
- Download URL: litestar_saq-0.7.1-py3-none-any.whl
- Upload date:
- Size: 37.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
75292d7005ad9b68c6d7c779f55dd0a1cd40aea354ccc0b510abf19c986cbee3
|
|
| MD5 |
6b8faba2336aa3cf1d2ba0786855b731
|
|
| BLAKE2b-256 |
5fe614d1b3f304ac9373950e202c86ea8943f643489f3175e43ac0b4dc1eac94
|
Provenance
The following attestation bundles were made for litestar_saq-0.7.1-py3-none-any.whl:
Publisher:
publish.yaml on litestar-org/litestar-saq
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
litestar_saq-0.7.1-py3-none-any.whl -
Subject digest:
75292d7005ad9b68c6d7c779f55dd0a1cd40aea354ccc0b510abf19c986cbee3 - Sigstore transparency entry: 1013978746
- Sigstore integration time:
-
Permalink:
litestar-org/litestar-saq@ae058a3e8eab61d27fe24b03420978e5e25ab110 -
Branch / Tag:
refs/tags/v0.7.1 - Owner: https://github.com/litestar-org
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@ae058a3e8eab61d27fe24b03420978e5e25ab110 -
Trigger Event:
release
-
Statement type: