Skip to main content

Rejected is a Python RabbitMQ Consumer Framework and Controller Daemon

Project description

Rejected is a AMQP consumer daemon and message processing framework. It allows for rapid development of message processing consumers by handling all of the core functionality of communicating with RabbitMQ and management of consumer processes.

Rejected runs as a master process with multiple consumer configurations that are each run it an isolated process. It has the ability to collect statistical data from the consumer processes and report on it.

Version Downloads Status License

Features

  • Automatic exception handling including connection management and consumer restarting

  • Smart consumer classes that can automatically decode and deserialize message bodies based upon message headers

  • Metrics logging and submission to statsd

  • Built-in profiling of consumer code

  • Ability to write asynchronous code in consumers allowing for parallel communication with external resources

Documentation

https://rejected.readthedocs.org

Example Consumers

from rejected import consumer
import logging

LOGGER = logging.getLogger(__name__)


class Test(consumer.Consumer):
    def process(self, message):
        LOGGER.debug('In Test.process: %s' % message.body)

Async Consumer

To make a consumer async, you can decorate the Consumer.prepare and Consumer.process methods using Tornado’s @gen.coroutine. Asynchronous consumers to handle multiple messages in the same process, but rather allow you to use asynchronous clients like Tornado’s AsyncHTTPClient and the Queries PostgreSQL library to perform parallel tasks using coroutines.

import logging

from rejected import consumer

from tornado import gen
from tornado import httpclient


class AsyncExampleConsumer(consumer.Consumer):

    @gen.coroutine
    def process(self):
        LOGGER.debug('Message: %r', self.body)
        http_client = httpclient.AsyncHTTPClient()
        results = yield [http_client.fetch('http://www.github.com'),
                         http_client.fetch('http://www.reddit.com')]
        LOGGER.info('Length: %r', [len(r.body) for r in results])

Example Configuration

%YAML 1.2
---
Application:
  poll_interval: 10.0
  log_stats: True
  statsd:
    enabled: True
    host: localhost
    port: 8125
    prefix: applications.rejected
  Connections:
    rabbitmq:
      host: localhost
      port: 5672
      user: guest
      pass: guest
      ssl: False
      vhost: /
      heartbeat_interval: 300
  Consumers:
    example:
      consumer: rejected.example.Consumer
      connections: [rabbitmq]
      qty: 2
      queue: generated_messages
      qos_prefetch: 100
      ack: True
      max_errors: 100
      config:
        foo: True
        bar: baz

 Daemon:
   user: rejected
   group: daemon
   pidfile: /var/run/rejected/example.%(pid)s.pid

 Logging:
   version: 1
   formatters:
     verbose:
       format: "%(levelname) -10s %(asctime)s %(process)-6d %(processName) -15s %(name) -25s %(funcName) -20s: %(message)s"
       datefmt: "%Y-%m-%d %H:%M:%S"
     syslog:
       format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s(): %(message)s"
   filters: []
   handlers:
     console:
       class: logging.StreamHandler
       formatter: verbose
       debug_only: true
     syslog:
       class: logging.handlers.SysLogHandler
       facility: local6
       address: /var/run/syslog
       #address: /dev/log
       formatter: syslog
   loggers:
     my_consumer:
       level: INFO
       propagate: true
       handlers: [console, syslog]
     rejected:
       level: INFO
       propagate: true
       handlers: [console, syslog]
     tornado:
       level: INFO
       propagate: true
       handlers: [console, syslog]
     urllib3:
       level: ERROR
       propagate: true
   disable_existing_loggers: false
   incremental: false

Version History

Available at https://rejected.readthedocs.org/en/latest/history.html

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rejected-3.7.1.tar.gz (28.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rejected-3.7.1-py2-none-any.whl (34.9 kB view details)

Uploaded Python 2

File details

Details for the file rejected-3.7.1.tar.gz.

File metadata

  • Download URL: rejected-3.7.1.tar.gz
  • Upload date:
  • Size: 28.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for rejected-3.7.1.tar.gz
Algorithm Hash digest
SHA256 9d9dd84fe7f9715ae758b722fd16821bcfcc12be1bf02b669a9188316857bb1b
MD5 d46e7e94b31df3bbe696f5c0946e087b
BLAKE2b-256 dcf2e9a81383b71ec0233e7c97691b9c010c9b0811b3dc773390d3b587801052

See more details on using hashes here.

File details

Details for the file rejected-3.7.1-py2-none-any.whl.

File metadata

File hashes

Hashes for rejected-3.7.1-py2-none-any.whl
Algorithm Hash digest
SHA256 4c5e6753841ff661443a5e410cf5bc407afcc54ddc33a3a79b1dcfb107c7c1c5
MD5 0d01244746f19bd6d815d0017f3ad6bb
BLAKE2b-256 fb76c69bff6234fbe3a0942c9645ac0ca100528843dd6c9dcaad3d94ec18145a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page