Bibliothèque cliente Azure Schema Registry Avro Encoder pour Python - version 1.0.0
Azure Schema Registry est un service de référentiel de schémas hébergé par Azure Event Hubs, qui fournit le stockage des schémas, le contrôle de version et la gestion. Ce package fournit un encodeur Avro capable d’encoder et de décoder des charges utiles contenant des identificateurs de schéma du registre de schémas et du contenu encodé avro.
| Code sourcePackage (PyPi) | Documentation de référence sur les | API Échantillons | Changelog
Clause d’exclusion de responsabilité
La prise en charge des packages Python du SDK Azure pour Python 2.7 a pris fin le 1er janvier 2022. Pour obtenir plus d’informations et poser des questions, reportez-vous à https://github.com/Azure/azure-sdk-for-python/issues/20691
Prise en main
Installer le package
Installez la bibliothèque cliente Azure Schema Registry Avro Encoder pour Python avec pip :
pip install azure-schemaregistry-avroencoder
Configuration requise :
Pour utiliser ce package, vous devez disposer des éléments suivants :
- Abonnement Azure - Créer un compte gratuit
- Azure Schema Registry - Voici le guide de démarrage rapide pour créer un groupe Registre de schémas à l’aide de l’Portail Azure.
- Python 3.6 ou version ultérieure - Installer Python
Authentifier le client
L’interaction avec l’encodeur Avro Registry de schémas commence par une instance de la classe AvroEncoder, qui prend le nom du groupe de schémas et la classe cliente Schema Registry . Le constructeur client prend l’espace de noms complet Event Hubs et les informations d’identification Azure Active Directory :
L’espace de noms complet de l’instance du Registre de schémas doit suivre le format :
<yournamespace>.servicebus.windows.net
.Les informations d’identification AAD qui implémentent le protocole TokenCredential doivent être passées au constructeur. Des implémentations du
TokenCredential
protocole sont disponibles dans le package azure-identity. Pour utiliser les types d’informations d’identification fournis parazure-identity
, installez la bibliothèque cliente Azure Identity pour Python avec pip :
pip install azure-identity
- En outre, pour utiliser l’API asynchrone, vous devez d’abord installer un transport asynchrone, tel que aiohttp :
pip install aiohttp
Créez AvroEncoder à l’aide de la bibliothèque azure-schemaregistry :
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Concepts clés
AvroEncoder
Fournit une API pour encoder et décoder à partir d’Avro Binary Encoding plus un type de contenu avec l’ID de schéma. Utilise SchemaRegistryClient pour obtenir des ID de schéma à partir du contenu du schéma ou inversement.
Modèles de message pris en charge
La prise en charge a été ajoutée à certaines classes de modèles du KIT de développement logiciel (SDK) Azure Messaging pour l’interopérabilité avec .AvroEncoder
Ces modèles sont des sous-types du MessageType
protocole défini sous l’espace de azure.schemaregistry.encoder.avroencoder
noms. Actuellement, les classes de modèle prises en charge sont les suivantes :
azure.eventhub.EventData
pourazure-eventhub>=5.9.0
Format de message
Si un type de message qui suit le protocole MessageType est fourni à l’encodeur pour l’encodage, il définit les propriétés de contenu et de type de contenu correspondantes, où :
content
: charge utile Avro (en général, charge utile spécifique au format)- Encodage binaire Avro
- NOT Avro Object Container File, qui inclut le schéma et contredise l’objectif de cet encodeur de déplacer le schéma hors de la charge utile du message et dans le registre de schémas.
content type
: chaîne du formatavro/binary+<schema ID>
, où :avro/binary
est l’indicateur de format<schema ID>
est la représentation hexadécimale du GUID, le même format et l’ordre d’octet que la chaîne du service Registre de schémas.
Si EventData
est transmis en tant que type de message, les propriétés suivantes sont définies sur l’objet EventData
:
- La
body
propriété sera définie sur la valeur de contenu. - La
content_type
propriété sera définie sur la valeur du type de contenu.
Si le type de message n’est pas fourni et par défaut, l’encodeur crée la dict suivante : {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Exemples
Les sections suivantes fournissent plusieurs extraits de code couvrant certaines des tâches les plus courantes du Registre de schémas, notamment :
Encodage
Utilisez la AvroEncoder.encode
méthode pour encoder du contenu avec le schéma Avro donné.
La méthode utilise un schéma précédemment inscrit auprès du service Registre de schémas et conserve le schéma mis en cache pour une utilisation future de l’encodage. Pour éviter de préinscrire le schéma auprès du service et de l’inscrire automatiquement auprès de la encode
méthode, l’argument auto_register=True
de mot clé doit être passé au AvroEncoder
constructeur.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Décodage
Utilisez la AvroEncoder.decode
méthode pour décoder le contenu encodé avro par :
- Passage d’un objet de message qui est un sous-type du protocole MessageType.
- Passage d’une dict avec des clés
content
(type octets) etcontent_type
(type string). La méthode récupère automatiquement le schéma à partir du service De Registre de schémas et conserve le schéma mis en cache pour une utilisation ultérieure du décodage.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Intégration d’envoi d’Event Hubs
Intégration à Event Hubs pour envoyer un EventData
objet avec body
défini sur le contenu encodé avro et correspondant content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Intégration de réception d’Event Hubs
Intégration à Event Hubs pour recevoir un EventData
objet et décoder la valeur encodée body
avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Dépannage
Général
Azure Schema Registry Avro Encoder déclenche des exceptions définies dans Azure Core si des erreurs sont rencontrées lors de la communication avec le service Schema Registry. Les erreurs liées aux types de contenu/contenu non valides et aux schémas non valides seront générées en tant que azure.schemaregistry.encoder.avroencoder.InvalidContentError
et azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, respectivement, où __cause__
contiendra l’exception sous-jacente levée par la bibliothèque Apache Avro.
Journalisation
Cette bibliothèque utilise la bibliothèque de journalisation standard pour la journalisation. Les informations de base sur les sessions HTTP (URL, en-têtes, etc.) sont enregistrées au niveau INFO.
La journalisation détaillée au niveau DEBUG, y compris les corps de requête/réponse et les en-têtes non expurgés, peut être activée sur un client avec l’argument logging_enable
:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
De la même façon, logging_enable
peut activer la journalisation détaillée pour une seule opération, même quand elle n’est pas activée pour le client :
encoder.encode(dict_content, schema=definition, logging_enable=True)
Étapes suivantes
Autres exemples de code
D’autres exemples illustrant les scénarios Courants d’Azure Schema Registry Avro Encoder se trouvent dans le répertoire d’exemples .
Contribution
Ce projet accepte les contributions et les suggestions. La plupart des contributions vous demandent d’accepter un contrat de licence de contribution (CLA) déclarant que vous avez le droit de nous accorder, et que vous nous accordez réellement, les droits d’utilisation de votre contribution. Pour plus d’informations, visitez https://cla.microsoft.com.
Quand vous envoyez une demande de tirage (pull request), un bot CLA détermine automatiquement si vous devez fournir un contrat CLA et agrémenter la demande de tirage de façon appropriée (par exemple, avec une étiquette ou un commentaire). Suivez simplement les instructions fournies par le bot. Vous ne devez effectuer cette opération qu’une seule fois sur tous les dépôts utilisant notre contrat CLA.
Ce projet a adopté le Code de conduite Open Source de Microsoft. Pour plus d’informations, consultez les Questions fréquentes (FAQ) sur le code de conduite ou envoyez vos questions ou vos commentaires à opencode@microsoft.com.
Azure SDK for Python