Idiomatic asyncio wrapper around paho-mqtt.

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

Project description

MQTT client with idiomatic asyncio interface 🙌

Write code like this:

Subscriber
async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.payload.decode())
Publisher
async with Client("test.mosquitto.org") as client:
    message = "10%"
    await client.publish(
        "floors/bed_room/humidity",
        payload=message.encode(),
    )
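
The subscriber above filters on "floors/+/humidity" while subscribing to "floors/#". As a rough illustration of how these two MQTT wildcards behave, here is a minimal sketch of a topic matcher. The function name topic_matches is hypothetical and this is not the matching code that asyncio-mqtt or paho-mqtt actually runs; it also ignores special cases such as topics that start with "$".

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches `topic_filter` (illustrative sketch only)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the remaining levels (even zero
            # of them), but is only valid as the last level of the filter.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # "+" matches exactly one level; anything else must be equal.
            return False
    # Without "#", filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for candidate in ("floors/bed_room/humidity", "floors/rooftop/illuminance"):
        print(candidate, topic_matches("floors/+/humidity", candidate))
```

With this reading, "floors/bed_room/humidity" passes the "floors/+/humidity" filter, while "floors/rooftop/illuminance" is covered by the "floors/#" subscription but not by that filter.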

asyncio-mqtt combines the stability of the time-proven paho-mqtt library with a modern, asyncio-based interface.

  • No more callbacks! 👍
  • No more return codes (welcome to the MqttError)
  • Graceful disconnection (forget about on_unsubscribe, on_disconnect, etc.)
  • Compatible with async code
  • Fully type-hinted
  • Did we mention no more callbacks?

The whole thing is less than 700 lines of code.

Installation 📚

pip install asyncio-mqtt

Advanced use ⚡

Let's make the example from before more interesting:

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError


async def advanced_example():
    # We 💛 context managers. Let's create a stack to help
    # us manage them.
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker
        client = Client("test.mosquitto.org")
        await stack.enter_async_context(client)

        # You can create any number of topic filters
        topic_filters = (
            "floors/+/humidity",
            "floors/rooftop/#"
            # 👉 Try to add more filters!
        )
        for topic_filter in topic_filters:
            # Log all messages that match the filter
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            template = f'[topic_filter="{topic_filter}"] {{}}'
            task = asyncio.create_task(log_messages(messages, template))
            tasks.add(task)

        # Messages that don't match a filter will get logged here
        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
        tasks.add(task)

        # Subscribe to topic(s)
        # 🤔 Note that we subscribe *after* starting the message
        # loggers. Otherwise, we may miss retained messages.
        await client.subscribe("floors/#")

        # Publish a random value to each of these topics
        topics = (
            "floors/basement/humidity",
            "floors/rooftop/humidity",
            "floors/rooftop/illuminance",
            # 👉 Try to add more topics!
        )
        task = asyncio.create_task(post_to_topics(client, topics))
        tasks.add(task)

        # Wait for everything to complete (or fail due to, e.g., network
        # errors)
        await asyncio.gather(*tasks)

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
            message = randrange(100)
            print(f'[topic="{topic}"] Publishing message={message}')
            await client.publish(topic, message, qos=1)
            await asyncio.sleep(2)

async def log_messages(messages, template):
    async for message in messages:
        # 🤔 Note that we assume that the message payload is a
        # UTF-8 encoded string (hence the `bytes.decode` call).
        print(template.format(message.payload.decode()))

async def cancel_tasks(tasks):
    for task in tasks:
        if task.done():
            continue
        try:
            task.cancel()
            await task
        except asyncio.CancelledError:
            pass

async def main():
    # Run the advanced_example indefinitely. Reconnect automatically
    # if the connection is lost.
    reconnect_interval = 3  # [seconds]
    while True:
        try:
            await advanced_example()
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)


asyncio.run(main())
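
To see the cleanup pattern from the example in isolation, here is a small sketch that needs no broker. It reuses the cancel_tasks helper from above; the worker coroutine and the simulated RuntimeError are stand-ins invented for this illustration (in the real example the failure would typically be an MqttError).

```python
import asyncio
from contextlib import AsyncExitStack


async def cancel_tasks(tasks):
    # Same helper as in the example above: cancel each unfinished task
    # and wait until the cancellation has been processed.
    for task in tasks:
        if task.done():
            continue
        try:
            task.cancel()
            await task
        except asyncio.CancelledError:
            pass


async def worker(name, log):
    # Hypothetical stand-in for a long-running message logger.
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        log.append(f"{name} stopped")


async def demo():
    log = []
    tasks = set()
    try:
        async with AsyncExitStack() as stack:
            # Registered first, so it also runs if the body raises.
            stack.push_async_callback(cancel_tasks, tasks)
            for name in ("a", "b"):
                tasks.add(asyncio.create_task(worker(name, log)))
            await asyncio.sleep(0.05)
            raise RuntimeError("simulated failure")
    except RuntimeError:
        log.append("stack exited")
    return log, tasks


if __name__ == "__main__":
    print(asyncio.run(demo())[0])
```

Both workers are stopped before the exception leaves the `async with` block, which is why main() can safely call advanced_example() again after an error.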

TLS configuration for MQTT client

asyncio-mqtt also exposes paho-mqtt's tls_set functionality. The following minimal example shows how to enable SSL/TLS support for the asyncio-mqtt client:

import json
import ssl
from asyncio_mqtt import Client, TLSParameters, ProtocolVersion

"""
ca_certs          : a string path to the Certificate Authority certificate files
                    that are to be treated as trusted by this client
certfile & keyfile: strings pointing to the PEM encoded client certificate and
                    private keys respectively
cert_reqs         : allows the certificate requirements that the client imposes on
                    the broker to be changed. By default this is ssl.CERT_REQUIRED
tls_version       : allows the version of the SSL/TLS protocol used to be specified.
                    By default TLS v1 is used
ciphers           : string specifying which encryption ciphers are allowable for this
                    connection, or None to use the defaults
keyfile_password  : if either certfile or keyfile is encrypted and needs a password to
                    decrypt it, then this can be passed using the keyfile_password
                    argument. If you do not provide keyfile_password, the password will
                    be requested to be typed in at a terminal window
"""
tls_params = TLSParameters(
    ca_certs="/path/to/certificates",
    certfile="/path/to/certfile",
    keyfile="/path/to/keyfile",
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS,
    ciphers=None,
    keyfile_password=None,
)

async with Client(
    "test.mosquitto.org",
    username="username",
    password="password",
    protocol=ProtocolVersion.V31,
    tls_params=tls_params,
) as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        # subscribe is done afterwards so that we just start receiving messages
        # from this point on
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.topic)
            print(json.loads(message.payload))
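
The example above passes every payload to json.loads, which raises if a payload is not valid JSON (for instance the "10%" string published earlier). A minimal sketch of a more forgiving decoder is shown below; the helper name parse_json_payload and its default parameter are hypothetical and not part of asyncio-mqtt.

```python
import json
from typing import Any


def parse_json_payload(payload: bytes, default: Any = None) -> Any:
    """Decode a UTF-8 JSON payload, returning `default` if that fails."""
    try:
        return json.loads(payload.decode("utf-8"))
    except ValueError:
        # Both UnicodeDecodeError and json.JSONDecodeError are subclasses
        # of ValueError, so this covers bad encodings and bad JSON alike.
        return default


if __name__ == "__main__":
    print(parse_json_payload(b'{"humidity": 10}'))
    print(parse_json_payload(b"10%"))
```

Note that a payload containing the JSON literal null also decodes to None, so pass a distinct default if you need to tell the two cases apart.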

Proxy settings for asyncio-mqtt client

asyncio-mqtt lets you route the MQTT connection through a SOCKS or HTTP proxy. It uses paho-mqtt's proxy_set functionality to set up the proxy. Note that proxy support is an optional feature (even in paho-mqtt) that requires the PySocks dependency.

The following minimal example shows how to configure proxying of the MQTT connection:

import socks
from asyncio_mqtt import Client, ProtocolVersion, ProxySettings

proxy_params = ProxySettings(
    proxy_type=socks.HTTP,
    proxy_addr="example.com",
    proxy_rdns=True,
    proxy_username="username",
    proxy_password="password",
)

async with Client(
    "test.mosquitto.org",
    username="username",
    password="password",
    protocol=ProtocolVersion.V31,
    proxy=proxy_params,
) as client:
    ...
    ...
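
Because PySocks is optional, `import socks` fails on installations that lack it. One possible way to check for it up front is sketched below; the helper name pysocks_available is hypothetical, and the check only relies on the standard library.

```python
import importlib.util


def pysocks_available() -> bool:
    """Return True if the `socks` module (provided by PySocks) is importable."""
    return importlib.util.find_spec("socks") is not None


if __name__ == "__main__":
    if not pysocks_available():
        print("PySocks is not installed; try: pip install PySocks")
```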

Related projects

Is asyncio-mqtt not what you are looking for? Try another client:

  • paho-mqtt — Own protocol implementation. Synchronous.
  • gmqtt — Own protocol implementation. Asynchronous.
  • fastapi-mqtt — Asynchronous wrapper around gmqtt. Simplifies integration in your FastAPI application.
  • amqtt — Own protocol implementation. Asynchronous. Includes a broker.
  • mqttools — Own protocol implementation. Asynchronous.
  • trio-paho-mqtt — Asynchronous wrapper around paho-mqtt (similar to asyncio-mqtt). Based on trio instead of asyncio.

Requirements

Python 3.7 or later. The only dependency is paho-mqtt.

Note for Windows Users

Since Python 3.8, the default asyncio event loop on Windows is the ProactorEventLoop. This loop doesn't support the add_reader method that asyncio-mqtt requires. To use asyncio-mqtt, switch to an event loop that supports add_reader, such as the built-in SelectorEventLoop. For example:

# Change to the "Selector" event loop
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Run your async application as usual
asyncio.run(main())
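
If the same script also runs on Linux or macOS, the policy switch can be guarded by a platform check. The following is a sketch under that assumption; the helper names use_selector_loop_on_windows and loop_supports_add_reader are hypothetical, and the probe simply tries add_reader on a throwaway socket pair.

```python
import asyncio
import socket
import sys


def use_selector_loop_on_windows() -> bool:
    """Switch to the selector event loop policy on Windows; no-op elsewhere."""
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        return True
    return False


async def loop_supports_add_reader() -> bool:
    """Probe whether the running loop implements add_reader."""
    loop = asyncio.get_running_loop()
    reader, writer = socket.socketpair()
    try:
        loop.add_reader(reader, lambda: None)
        loop.remove_reader(reader)
        return True
    except NotImplementedError:
        # Raised by event loops that do not implement add_reader.
        return False
    finally:
        reader.close()
        writer.close()


if __name__ == "__main__":
    use_selector_loop_on_windows()
    print(asyncio.run(loop_supports_add_reader()))
```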

Changelog

Please refer to the CHANGELOG document. It adheres to the principles of Keep a Changelog.

Versioning

This project adheres to Semantic Versioning.

Expect API changes until we reach version 1.0.0. After 1.0.0, breaking changes will only occur in major releases (e.g., 2.0.0, 3.0.0, etc.).
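
To make the policy concrete, here is a small sketch that decides whether an upgrade may contain breaking changes under the rule stated above. The function names are hypothetical, and the parser assumes plain MAJOR.MINOR.PATCH strings without pre-release or build suffixes.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split a plain "MAJOR.MINOR.PATCH" string into integers."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def may_break(installed: str, candidate: str) -> bool:
    """Return True if moving from `installed` to `candidate` may break the API."""
    old, new = parse_version(installed), parse_version(candidate)
    if old == new:
        return False
    if old[0] == 0:
        # Before 1.0.0, any release may change the API.
        return True
    # From 1.0.0 on, only a change of the major version may break things.
    return new[0] != old[0]


if __name__ == "__main__":
    print(may_break("0.13.0", "0.14.0"))
    print(may_break("1.2.0", "1.3.0"))
```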

License

Note that the underlying paho-mqtt library is dual-licensed. One of the licenses is the so-called Eclipse Distribution License v1.0. It is almost word-for-word identical to the BSD 3-clause License. The only differences are:

  • One use of "COPYRIGHT OWNER" (EDL) instead of "COPYRIGHT HOLDER" (BSD)
  • One use of "Eclipse Foundation, Inc." (EDL) instead of "copyright holder" (BSD)
