JetStream Extensions is a set of utilities providing additional features to `jetstream` package in nats-py client.
Project description
NATS JetStream Extensions
JetStream Extensions is a set of utilities providing additional features to jetstream package in nats-py client.
Installation
uv add jetstreamext
Utilities
get_batch and get_last_msgs_for
get_batch and get_last_msgs_for are utilities that allow you to fetch multiple messages from a JetStream stream.
Responses are returned as async iterators, which you can iterate over using async for to receive messages.
get_batch
get_batch fetches a batch of messages from a provided stream, starting from
either the lowest matching sequence, from the provided sequence, or from the
given time. It can be configured to fetch messages from matching subject (which
may contain wildcards) and up to a maximum byte limit.
Examples:
see examples.py for a runnable version of all snippets below.
- fetching 10 messages from the beginning of the stream:
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_batch(js, "stream", batch=10):
print(msg.data)
- fetching 10 messages from the stream starting from sequence 100 and matching subject:
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_batch(js, "stream", batch=10, seq=100, subject="foo"):
print(msg.data)
- fetching 10 messages from the stream starting from time 1 hour ago:
from datetime import datetime, timedelta, timezone
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_batch(
js,
"stream",
batch=10,
start_time=datetime.now(timezone.utc) - timedelta(hours=1)
):
print(msg.data)
- fetching 10 messages or up to provided byte limit:
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_batch(js, "stream", batch=10, max_bytes=1024):
print(msg.data)
get_last_msgs_for
get_last_msgs_for fetches the last messages for the specified subjects from the specified stream. It can be optionally configured to fetch messages up to the provided sequence (or time), rather than the latest messages available. It can also be configured to fetch messages up to a provided batch size.
The provided subjects may contain wildcards, however it is important to note that the NATS server will match a maximum of 1024 subjects.
Responses are returned as async iterators, which you can iterate over using async for to receive messages.
Examples:
- fetching last messages from the stream for the provided subjects:
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_last_msgs_for(js, "stream", ["foo", "bar"]):
print(msg.data)
- fetching last messages from the stream for the provided subjects up to stream sequence 100:
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_last_msgs_for(js, "stream", ["foo", "bar"], up_to_seq=100):
print(msg.data)
- fetching last messages from the stream for the provided subjects up to time 1 hour ago:
from datetime import datetime, timedelta, timezone
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_last_msgs_for(
js,
"stream",
["foo", "bar"],
up_to_time=datetime.now(timezone.utc) - timedelta(hours=1)
):
print(msg.data)
- fetching last messages from the stream for the provided subjects up to a batch size of 10:
import nats
import jetstreamext
nc = await nats.connect()
js = nc.jetstream()
async for msg in jetstreamext.get_last_msgs_for(js, "stream", ["foo.*"], batch=10):
print(msg.data)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file jetstreamext-0.4.0.tar.gz.
File metadata
- Download URL: jetstreamext-0.4.0.tar.gz
- Upload date:
- Size: 5.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.8.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3b49ed20e626258731a0f32ce15657ff7d1dde3c146e0048cddee0c4219cf492
|
|
| MD5 |
737f21d01744a44cfb499d5c6bd44395
|
|
| BLAKE2b-256 |
bd18c04b0739b513fb578a7d7867812b080a8c1dba39e84773b7734af9f5ec29
|
File details
Details for the file jetstreamext-0.4.0-py3-none-any.whl.
File metadata
- Download URL: jetstreamext-0.4.0-py3-none-any.whl
- Upload date:
- Size: 7.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.8.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5a6fae5fd86e7d10d5d65d45eda190f437e6ba60b8ec7931d4fa197aac928dca
|
|
| MD5 |
4c845a12179a899e9d6dd80571d7f9d3
|
|
| BLAKE2b-256 |
a4d75ceaea36d15c6c1d904776d316767305e36e4993199737b2cf87698d9ee2
|