Kafka integration with asyncio.

asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    # send() only adds the message to the send queue and returns a future
    future = yield from producer.send('foobar', b'some_message_bytes')
    # Wait for the message to be delivered
    resp = yield from future
    print("Message produced: partition {}; offset {}".format(
          resp.partition, resp.offset))
    # Alternatively, use a helper to send and wait in one call
    resp = yield from producer.send_and_wait(
        'foobar', key=b'foo', value=b'bar')
    resp = yield from producer.send_and_wait(
        'foobar', b'message for partition 1', partition=1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())
loop.close()
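
The difference between send (queue now, wait later) and send_and_wait (queue and wait in one step) can be tried without a broker. The sketch below is only an illustration: FakeProducer and RecordMetadata are hypothetical in-memory stand-ins, not aiokafka classes, and the code uses async/await syntax with asyncio.run (Python 3.7+) instead of the yield from style of the example above.

```python
import asyncio
from collections import namedtuple

# Hypothetical stand-in for the metadata a delivered message resolves to.
RecordMetadata = namedtuple("RecordMetadata", ["topic", "partition", "offset"])


class FakeProducer:
    """Hypothetical in-memory producer; it never contacts a broker."""

    def __init__(self):
        self._next_offset = {}

    async def send(self, topic, value, partition=0):
        # Queue the message and return a future that resolves on "delivery".
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        offset = self._next_offset.get((topic, partition), 0)
        self._next_offset[(topic, partition)] = offset + 1
        # Simulate the acknowledgement arriving on a later loop iteration.
        loop.call_soon(future.set_result,
                       RecordMetadata(topic, partition, offset))
        return future

    async def send_and_wait(self, topic, value, partition=0):
        # Convenience wrapper: queue the message, then wait for delivery.
        future = await self.send(topic, value, partition)
        return await future


async def demo():
    producer = FakeProducer()
    # Queue several messages first, then wait for all acknowledgements.
    futures = [await producer.send("foobar", b"msg %d" % i) for i in range(3)]
    acks = await asyncio.gather(*futures)
    last = await producer.send_and_wait("foobar", b"last", partition=1)
    return [ack.offset for ack in acks], last


offsets, last = asyncio.run(demo())
print(offsets, last)
```

Queuing several messages before waiting on any of the futures is what lets a producer batch them; send_and_wait trades that for simpler code.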

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node so that multiple consumers can load-balance consumption of topics (requires Kafka >= 0.9.0.0).
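
To give a rough idea of what load balancing within a group means, the sketch below splits topic partitions across consumers in round-robin order. It is an assumption-laden illustration only: assign_round_robin and the consumer names are hypothetical, and the strategy actually used by the group coordinator may differ.

```python
def assign_round_robin(partitions, consumers):
    """Spread (topic, partition) pairs over consumers one at a time."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        # Each consumer in turn receives the next partition.
        assignment[members[index % len(members)]].append(partition)
    return assignment


# Hypothetical layout: 'topic1' has 3 partitions, 'topic2' has 2.
partitions = [("topic1", p) for p in range(3)] + [("topic2", p) for p in range(2)]

one = assign_round_robin(partitions, ["consumer-a"])
two = assign_round_robin(partitions, ["consumer-a", "consumer-b"])
print(two)
```

When a second consumer joins, each partition is still owned by exactly one member, but the work is divided between them.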

Example of AIOKafkaConsumer usage:

import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer

@asyncio.coroutine
def consume_task(consumer):
    while True:
        try:
            msg = yield from consumer.getone()
            print("consumed: ", msg.topic, msg.partition, msg.offset, msg.value)
        except KafkaError as err:
            print("error while consuming message: ", err)

loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer(
    'topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
loop.run_until_complete(consumer.start())
c_task = loop.create_task(consume_task(consumer))
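try:
    loop.run_forever()
finally:
    # Cancel the consuming task first so it does not keep reading
    # from a consumer that is being stopped
    c_task.cancel()
    loop.run_until_complete(consumer.stop())
    loop.close()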

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes.

Setting up test requirements (assuming you are inside a virtualenv on Ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev
pip install flake8 pytest pytest-cov pytest-catchlog docker-py python-snappy coveralls .

Running tests:

make cov

To run tests against a specific version of Kafka (the default is 0.9.0.1), set the KAFKA_VERSION variable:

make cov KAFKA_VERSION=0.8.2.1
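
Because the consumer's group coordination requires Kafka >= 0.9.0.0, code that branches on the Kafka version should compare versions numerically, not as strings. The helper below is a minimal sketch; parse_version and supports_group_coordinator are hypothetical names and are not part of aiokafka or its test suite.

```python
def parse_version(text):
    """Turn a dotted version string such as '0.8.2.1' into a tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def supports_group_coordinator(version):
    # Per the consumer description, group coordination needs Kafka >= 0.9.0.0.
    return parse_version(version) >= (0, 9, 0, 0)


for version in ("0.8.2.1", "0.9.0.1"):
    print(version, supports_group_coordinator(version))
```

Comparing tuples avoids the pitfall of plain string comparison, where "0.10.0.0" would sort before "0.9.0.1".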

CHANGES

0.1.0 (2016-04-15)

Initial release

Added full support for Kafka 0.9.0. Older Kafka versions are not tested.
