Skip to main content

A python3 client for jetblack-messagebus

Project description

jetblack-messagebus-python3

Overview

This is a Python 3.7+ client for the jetblack-messagebus.

It follows the publish-subscribe pattern, and includes support for "notification" of a subscription by another client. This allows it to provide data on-demand.

See the server documentation for more detailed information.

Example

The client below subscribes on feed "TEST" to topic "FOO" and prints out the data it receives.

import asyncio

from jetblack_messagebus import CallbackClient

async def on_data(user, host, feed, topic, data_packets, content_type):
    print(f'data: user="{user}",host="{host}",feed="{feed}",topic="{topic}",content_type={content_type}')
    if not data_packets:
        print("no data")
    else:
        for packet in data_packets:
            message = packet.data.decode('utf8') if packet.data else None
            print(f'packet: entitlements={packet.entitlements},message={message}')

async def main():
    client = await CallbackClient.create('localhost', 9001)
    client.data_handlers.append(on_data)
    await client.add_subscription('TEST', 'FOO')
    await client.start()

if __name__ == '__main__':
    asyncio.run(main())

SSL

To create an SSL subscriber, pass in the ssl context.

import ssl
from jetblack_messagebus import CallbackClient

...

async def main():
    """Start the demo"""
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    client = await CallbackClient.create('myhost.example.com', 9001, ssl=ssl_context)
    await client.add_subscription('TEST', 'FOO')
    await client.start()

Authentication

The message bus currently supports the following authentication methods:

  • Password File
  • LDAP
  • JWT

Password File and LDAP

Both the password file and LDAP methods require a username and password. This is provided by the basic authenticator.

import ssl
from jetblack_messagebus import CallbackClient
from jetblack_messagebus.authentication import BasicAuthenticator

...

async def main():
    authenticator = BasicAuthenticator("john.doe@example.com", "pa$$word")
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    client = await CallbackClient.create(
        'myhost.example.com',
        9001,
        ssl=ssl_context,
        authenticator=authenticator
    )
    await client.add_subscription('TEST', 'FOO')
    await client.start()

JWT

JWT authentication requires a valid token to be passed by the client. This is provided by the token authenticator.

import ssl
from jetblack_messagebus import CallbackClient
from jetblack_messagebus.authentication import TokenAuthenticator

...

async def main():
    authenticator = TokenAuthenticator(
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huLmRvZUBleGFtcGxlLmNvbSIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI6MTUxNjIzOTAyMiwiZXhwIjoxNTE3MTgzNTAxfQ.wLSGBcNUT8r1DqQvaBrrGY4NHiiVOpoxrgeoPsSsJkY"
    )
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    client = await CallbackClient.create(
        'myhost.example.com',
        9001,
        ssl=ssl_context,
        authenticator=authenticator
    )
    await client.add_subscription('TEST', 'FOO')
    await client.start()

Message handlers

There are three types of messages that can be received:

  • Data
  • Subscription notifications
  • Authorization requests.

Data

A data handler looks like this:

# A data handler.
async def on_data(
        user: str,
        host: str,
        feed: str,
        topic: str,
        data_packets: Optional[List[DataPacket]],
        content_type: str
) -> None:
    """Called when data is received"""
    pass

# Add the handler to the client.
client.data_handlers.append(on_data)
# Remove the handler
client.data_handlers.remove(on_data)

The data packets have two fields: entitlements and data.

The entitlements is a optional set of ints which express the entitlements that were required for the data to have been received.

The data is an optional bytes holding the encoded data. What this represents is agreed by the sender and receiver. For example it might be a simple string, some JSON text, or a protocol buffer.

Subscription notifications

A subscription notification handler looks like this:

# A notification handler.
async def on_notification(
        client_id: UUID,
        user: str,
        host: str,
        feed: str,
        topic: str,
        is_add: bool
) -> None:
    """Called for a notification"""
    pass

# Add the handler to the client.
client.notification_handlers.append(on_notification)
# Remove the handler
client.notification_handlers.remove(on_notification)

Authorization requests

# An authorization handler.
async def on_authorization(
        client_id: UUID,
        host: str,
        user: str,
        feed: str,
        topic: str
) -> None:
    """Called when authorization is requested"""
    pass

# Add the handler to the client.
client._authorization_handlers.append(on_authorization)
# Remove the handler
client._authorization_handlers.remove(on_authorization)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

jetblack-messagebus-python3-6.0.0a1.tar.gz (16.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

jetblack_messagebus_python3-6.0.0a1-py3-none-any.whl (20.9 kB view details)

Uploaded Python 3

File details

Details for the file jetblack-messagebus-python3-6.0.0a1.tar.gz.

File metadata

File hashes

Hashes for jetblack-messagebus-python3-6.0.0a1.tar.gz
Algorithm Hash digest
SHA256 e42832b7cc48b11c1410b037699a8b72b7c407b21d4e0d4f07b0f6ed10f7901a
MD5 d02726c0c096e692cf574df3aabc35f5
BLAKE2b-256 4d2bc4b022ae25e96ba0173b24e1db2d6c466737aafd550e311b99537ddc896c

See more details on using hashes here.

File details

Details for the file jetblack_messagebus_python3-6.0.0a1-py3-none-any.whl.

File metadata

File hashes

Hashes for jetblack_messagebus_python3-6.0.0a1-py3-none-any.whl
Algorithm Hash digest
SHA256 4a07ed4aa7623fd3c508ecc914e65cd0f77d09fb5e96bffa0b107270e9ec0c2e
MD5 b3428d99fdb7f378314c30211250d1cd
BLAKE2b-256 38b410019678a42506e017d64b6e404d91aad1ce8f5ef2e6e3c51136c9534a8c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page