Condividi tramite


Libreria client avro serializer del Registro schemi di Azure per Python - versione 1.0.0b4

Registro schemi di Azure è un servizio di repository di schemi ospitato da Hub eventi di Azure, che fornisce archiviazione dello schema, controllo delle versioni e gestione. Questo pacchetto fornisce un serializzatore Avro in grado di serializzare e deserializzare i payload contenenti identificatori dello schema del Registro schemi e dati con codifica Avro.

Codice | sorgente Pacchetto (PyPi) | Documentazione | di riferimento sulle APICampioni | Changelog

Dichiarazione di non responsabilità

Il supporto dei pacchetti Python di Azure SDK per Python 2.7 termina il 01 gennaio 2022. Per altre informazioni e domande, fare riferimento a https://github.com/Azure/azure-sdk-for-python/issues/20691

Introduzione

Installare il pacchetto

Installare la libreria client avro serializer del Registro schemi di Azure e la libreria client di Identità di Azure per Python con pip:

pip install azure-schemaregistry-avroserializer azure-identity

Prerequisiti:

Per usare questo pacchetto, è necessario disporre di:

Autenticare il client

L'interazione con il serializzatore Avro del Registro schemi inizia con un'istanza della classe AvroSerializer, che accetta il nome del gruppo di schemi e la classe client del Registro schemi . Il costruttore client accetta lo spazio dei nomi completo di Hub eventi e le credenziali di Azure Active Directory:

  • Lo spazio dei nomi completo dell'istanza del Registro schemi deve seguire il formato : <yournamespace>.servicebus.windows.net.

  • Una credenziale di AAD che implementa il protocollo TokenCredential deve essere passata al costruttore. Nel pacchetto azure-identity sono disponibili implementazioni del TokenCredential protocollo. Per usare i tipi di credenziali forniti da azure-identity, installare la libreria client di Identità di Azure per Python con pip:

pip install azure-identity
  • Inoltre, per usare l'API asincrona supportata in Python 3.6+, è prima necessario installare un trasporto asincrono, ad esempio aiohttp:
pip install aiohttp

Creare AvroSerializer usando la libreria azure-schemaregistry:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Concetti chiave

AvroSerializer

Fornisce l'API per serializzare e deserializzare da Codifica binaria Avro più un'intestazione con ID schema. Usa SchemaRegistryClient per ottenere gli ID schema dal contenuto dello schema o viceversa.

Formato dei messaggi

Lo stesso formato viene usato dai serializzatori del Registro di sistema dello schema nei linguaggi di Azure SDK.

I messaggi vengono codificati come segue:

  • 4 byte: Indicatore di formato

    • Attualmente zero per indicare il formato seguente.
  • 32 byte: ID schema

    • Rappresentazione esadecimale UTF-8 del GUID.
    • 32 cifre esadecimale, senza trattini.
    • Stesso formato e ordine di byte della stringa dal servizio Registro schemi.
  • Byte rimanenti: payload Avro (in generale, payload specifico del formato)

    • Codifica binaria Avro
    • NOT Avro Object Container File, che include lo schema e sconfigge lo scopo di questo serialzer per spostare lo schema dal payload del messaggio e nel Registro di sistema dello schema.

Esempio

Le sezioni seguenti forniscono diversi frammenti di codice che illustrano alcune delle attività più comuni del Registro schemi, tra cui:

Serializzazione

Usare il AvroSerializer.serialize metodo per serializzare i dati dict con lo schema avro specificato. Il metodo usa uno schema registrato in precedenza nel servizio Registro schemi e mantiene lo schema memorizzato nella cache per un utilizzo futuro della serializzazione. È anche possibile evitare di preregistrare lo schema nel servizio e registrarlo automaticamente con il serialize metodo creando un'istanza di con l'argomento AvroSerializerauto_register_schemas=Trueparola chiave .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserializzazione

Utilizzare il AvroSerializer.deserialize metodo per deserializzare i byte non elaborati in dati dict. Il metodo recupera automaticamente lo schema dal servizio Registro schemi e mantiene lo schema memorizzato nella cache per un utilizzo futuro della deserializzazione.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Integrazione dell'invio di Hub eventi

Integrazione con Hub eventi per inviare dati avro dict serializzati come corpo di EventData.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Integrazione di ricezione di Hub eventi

Integrazione con Hub eventi per ricevere EventData e deserializzare byte non elaborati in dati avro dict.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Risoluzione dei problemi

Generale

Il serializzatore Avro del Registro schemi di Azure genera eccezioni definite in Azure Core.

Registrazione

Questa libreria usa la libreria di registrazione standard per la registrazione. Le informazioni di base sulle sessioni HTTP (URL, intestazioni e così via) vengono registrate a livello di INFO.

La registrazione dettagliata a livello di DEBUG, inclusi i corpi di richiesta/risposta e le intestazioni non contrassegnate, può essere abilitata in un client con l'argomento logging_enable :

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

Analogamente, logging_enable può abilitare la registrazione dettagliata per una singola operazione, anche quando non è abilitata per il client:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Passaggi successivi

Altro codice di esempio

Per altri esempi, vedere la directory degli esempi che illustra gli scenari comuni del serializzatore Avro del Registro schemi di Azure.

Contributo

In questo progetto sono benvenuti i contributi e i suggerimenti. Per la maggior parte dei contenuti è necessario sottoscrivere un contratto di licenza di collaborazione (CLA, Contributor License Agreement) che stabilisce che l'utente ha il diritto di concedere, e di fatto concede a Microsoft i diritti d'uso del suo contributo. Per informazioni dettagliate, vedere https://cla.microsoft.com.

Quando si invia una richiesta pull, un bot CLA determina automaticamente se è necessario specificare un contratto CLA e completare la richiesta pull in modo appropriato (ad esempio con un'etichetta e un commento). Seguire le istruzioni specificate dal bot. È sufficiente eseguire questa operazione una sola volta per tutti i repository che usano il contratto CLA Microsoft.

Questo progetto ha adottato il Codice di comportamento di Microsoft per l'open source. Per altre informazioni, vedere Code of Conduct FAQ (Domande frequenti sul Codice di comportamento Open Source di Microsoft) oppure contattare opencode@microsoft.com per eventuali altre domande o commenti.