Idomatic asyncio wrapper around paho-mqtt.
This project has been archived.
The maintainers of this project have marked this project as archived. No new releases are expected.
Project description
MQTT client with idiomatic asyncio interface 🙌
Write code like this:
Subscriber
async with Client("test.mosquitto.org") as client:
async with client.filtered_messages("floors/+/humidity") as messages:
await client.subscribe("floors/#")
async for message in messages:
print(message.payload.decode())
Publisher
async with Client("test.mosquitto.org") as client:
message = "10%"
await client.publish(
"floors/bed_room/humidity",
payload=message.encode()
)
asyncio-mqtt combines the stability of the time-proven paho-mqtt library with a modern, asyncio-based interface.
- No more callbacks! 👍
- No more return codes (welcome to the
MqttError) - Graceful disconnection (forget about
on_unsubscribe,on_disconnect, etc.) - Compatible with
asynccode - Fully type-hinted
- Did we mention no more callbacks?
The whole thing is less than 700 lines of code.
Installation 📚
pip install asyncio-mqtt
Advanced use ⚡
Let's make the example from before more interesting:
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError
async def advanced_example():
# We 💛 context managers. Let's create a stack to help
# us manage them.
async with AsyncExitStack() as stack:
# Keep track of the asyncio tasks that we create, so that
# we can cancel them on exit
tasks = set()
stack.push_async_callback(cancel_tasks, tasks)
# Connect to the MQTT broker
client = Client("test.mosquitto.org")
await stack.enter_async_context(client)
# You can create any number of topic filters
topic_filters = (
"floors/+/humidity",
"floors/rooftop/#"
# 👉 Try to add more filters!
)
for topic_filter in topic_filters:
# Log all messages that matches the filter
manager = client.filtered_messages(topic_filter)
messages = await stack.enter_async_context(manager)
template = f'[topic_filter="{topic_filter}"] {{}}'
task = asyncio.create_task(log_messages(messages, template))
tasks.add(task)
# Messages that doesn't match a filter will get logged here
messages = await stack.enter_async_context(client.unfiltered_messages())
task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
tasks.add(task)
# Subscribe to topic(s)
# 🤔 Note that we subscribe *after* starting the message
# loggers. Otherwise, we may miss retained messages.
await client.subscribe("floors/#")
# Publish a random value to each of these topics
topics = (
"floors/basement/humidity",
"floors/rooftop/humidity",
"floors/rooftop/illuminance",
# 👉 Try to add more topics!
)
task = asyncio.create_task(post_to_topics(client, topics))
tasks.add(task)
# Wait for everything to complete (or fail due to, e.g., network
# errors)
await asyncio.gather(*tasks)
async def post_to_topics(client, topics):
while True:
for topic in topics:
message = randrange(100)
print(f'[topic="{topic}"] Publishing message={message}')
await client.publish(topic, message, qos=1)
await asyncio.sleep(2)
async def log_messages(messages, template):
async for message in messages:
# 🤔 Note that we assume that the message paylod is an
# UTF8-encoded string (hence the `bytes.decode` call).
print(template.format(message.payload.decode()))
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
try:
task.cancel()
await task
except asyncio.CancelledError:
pass
async def main():
# Run the advanced_example indefinitely. Reconnect automatically
# if the connection is lost.
reconnect_interval = 3 # [seconds]
while True:
try:
await advanced_example()
except MqttError as error:
print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
finally:
await asyncio.sleep(reconnect_interval)
asyncio.run(main())
TLS configuration for MQTT client
asyncio-mqtt also exposes paho-mqtt's tls_set functionality for the users. The following minimal example explains how to enable SSL/TLS support for asyncio-mqtt client
import ssl
from asyncio_mqtt import Client, TLSParameters, ProtocolVersion
"""
ca_certs : a string path to the Certificate Authority certificate files
that are to be treated as trusted by this client
certfile & keyfile: strings pointing to the PEM encoded client certificate and
private keys respectively
cert_reqs : allows the certificate requirements that the client imposes on
the broker to be changed. By default this is ssl.CERT_REQUIRED
tls_version : allows the version of the SSL/TLS protocol used to be specified.
By default TLS v1 is used
ciphers : string specifying which encryption ciphers are allowable for this
connection, or None to use the defaults
keyfile_password : if either certfile or keyfile is encrypted and needs a password to
decrypt it, then this can be passed using the keyfile_password
argument. If you do not provide keyfile_password, the password will
be requested to be typed in at a terminal window
"""
tls_params = TLSParameters(
ca_certs="/path/to/certificates",
certfile="/path/to/certfile",
keyfile="/path/to/keyfile",
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv2,
ciphers=None,
keyfile_password=None,
)
async with Client(
"test.mosquitto.org",
username="username",
password="password",
protocol=ProtocolVersion.V31,
tls_params=tls_params,
) as client:
async with client.filtered_messages("floors/+/humidity") as messages:
# subscribe is done afterwards so that we just start receiving messages
# from this point on
await client.subscribe("floors/#")
async for message in messages:
print(message.topic)
print(json.loads(message.payload))
Proxy settings for asyncio-mqtt client
asyncio-mqtt allows the user to configure proxing of MQTT connection and enables the support for SOCKS or HTTP proxies. asyncio-mqtt uses the paho-mqtt proxy_set functionality to allow setting up the proxy. One thing to note here is that setting up a proxy is an extra feature (even in paho-mqtt) that requires the PySocks dependency.
The following minimal example depicts how to configure proxing of the MQTT connection
import socks
from asyncio_mqtt import Client, ProxySettings
proxy_params = ProxySettings(
proxy_type=socks.HTTP,
proxy_addr="example.com",
proxy_rdns=True,
proxy_username="username",
proxy_password="password",
)
async with Client(
"test.mosquitto.org",
username="username",
password="password",
protocol=ProtocolVersion.V31,
procxy=proxy_params,
) as client:
...
...
Related projects
Is asyncio-mqtt not what you are looking for? Try another client:
- paho-mqtt — Own protocol implementation. Synchronous.
- gmqtt — Own protocol implementation. Asynchronous.
- fastapi-mqtt — Asynchronous wrapper around gmqtt. Simplifies integration in your FastAPI application.
- amqtt — Own protocol implementation. Asynchronous. Includes a broker.
- mqttools — Own protocol implementation. Asynchronous.
- trio-paho-mqtt — Asynchronous wrapper around paho-mqtt (similar to asyncio-mqtt). Based on trio instead of asyncio.
Requirements
Python 3.7 or later. The only dependency is paho-mqtt.
Note for Windows Users
Since Python 3.8, the default asyncio event loop is the ProactorEventLoop. Said loop doesn't support the add_reader method that is required by asyncio-mqtt. To use asyncio-mqtt, please switch to an event loop that supports the add_reader method such as the built-in SelectorEventLoop. E.g:
# Change to the "Selector" event loop
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Run your async application as usual
asyncio.run(main())
Changelog
Please refer to the CHANGELOG document. It adheres to the principles of Keep a Changelog.
Versioning
This project adheres to Semantic Versioning.
Expect API changes until we reach version 1.0.0. After 1.0.0, breaking changes will only occur in major release (e.g., 2.0.0, 3.0.0, etc.).
License
Note that the underlying paho-mqtt library is dual-licensed. One of the licenses is the so-called Eclipse Distribution License v1.0. It is almost word-for-word identical to the BSD 3-clause License. The only differences are:
- One use of "COPYRIGHT OWNER" (EDL) instead of "COPYRIGHT HOLDER" (BSD)
- One use of "Eclipse Foundation, Inc." (EDL) instead of "copyright holder" (BSD)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file asyncio_mqtt-0.13.0.tar.gz.
File metadata
- Download URL: asyncio_mqtt-0.13.0.tar.gz
- Upload date:
- Size: 18.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.8.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f3bee8dfdd080524e93feccec562c644786314ae4bb47ef6d2bd9a667970b825
|
|
| MD5 |
aa3ef51314a12774db77d07afeeaa20b
|
|
| BLAKE2b-256 |
669d370e2d7ce8d75f9384bf326a46e90026bef3f2308c43cf74a45caa2d4e8f
|
File details
Details for the file asyncio_mqtt-0.13.0-py3-none-any.whl.
File metadata
- Download URL: asyncio_mqtt-0.13.0-py3-none-any.whl
- Upload date:
- Size: 14.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.8.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
438842f57e27b85e3daf65a4d4001460ba6d46ed3b85923c44a1bd75e56d5d53
|
|
| MD5 |
83cacfd9b9dd1b2ef8a8e1bcbdc49f0e
|
|
| BLAKE2b-256 |
1d28c625996741bc21a59f52a62e8fa41dc2fd7cb0fa2acb0bbfd417457830fe
|