Microsoft Azure Event Hubs Client Library for Python

Azure Event Hubs client library for Python

Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second.

Use the Event Hubs client library for Python to:

  • Publish events to the Event Hubs service through a sender.

  • Read events from the Event Hubs service through a receiver.

On Python 3.5 and above, it also includes:

  • An async sender and receiver that support async/await methods.

  • An Event Processor Host module that manages the distribution of partition readers.

Source code | Package (PyPI) | API reference documentation | Product documentation

Getting started

Install the package

Install the Azure Event Hubs client library for Python with pip:

$ pip install azure-eventhub

Prerequisites

  • An Azure subscription.

  • Python 3.4 or later.

  • An existing Event Hubs namespace and event hub. You can create these entities by following the instructions in this article.

Authenticate the client

Interaction with Event Hubs starts with an instance of the EventHubClient class. To instantiate the client object you need the host name, the SAS policy name, the SAS key, and the event hub name.

Get credentials

You can find the credential information in the Azure portal.

Create client

There are several ways to instantiate the EventHubClient object. The following snippet demonstrates one of them, building a connection string from environment variables:

import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
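
If one of the environment variables is unset, the snippet above fails with a bare KeyError. As a minimal sketch, the connection string could be assembled by a small helper that reports every missing setting at once. The names build_connection_string and REQUIRED_SETTINGS are hypothetical and are not part of azure-eventhub; only the connection string format and the variable names come from the snippet above.

```python
import os

# Environment variable names used in the snippet above, in format order.
REQUIRED_SETTINGS = (
    "EVENT_HUB_HOSTNAME",
    "EVENT_HUB_SAS_POLICY",
    "EVENT_HUB_SAS_KEY",
    "EVENT_HUB_NAME",
)


def build_connection_string(env=None):
    """Assemble an Event Hubs connection string (hypothetical helper).

    `env` is any mapping of setting names to values; it defaults to os.environ.
    """
    env = os.environ if env is None else env
    # Collect every missing or empty setting so the error names them all.
    missing = [name for name in REQUIRED_SETTINGS if not env.get(name)]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(missing))
    return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
        *(env[name] for name in REQUIRED_SETTINGS)
    )
```

The result can then be passed to EventHubClient.from_connection_string as shown above.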

Key concepts

  • Namespace: An Event Hubs namespace provides a unique scoping container, referenced by its fully qualified domain name, in which you create one or more event hubs or Kafka topics.

  • Event publishers: Any entity that sends data to an event hub is an event producer, or event publisher. Event publishers can publish events using HTTPS or AMQP 1.0 or Kafka 1.0 and later. Event publishers use a Shared Access Signature (SAS) token to identify themselves to an event hub, and can have a unique identity, or use a common SAS token.

  • Event consumers: Any entity that reads event data from an event hub is an event consumer. All Event Hubs consumers connect via an AMQP 1.0 session, and events are delivered through the session as they become available. The client does not need to poll for data availability.

  • SAS tokens: Event Hubs uses Shared Access Signatures, which are available at the namespace and event hub level. A SAS token is generated from a SAS key and is an SHA hash of a URL, encoded in a specific format. Using the name of the key (policy) and the token, Event Hubs can regenerate the hash and thus authenticate the sender.

For more information about these concepts, see Features and terminology in Azure Event Hubs.
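
To make the SAS token description concrete, here is a minimal sketch of how such a token can be built with the Python 3 standard library. It assumes the commonly documented Azure format, in which the signature is an HMAC-SHA256 over the URL-encoded resource URI and the expiry time. The function name generate_sas_token and its parameters are illustrative, not part of azure-eventhub; the client library normally derives the token from the connection string for you.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import quote_plus


def generate_sas_token(uri, policy_name, key, ttl_seconds=3600, now=None):
    """Build a SAS token for `uri` (illustrative helper, assumed token format).

    `now` can be supplied to make the expiry deterministic, for example in tests.
    """
    now = time.time() if now is None else now
    expiry = str(int(now + ttl_seconds))
    encoded_uri = quote_plus(uri)
    # The signed string is the encoded URI and the expiry, separated by a newline.
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = quote_plus(base64.b64encode(digest))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, policy_name
    )
```

Because the service holds the same key, it can recompute the signature from the policy name, URI, and expiry carried in the token and compare it with the one supplied.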

Examples

The following sections provide code snippets covering some of the most common Event Hubs tasks, including:

  • Send event data

  • Receive event data

  • Async send event data

  • Async receive event data

Send event data

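Send an event and block until an acknowledgement is received or the operation times out.

from azure.eventhub import EventHubClient, EventData

client = EventHubClient.from_connection_string(connection_str)
sender = client.add_sender(partition="0")
try:
    # Open the connection before sending.
    client.run()
    event_data = EventData(b"A single event")
    # Blocks until the service acknowledges the event or the send times out.
    sender.send(event_data)
finally:
    # Always close the connection, even if the send fails.
    client.stop()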

Receive event data

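Receive events from the event hub.

import logging
from azure.eventhub import EventHubClient, Offset

client = EventHubClient.from_connection_string(connection_str)
# Start at the end of partition "0", so only newly arriving events are read.
receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
try:
    client.run()
    logger = logging.getLogger("azure.eventhub")
    # Wait up to 5 seconds for a batch of at most 100 events.
    received = receiver.receive(timeout=5, max_batch_size=100)
    for event_data in received:
        logger.info("Message received:{}".format(event_data.body_as_str()))
finally:
    # Always close the connection, even if receiving fails.
    client.stop()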

Async send event data

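Send an event and asynchronously wait until an acknowledgement is received or the operation times out.

from azure.eventhub import EventHubClientAsync, EventData

# await is only valid inside a coroutine, so the snippet is wrapped in one.
async def send_event(connection_str):
    client = EventHubClientAsync.from_connection_string(connection_str)
    sender = client.add_async_sender(partition="0")
    try:
        await client.run_async()
        event_data = EventData(b"A single event")
        await sender.send(event_data)
    finally:
        # Always close the connection, even if the send fails.
        await client.stop_async()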

Async receive event data

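Receive events asynchronously from the event hub.

import logging
from azure.eventhub import EventHubClientAsync, Offset

# await is only valid inside a coroutine, so the snippet is wrapped in one.
async def receive_events(connection_str):
    client = EventHubClientAsync.from_connection_string(connection_str)
    receiver = client.add_async_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
    try:
        await client.run_async()
        logger = logging.getLogger("azure.eventhub")
        # Wait up to 5 seconds for a batch of events.
        received = await receiver.receive(timeout=5)
        for event_data in received:
            logger.info("Message received:{}".format(event_data.body_as_str()))
    finally:
        # Always close the connection, even if receiving fails.
        await client.stop_async()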
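
The async snippets rely on await, so they have to be driven by an event loop. The sketch below shows one possible way to do that, and to read one batch from several partitions concurrently with asyncio.gather. StubReceiver is a hypothetical stand-in for the object returned by add_async_receiver, included only so that the example runs without a live event hub; drain, drain_all, and run_example are likewise illustrative names, not library functions.

```python
import asyncio


class StubReceiver:
    """Hypothetical stand-in for an async receiver; not part of azure-eventhub."""

    def __init__(self, partition, events):
        self.partition = partition
        self._events = list(events)

    async def receive(self, timeout=None):
        # Yield control once, as a real network call would, then hand back the batch.
        await asyncio.sleep(0)
        batch, self._events = self._events, []
        return batch


async def drain(receiver):
    """Receive one batch and tag each event with the partition it came from."""
    batch = await receiver.receive(timeout=5)
    return [(receiver.partition, event) for event in batch]


async def drain_all(receivers):
    """Read one batch from every receiver concurrently and flatten the results."""
    batches = await asyncio.gather(*(drain(r) for r in receivers))
    return [item for batch in batches for item in batch]


def run_example():
    receivers = [StubReceiver("0", ["a", "b"]), StubReceiver("1", ["c"])]
    # An explicit loop keeps the sketch usable on Python 3.5 and later.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(drain_all(receivers))
    finally:
        loop.close()
```

asyncio.gather returns results in the order the coroutines were passed in, so the combined list stays grouped by partition even though the reads overlap.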

Troubleshooting

General

The Event Hubs APIs generate exceptions that can fall into the following categories, along with the associated action you can take to try to fix them.

  • User coding error: System.ArgumentException, System.InvalidOperationException, System.OperationCanceledException, System.Runtime.Serialization.SerializationException. General action: try to fix the code before proceeding.

  • Setup/configuration error: Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException, Microsoft.Azure.EventHubs.MessagingEntityNotFoundException, System.UnauthorizedAccessException. General action: review your configuration and change if necessary.

  • Transient exceptions: Microsoft.ServiceBus.Messaging.MessagingException, Microsoft.ServiceBus.Messaging.ServerBusyException, Microsoft.Azure.EventHubs.ServerBusyException, Microsoft.ServiceBus.Messaging.MessagingCommunicationException. General action: retry the operation or notify users.

  • Other exceptions: System.Transactions.TransactionException, System.TimeoutException, Microsoft.ServiceBus.Messaging.MessageLockLostException, Microsoft.ServiceBus.Messaging.SessionLockLostException. General action: specific to the exception type; refer to the table in Event Hubs messaging exceptions.

For more detailed information about exceptions and how to deal with them, see Event Hubs messaging exceptions.
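
For the transient category, the suggested action is to retry the operation. A minimal sketch of a retry with exponential backoff follows. The helper retry_transient is hypothetical, and which exception classes count as transient in the Python client is left to the caller as an assumption, since the list above names .NET exception types.

```python
import time


def retry_transient(operation, transient_errors, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call `operation` until it succeeds or `max_attempts` is reached.

    `transient_errors` is a tuple of exception classes the caller considers
    retryable (an assumption; this helper does not know the library's errors).
    The delay doubles after every failed attempt.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except transient_errors:
            if attempt == max_attempts:
                # Out of attempts: surface the last error to the caller.
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

A send could then be wrapped as retry_transient(lambda: sender.send(event_data), (SomeTransientError,)), where SomeTransientError is a placeholder for whichever exception class your code treats as retryable.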

Next steps

Examples

  • ./examples/send.py - use sender to publish events

  • ./examples/recv.py - use receiver to read events

  • ./examples/send_async.py - async/await support of a sender

  • ./examples/recv_async.py - async/await support of a receiver

  • ./examples/eph.py - event processor host

Documentation

Reference documentation is available at docs.microsoft.com/python/api/azure-eventhub.

Logging

  • Enable the ‘azure.eventhub’ logger to collect traces from the library.

  • Enable the ‘uamqp’ logger to collect traces from the underlying uAMQP library.

  • Enable AMQP frame-level tracing by setting debug=True when creating the client.
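
As a minimal sketch, the two loggers named above can be enabled with the standard logging module. The helper name enable_eventhub_logging and the log format are illustrative choices, not something the library prescribes.

```python
import logging
import sys


def enable_eventhub_logging(level=logging.DEBUG, stream=sys.stdout):
    """Attach a stream handler to the 'azure.eventhub' and 'uamqp' loggers."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    loggers = []
    for name in ("azure.eventhub", "uamqp"):
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.addHandler(handler)
        loggers.append(logger)
    return loggers
```

Call enable_eventhub_logging() once at startup, before creating the client. Calling it repeatedly would attach duplicate handlers and repeat every log line.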

Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the Issues section of the project.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Release History

1.3.3 (2019-12-04)

Features

  • Added reconnect_timeout and max_reconnect_tries parameters to receive functions for better control of connection behaviour during receive.

  • Added an option release_partition_on_checkpoint_failure to EPHOptions for EventProcessorHost to instruct the EventProcessorHost to fail fast on a checkpoint failure and proactively release the partition. This should reduce spurious reprocessing of non-checkpointed events, at the cost of a small amount of additional latency if the checkpoint interruption was actually transient.

BugFixes

  • Increments UAMQP dependency min version to 1.2.5 to include a set of fixes, including handling of large messages and mitigation of segfaults.

  • Fixes bug preventing application_properties from being transmitted when set individually in key-value form.

  • Fixed send timeout threadthrough to sender so it is now passed in proper units and leveraged within UAMQP.

  • Fixed bug where on reconnect, receive functions returned an empty list.

  • Fixed bug in partition pump logger interfering with certain failure mode logs.

  • Fixed bug to pass proper args to process_error_async within EventHubPartitionPump.

  • Demoted error-level logging in the cases of EPH existing leases not found or out-of-date leases being ignored. These will now be logged at info-level.

1.3.2 (2019-09-18)

BugFixes

  • Fixed bug where errors were not handled when EventProcessorHost was initializing EventHubClient.

1.3.1 (2019-02-28)

BugFixes

  • Fixed bug where datetime offset filter was using a local timestamp rather than UTC.

  • Fixed stack overflow error in continuous connection reconnect attempts.

1.3.0 (2019-01-29)

BugFixes

  • Added support for auto reconnect on token expiration and other auth errors (issue #89).

Features

  • Added ability to create ServiceBusClient from an existing SAS auth token, including providing a function to auto-renew that token on expiry.

  • Added support for storing a custom EPH context value in checkpoint (PR #84, thanks @konstantinmiller)

1.2.0 (2018-11-29)

  • Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).

  • Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)

1.1.1 (2018-10-03)

  • Fixed bug in Azure namespace package.

1.1.0 (2018-09-21)

  • Changes to AzureStorageCheckpointLeaseManager parameters to support other connection options (issue #61):

    • The storage_account_name, storage_account_key and lease_container_name arguments are now optional keyword arguments.

    • Added a sas_token argument that must be specified with storage_account_name in place of storage_account_key.

    • Added an endpoint_suffix argument to support storage endpoints in National Clouds.

    • Added a connection_string argument that, if specified, overrides all other endpoint arguments.

    • The lease_container_name argument now defaults to “eph-leases” if not specified.

  • Fix for clients failing to start if run is called multiple times (issue #64).

  • Added convenience methods body_as_str and body_as_json to EventData object for easier processing of message data.

1.0.0 (2018-08-22)

  • API stable.

  • Renamed internal _async module to async_ops for docs generation.

  • Added optional auth_timeout parameter to EventHubClient and EventHubClientAsync to configure how long to allow for token negotiation to complete. Default is 60 seconds.

  • Added optional send_timeout parameter to EventHubClient.add_sender and EventHubClientAsync.add_async_sender to determine the timeout for Events to be successfully sent. Default value is 60 seconds.

  • Reformatted logging for performance.

0.2.0 (2018-08-06)

  • Stability improvements for EPH.

  • Updated uAMQP version.

  • Added new configuration options for Sender and Receiver; keep_alive and auto_reconnect. These flags have been added to the following:

    • EventHubClient.add_receiver

    • EventHubClient.add_sender

    • EventHubClientAsync.add_async_receiver

    • EventHubClientAsync.add_async_sender

    • EPHOptions.keep_alive_interval

    • EPHOptions.auto_reconnect_on_error

0.2.0rc2 (2018-07-29)

  • Breaking change EventData.offset will now return an object of type ~uamqp.common.Offset rather than str. The original string value can be retrieved from ~uamqp.common.Offset.value.

  • Each sender/receiver will now run in its own independent connection.

  • Updated uAMQP dependency to 0.2.0

  • Fixed issue with IoTHub clients not being able to retrieve partition information.

  • Added support for HTTP proxy settings to both EventHubClient and EPH.

  • Added error handling policy to automatically reconnect on retryable error.

  • Added keep-alive thread for maintaining an unused connection.

0.2.0rc1 (2018-07-06)

  • Breaking change Restructured library to support Python 3.7. Submodule async has been renamed and all classes from this module can now be imported from azure.eventhub directly.

  • Breaking change Removed optional callback argument from Receiver.receive and AsyncReceiver.receive. This removes the potential for messages to be processed via callback but not yet returned in the batch.

  • Breaking change EventData.properties has been renamed to EventData.application_properties.

  • Updated uAMQP dependency to v0.1.0

  • Added support for constructing IoTHub connections.

  • Fixed memory leak in receive operations.

  • Dropped Python 2.7 wheel support.

0.2.0b2 (2018-05-29)

  • Added namespace_suffix to EventHubConfig() to support national clouds.

  • Added device_id attribute to EventData to support IoT Hub use cases.

  • Added message header to work around a service bug for PartitionKey support.

  • Updated uAMQP dependency to vRC1.

0.2.0b1 (2018-04-20)

  • Updated uAMQP to latest version.

  • Further testing and minor bug fixes.

0.2.0a2 (2018-04-02)

  • Updated uAMQP dependency.

0.2.0a1 (unreleased)

  • Swapped out Proton dependency for uAMQP.
