Skip to main content

Library to deal with RabbitMQ and Google PubSub

Project description

Library to deal with RabbitMQ and Google PubSub

Usage

QueueManager class

>>> from queue_manager.queue_manager import QueueManager
>>> conn_params = {
...     'host': '',
...     'port': '',
...     'username': '',
...     'password': ''
... }
>>> # or use multiple urls
>>> conn_params = ('amqp://host1', 'amqp://host2',)
qm = QueueManager(conn_params)
>>> qm.push('hello', 'Hello from QueueManager')
True
>>> qm.pop('hello')
Hello from QueueManager
>>> del(qm)

RabbitMqPublisher class

from queue_manager.rabbitmq_consumer import RabbitMqPublisher

producer = RabbitMqPublisher('amqp://username:password@hostname:port',
                             'exchange', 'exchange_type', 'queue_name', 'routing_key')

producer.publish_message('hello')
# or passing message property
producer.publish_message('hello', dict(priority=8))

RabbitMqConsumer class

This class is an async consumer class, that still connected.

Inspired on: asynchronous_consumer_example

single_url = 'amqp://username:password@hostname:port'
# or multiple urls
multiple_urls = ('amqp://username:password@hostnameone:port', 'amqp://username:password@hostnametwo:port')
consumer = RabbitMqConsumer(single_url, queue='queue_name')

try:
    def callback(body):
        print("message body", body)

    consumer.run(callback)
except KeyboardInterrupt:
    consumer.stop()

TornadoConsumer class

from tornado.ioloop import IOLoop
from queue_manager.tornado_consumer import TornadoConsumer

consumer = TornadoConsumer('amqp://username:password@hostname:port',
                           'exchange', 'exchange_type', 'queue_name', 'routing_key')


def callback(body):
    print("message body", body)

consumer.run(callback)
IOLoop.instance().start()

PubsubConsumer class

consumer = PubsubConsumer('project_id', 'path/to/sa.json', 'subscription_name', 'topic_name')

def callback(message):
    print("message", message)

try:
    consumer.start_listening(callback)
except KeyboardInterrupt:
    consumer.stop()

Running tests with tox

Install tox

$ pip install tox

Run tests

tox

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

PyQueueManager-1.12.1.tar.gz (8.1 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page