Libreria client del Registro schema di Azure Avro Encoder per Python - versione 1.0.0
Registro schemi di Azure è un servizio repository di schemi ospitato da Hub eventi di Azure, fornendo archiviazione dello schema, controllo delle versioni e gestione. Questo pacchetto fornisce un codificatore Avro in grado di codificare e decodificare payload contenenti identificatori dello schema del Registro di sistema schema e contenuto con codifica Avro.
Codice | sorgente Pacchetto (PyPi) | Documentazione di | riferimento sulle API Campioni | Changelog
Dichiarazione di non responsabilità
Il supporto dei pacchetti Python di Azure SDK per Python 2.7 è terminato 01 gennaio 2022. Per altre informazioni e domande, vedere https://github.com/Azure/azure-sdk-for-python/issues/20691
Introduzione
Installare il pacchetto
Installare la libreria client del Registro schemi di Azure Avro Encoder per Python con pip:
pip install azure-schemaregistry-avroencoder
Prerequisiti:
Per usare questo pacchetto, è necessario disporre di:
- Sottoscrizione di Azure. Creare un account gratuito
- Registro schemi di - Azure Ecco la guida introduttiva per creare un gruppo del Registro schemi usando il portale di Azure.
- Python 3.6 o versione successiva - Installare Python
Autenticare il client
L'interazione con il codificatore avro del Registro schemi inizia con un'istanza della classe AvroEncoder, che accetta il nome del gruppo di schemi e la classe Client del Registro schemi. Il costruttore client accetta lo spazio dei nomi completo di Hub eventi e le credenziali di Azure Active Directory:
Lo spazio dei nomi completo dell'istanza del Registro schemi deve seguire il formato :
<yournamespace>.servicebus.windows.net
.Le credenziali AAD che implementano il protocollo TokenCredential devono essere passate al costruttore. Esistono implementazioni del
TokenCredential
protocollo disponibili nel pacchetto azure-identity. Per usare i tipi di credenziali forniti daazure-identity
, installare la libreria client di Identità di Azure per Python con pip:
pip install azure-identity
- Inoltre, per usare l'API asincrona, è prima necessario installare un trasporto asincrono, ad esempio aiohttp:
pip install aiohttp
Creare AvroEncoder usando la libreria azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Concetti chiave
AvroEncoder
Fornisce l'API per codificare e decodificare da Codifica binaria Avro e un tipo di contenuto con ID schema. Usa SchemaRegistryClient per ottenere gli ID dello schema dal contenuto dello schema o viceversa.
Modelli di messaggio supportati
Il supporto è stato aggiunto a determinate classi di modello di Azure Messaging SDK per l'interoperabilità AvroEncoder
con . Questi modelli sono sottotipi del MessageType
protocollo definito nello azure.schemaregistry.encoder.avroencoder
spazio dei nomi. Attualmente, le classi di modello supportate sono:
azure.eventhub.EventData
perazure-eventhub>=5.9.0
Formato dei messaggi
Se viene fornito un tipo di messaggio che segue il protocollo MessageType al codificatore per la codifica, verrà impostato il contenuto e le proprietà del tipo di contenuto corrispondenti, dove:
content
: payload avro (in generale, payload specifico del formato)- Codifica binaria Avro
- NOT Avro Object Container File, che include lo schema e sconfigge lo scopo di questo codificatore di spostare lo schema fuori dal payload del messaggio e nel Registro di sistema dello schema.
content type
: stringa del formatoavro/binary+<schema ID>
, dove:avro/binary
è l'indicatore di formato<schema ID>
è la rappresentazione esadecimale del GUID, lo stesso formato e l'ordine di byte della stringa dal servizio Registro schemi.
Se EventData
viene passato come tipo di messaggio, le proprietà seguenti verranno impostate sull'oggetto EventData
:
- La
body
proprietà verrà impostata sul valore del contenuto. - La
content_type
proprietà verrà impostata sul valore del tipo di contenuto.
Se il tipo di messaggio non viene fornito e, per impostazione predefinita, il codificatore creerà la dict seguente: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Esempio
Le sezioni seguenti forniscono diversi frammenti di codice che coprono alcune delle attività del Registro schemi più comuni, tra cui:
Codifica
Usare il AvroEncoder.encode
metodo per codificare il contenuto con lo schema Avro specificato.
Il metodo userà uno schema registrato in precedenza nel servizio Registro schemi e manterrà memorizzato nella cache lo schema per un utilizzo futuro della codifica. Per evitare di registrare lo schema nel servizio e registrarlo automaticamente con il encode
metodo, l'argomento auto_register=True
parola chiave deve essere passato al AvroEncoder
costruttore.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Decodifica
Usare il metodo per decodificare il AvroEncoder.decode
contenuto con codifica Avro in base a:
- Passaggio di un oggetto messaggio che è un sottotipo del protocollo MessageType.
- Passando una dict con chiavi
content
(byte di tipo) econtent_type
(stringa di tipo). Il metodo recupera automaticamente lo schema dal servizio Registro schemi e mantiene memorizzato nella cache lo schema per l'utilizzo futuro della decodifica.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Hub eventi che inviano l'integrazione
Integrazione con Hub eventi per inviare un EventData
oggetto con body
impostato sul contenuto con codifica Avro e corrispondente content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Hub eventi che ricevono l'integrazione
Integrazione con Hub eventi per ricevere un EventData
oggetto e decodificare il valore con codifica body
Avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Risoluzione dei problemi
Generale
Azure Schema Registry Avro Encoder genera eccezioni definite in Azure Core se vengono rilevati errori durante la comunicazione con il servizio Registro schemi. Gli errori relativi ai tipi di contenuto/contenuto non validi e agli schemi non validi verranno generati rispettivamente come azure.schemaregistry.encoder.avroencoder.InvalidContentError
e azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, dove __cause__
conterrà l'eccezione sottostante generata dalla libreria Apache Avro.
Registrazione
Questa libreria usa la libreria di registrazione standard per la registrazione. Le informazioni di base sulle sessioni HTTP (URL, intestazioni e così via) vengono registrate a livello di INFO.
La registrazione dettagliata del livello DEBUG, inclusi i corpi di richiesta/risposta e le intestazioni non attendibili, può essere abilitata in un client con l'argomento logging_enable
:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Analogamente, logging_enable
può abilitare la registrazione dettagliata per una singola operazione, anche quando non è abilitata per il client:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Passaggi successivi
Altro codice di esempio
Altri esempi che illustrano gli scenari comuni del Registro di sistema di Azure Avro Encoder si trovano nella directory degli esempi .
Contributo
In questo progetto sono benvenuti i contributi e i suggerimenti. Per la maggior parte dei contenuti è necessario sottoscrivere un contratto di licenza di collaborazione (CLA, Contributor License Agreement) che stabilisce che l'utente ha il diritto di concedere, e di fatto concede a Microsoft i diritti d'uso del suo contributo. Per informazioni dettagliate, vedere https://cla.microsoft.com.
Quando si invia una richiesta pull, un bot CLA determina automaticamente se è necessario specificare un contratto CLA e completare la richiesta pull in modo appropriato (ad esempio con un'etichetta e un commento). Seguire le istruzioni specificate dal bot. È sufficiente eseguire questa operazione una sola volta per tutti i repository che usano il contratto CLA Microsoft.
Questo progetto ha adottato il Codice di comportamento di Microsoft per l'open source. Per altre informazioni, vedere Code of Conduct FAQ (Domande frequenti sul Codice di comportamento Open Source di Microsoft) oppure contattare opencode@microsoft.com per eventuali altre domande o commenti.
Azure SDK for Python