Python Rest Client Schema Registry

A Python REST client for interacting with the Confluent Schema Registry server to manage Avro schema resources.

Requirements

Python 3.6+

Installation

pip install python-schema-registry-client

If you want the Faust functionality:

pip install python-schema-registry-client[faust]

Note that this automatically adds a dependency on faust-streaming, a fork of faust. To use the original faust package instead, install it manually and then install python-schema-registry-client without the faust extra; the functionality is the same.

Client API, Serializer, Faust Integration and Schema Server description

Documentation: https://marcosschroh.github.io/python-schema-registry-client.io

Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = client.register("test-deployment", avro_schema)

Or, using the async client:

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

# `await` must run inside a coroutine (for example, an `async def` function)
schema_id = await async_client.register("test-deployment", avro_schema)
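
For orientation, both register calls above correspond to a single POST against the Schema Registry REST API. The sketch below only builds that request with the standard library and never sends it. build_register_request is a hypothetical helper for illustration, not part of this package, and the real client may set additional headers or options.

```python
import json
import urllib.parse
import urllib.request


def build_register_request(base_url, subject, schema_dict):
    """Build (but do not send) the HTTP request that registers a schema.

    Hypothetical helper for illustration only; it is not part of
    python-schema-registry-client.
    """
    # The registry expects the Avro schema as a JSON *string* nested in a JSON body.
    body = json.dumps({"schema": json.dumps(schema_dict)}).encode("utf-8")
    # Subjects are path segments, so escape anything that is not URL-safe.
    quoted_subject = urllib.parse.quote(subject, safe="")
    return urllib.request.Request(
        url=f"{base_url}/subjects/{quoted_subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

request = build_register_request(
    "http://127.0.0.1:8081", "test-deployment", deployment_schema
)
print(request.get_method(), request.full_url)
# POST http://127.0.0.1:8081/subjects/test-deployment/versions
```

On success the registry answers with a JSON object containing the schema id, which is the value that register returns in the examples above.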

Usage with dataclasses-avroschema

You can generate the Avro schema directly from a Python class using dataclasses-avroschema and use it in the API to register schemas, check versions and test compatibility:

import dataclasses
import typing

from dataclasses_avroschema import AvroModel, types

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# subject under which the schema is registered
subject = "dataclasses-avroschema-subject-2"


@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
    country: str = "Argentina"
    address: str = None

# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')

compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)

# >>> True

When to use this library

Usually, we have a situation like this:

[Figure: Confluent Architecture]

So, our producers and consumers have to serialize and deserialize messages every time they send to or receive from Kafka topics. In this picture, imagine a Faust application that receives messages encoded with an Avro schema and needs to deserialize them; it can ask the schema server for the schema required to do that. In this scenario, the MessageSerializer is a good fit.
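
To make the serializer's role more concrete, here is a minimal sketch of the message framing, assuming the standard Confluent wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro-encoded body. The frame and unframe functions are hypothetical names used only here, and the payload bytes are an opaque placeholder rather than a real Avro record. A consumer reads the schema id from the header and uses it to look up the schema in the registry before decoding the body.

```python
import struct

MAGIC_BYTE = 0
# 1-byte magic marker followed by a 4-byte big-endian schema id (5 bytes total)
HEADER = struct.Struct(">bI")


def frame(schema_id, avro_payload):
    """Prefix an already Avro-encoded payload with the wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message):
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message is too short to contain a header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError("message does not start with the magic byte")
    return schema_id, message[HEADER.size:]


# Placeholder bytes standing in for an Avro-encoded record.
message = frame(12, b"\x02hi")
schema_id, payload = unframe(message)
print(schema_id, payload)
# 12 b'\x02hi'
```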

Another use case is an application dedicated to administering Avro schemas (registering them, updating compatibility levels, deleting old schemas, etc.); for that, the SchemaRegistryClient is a good fit.
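
As a rough sketch of such an administration task, the function below deletes every subject whose name starts with a given prefix. It assumes the client exposes get_subjects() and delete_subject(subject); check these names against the documentation for the version you have installed. InMemoryRegistry and delete_subjects_with_prefix are hypothetical names, and the in-memory class is only a stand-in so the example runs without a server.

```python
class InMemoryRegistry:
    """Stand-in with the two methods used below; not a real registry client."""

    def __init__(self, subjects):
        self._subjects = list(subjects)

    def get_subjects(self):
        return list(self._subjects)

    def delete_subject(self, subject):
        self._subjects.remove(subject)


def delete_subjects_with_prefix(client, prefix):
    """Delete all subjects starting with `prefix` and return their names."""
    deleted = []
    # get_subjects() returns a copy, so deleting while iterating is safe here.
    for subject in client.get_subjects():
        if subject.startswith(prefix):
            client.delete_subject(subject)
            deleted.append(subject)
    return deleted


registry = InMemoryRegistry(["test-deployment", "test-user", "prod-orders"])
deleted = delete_subjects_with_prefix(registry, "test-")
print(deleted)
# ['test-deployment', 'test-user']
```

Against a running server, the same function could be called with a SchemaRegistryClient instance in place of the stand-in, provided the assumed method names match.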

Development

Install the project and development utilities in editable mode:

pip3 install -e ".[tests,docs,faust]"

The tests are run against the Schema Server using docker compose, so you will need Docker and Docker Compose installed.

./scripts/test

Run code linting:

./scripts/lint

To experiment from the Python shell, run docker-compose up; the schema registry server will be available at http://127.0.0.1:8081, and you can then interact with it using the SchemaRegistryClient:

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# do some operations with the client...
