Skip to main content

Kafka integration with asyncio.

Project description

|Build status| |Coverage|

asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    # Just adds message to sending queue
    future = yield from producer.send('foobar', b'some_message_bytes')
    # waiting for message to be delivered
    resp = yield from future
    print("Message produced: partition {}; offset {}".format(
          resp.partition, resp.offset))
    # Also can use a helper to send and wait in 1 call
    resp = yield from producer.send_and_wait(
        'foobar', key=b'foo', value=b'bar')
    resp = yield from producer.send_and_wait(
        'foobar', b'message for partition 1', partition=1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
# Wait for all pending messages to be delivered or expire
loop.run_until_complete(producer.stop())
loop.close()

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:

import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer

@asyncio.coroutine
def consume_task(consumer):
    while True:
        try:
            msg = yield from consumer.getone()
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
        except KafkaError as err:
            print("error while consuming message: ", err)

loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer(
    'topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
c_task = loop.create_task(consume_task(consumer))
try:
    loop.run_forever()
finally:
    # Will gracefully leave consumer group; perform autocommit if enabled
    loop.run_until_complete(consumer.stop())
    c_task.cancel()
    loop.close()

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that lz4 compression libraries for python will require python-dev package, or python source header files for compilation on Linux.

Setting up tests requirements (assuming you’re within virtualenv on ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev
make setup

Running tests:

make cov

To run tests with a specific version of Kafka (default one is 0.10.1.0) use KAFKA_VERSION variable:

make cov KAFKA_VERSION=0.10.0.0

CHANGES

0.2.1 (2017-02-19)

  • Add a check to wait topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas)

  • Consumer now stops consumption after consumer.stop() call. Any new get* calls will result in ConsumerStoppedError (PR #81)

  • Added exclude_internal_topics option for Consumer (PR #111)

  • Better support for pattern subscription when used with group_id (part of PR #111)

  • Fix for Consumer subscribe and JoinGroup race condition (issue #88). Coordinator will now notice subscription changes during rebalance and will join group again. (PR #106)

  • Changed logging messages according to KAFKA-3318. Now INFO level should be less messy and more informative. (PR #110)

  • Add support for connections_max_idle_ms config (PR #113)

0.2.0 (2016-12-18)

  • Added SSL support. (PR #81 by Drizzt1991)

  • Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas)

  • Fixed next_record recursion (PR #94 by fabregas)

  • Fixed Heartbeat fail if no consumers (PR #92 by fabregas)

  • Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991)

  • Added max_poll_records option for Consumer (PR #72 by Drizzt1991)

  • Fix kafka-python typos in docs (PR #69 by jeffwidman)

  • Topics and partitions are now randomized on each Fetch request (PR #66 by Drizzt1991)

0.1.4 (2016-11-07)

  • Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0.

  • Fixed auto version detection, to correctly handle 0.10.0.0 version

  • Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers. This allows a timestamp to be associated with messages.

  • Changed lz4 compression framing, as it was changed due to KIP-57 in new message format.

  • Minor refactorings

Big thanks to @fabregas for the hard work on this release (PR #60)

0.1.3 (2016-10-18)

  • Fixed bug with infinite loop on heartbeats with autocommit=True. #44

  • Bumped kafka-python to version 1.1.1

  • Fixed docker test runner with multiple interfaces

  • Minor documentation fixes

0.1.2 (2016-04-30)

  • Added Python3.5 usage example to docs

  • Don’t raise retriable exceptions in 3.5’s async for iterator

  • Fix Cancellation issue with producer’s send_and_wait method

0.1.1 (2016-04-15)

  • Fix packaging issues. Removed unneded files from package.

0.1.0 (2016-04-15)

Initial release

Added full support for Kafka 9.0. Older Kafka versions are not tested.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiokafka-0.2.1.tar.gz (48.7 kB view details)

Uploaded Source

File details

Details for the file aiokafka-0.2.1.tar.gz.

File metadata

  • Download URL: aiokafka-0.2.1.tar.gz
  • Upload date:
  • Size: 48.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for aiokafka-0.2.1.tar.gz
Algorithm Hash digest
SHA256 cc08465318b3f763410c0f11635cafc4fcc45466c58a33c04bcc758be0c2be7f
MD5 7e15deafe2749db59a9b4759ae13351c
BLAKE2b-256 004e3e8921560e1eb2297f383b0b8a58997f02e9f904e84a8cd0eda58b05068b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page