Consumer utility for AMQP
aioamqp-consumer-best
Usage
import asyncio
from typing import List

from aioamqp_consumer_best import (
    ConnectionParams,
    Consumer,
    Exchange,
    Message,
    ProcessBulk,
    Queue,
    QueueBinding,
    ToBulks,
    load_json,
)


async def callback(messages: List[Message]) -> None:
    print(messages)


consumer = Consumer(
    middleware=(
        load_json
        | ToBulks(max_bulk_size=10, bulk_timeout=3.0)
        | ProcessBulk(callback)
    ),
    prefetch_count=10,
    queue=Queue(
        name='test-queue',
        bindings=[
            QueueBinding(
                exchange=Exchange('test-exchange'),
                routing_key='test-routing-key',
            ),
        ],
    ),
    connection_params=[  # Round robin
        ConnectionParams(),
        ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
    ],
)

asyncio.run(consumer.start())
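The names `max_bulk_size` and `bulk_timeout` suggest that messages are grouped into a bulk until either a size limit or a time limit is reached. The sketch below illustrates that general size-or-timeout batching idea using only the standard library. It is not the library's implementation: `collect_bulk` and `demo` are hypothetical helpers, and starting the timeout when the first item arrives is an assumption.

```python
import asyncio
from typing import Any, List


async def collect_bulk(
    queue: "asyncio.Queue[Any]",
    max_bulk_size: int,
    bulk_timeout: float,
) -> List[Any]:
    """Wait for one item, then gather more until the bulk is full or time runs out."""
    loop = asyncio.get_running_loop()
    bulk = [await queue.get()]  # block until the first item arrives
    # Assumption: the timeout window opens when the first item is received.
    deadline = loop.time() + bulk_timeout
    while len(bulk) < max_bulk_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            bulk.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # no more items arrived in time; emit a partial bulk
    return bulk


async def demo() -> List[List[int]]:
    # Hypothetical stand-in for a stream of incoming messages.
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for i in range(25):
        queue.put_nowait(i)
    bulks = []
    while not queue.empty():
        bulks.append(await collect_bulk(queue, max_bulk_size=10, bulk_timeout=0.1))
    return bulks


if __name__ == "__main__":
    print([len(b) for b in asyncio.run(demo())])  # prints [10, 10, 5]
```

With 25 queued items and a size limit of 10, the first two bulks fill up immediately and the third is emitted with 5 items once the timeout expires. In the usage example above, `prefetch_count` equals `max_bulk_size`, so the broker can deliver up to one full bulk of unacknowledged messages at a time.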
Hashes for aioamqp-consumer-best-2.2.0.tar.gz
Algorithm | Hash digest
---|---
SHA256 | adedb8ef28c158e6035834d82800feb3fc087b15ba32a23ad3d533489b5495ed
MD5 | c5549adb138a698adbfbc039e4f78132
BLAKE2b-256 | c209fb88956cf0dff9e86e13fa75a4e2ec7e26537e8433ee38276230fcca7d0c
Hashes for aioamqp_consumer_best-2.2.0-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | e42fb52ecf46f23b27a2193980793d352615579ec25723506da1eb39ec46e1fa
MD5 | bd464cddb80ffd27bea7eb29b23f40fe
BLAKE2b-256 | 6180fe13a583ed379b4c419cf23f0c203557d80384ac09b990eacf7651f6094a