Condividi tramite


Libreria client del Registro schema di Azure Avro Encoder per Python - versione 1.0.0

Registro schemi di Azure è un servizio repository di schemi ospitato da Hub eventi di Azure, fornendo archiviazione dello schema, controllo delle versioni e gestione. Questo pacchetto fornisce un codificatore Avro in grado di codificare e decodificare payload contenenti identificatori dello schema del Registro di sistema schema e contenuto con codifica Avro.

Codice | sorgente Pacchetto (PyPi) | Documentazione di | riferimento sulle API Campioni | Changelog

Dichiarazione di non responsabilità

Il supporto dei pacchetti Python di Azure SDK per Python 2.7 è terminato 01 gennaio 2022. Per altre informazioni e domande, vedere https://github.com/Azure/azure-sdk-for-python/issues/20691

Introduzione

Installare il pacchetto

Installare la libreria client del Registro schemi di Azure Avro Encoder per Python con pip:

pip install azure-schemaregistry-avroencoder

Prerequisiti:

Per usare questo pacchetto, è necessario disporre di:

Autenticare il client

L'interazione con il codificatore avro del Registro schemi inizia con un'istanza della classe AvroEncoder, che accetta il nome del gruppo di schemi e la classe Client del Registro schemi. Il costruttore client accetta lo spazio dei nomi completo di Hub eventi e le credenziali di Azure Active Directory:

  • Lo spazio dei nomi completo dell'istanza del Registro schemi deve seguire il formato : <yournamespace>.servicebus.windows.net.

  • Le credenziali AAD che implementano il protocollo TokenCredential devono essere passate al costruttore. Esistono implementazioni del TokenCredential protocollo disponibili nel pacchetto azure-identity. Per usare i tipi di credenziali forniti da azure-identity, installare la libreria client di Identità di Azure per Python con pip:

pip install azure-identity
  • Inoltre, per usare l'API asincrona, è prima necessario installare un trasporto asincrono, ad esempio aiohttp:
pip install aiohttp

Creare AvroEncoder usando la libreria azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Concetti chiave

AvroEncoder

Fornisce l'API per codificare e decodificare da Codifica binaria Avro e un tipo di contenuto con ID schema. Usa SchemaRegistryClient per ottenere gli ID dello schema dal contenuto dello schema o viceversa.

Modelli di messaggio supportati

Il supporto è stato aggiunto a determinate classi di modello di Azure Messaging SDK per l'interoperabilità AvroEncodercon . Questi modelli sono sottotipi del MessageType protocollo definito nello azure.schemaregistry.encoder.avroencoder spazio dei nomi. Attualmente, le classi di modello supportate sono:

  • azure.eventhub.EventData per azure-eventhub>=5.9.0

Formato dei messaggi

Se viene fornito un tipo di messaggio che segue il protocollo MessageType al codificatore per la codifica, verrà impostato il contenuto e le proprietà del tipo di contenuto corrispondenti, dove:

  • content: payload avro (in generale, payload specifico del formato)

    • Codifica binaria Avro
    • NOT Avro Object Container File, che include lo schema e sconfigge lo scopo di questo codificatore di spostare lo schema fuori dal payload del messaggio e nel Registro di sistema dello schema.
  • content type: stringa del formato avro/binary+<schema ID>, dove:

    • avro/binary è l'indicatore di formato
    • <schema ID> è la rappresentazione esadecimale del GUID, lo stesso formato e l'ordine di byte della stringa dal servizio Registro schemi.

Se EventData viene passato come tipo di messaggio, le proprietà seguenti verranno impostate sull'oggetto EventData :

  • La body proprietà verrà impostata sul valore del contenuto.
  • La content_type proprietà verrà impostata sul valore del tipo di contenuto.

Se il tipo di messaggio non viene fornito e, per impostazione predefinita, il codificatore creerà la dict seguente: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Esempio

Le sezioni seguenti forniscono diversi frammenti di codice che coprono alcune delle attività del Registro schemi più comuni, tra cui:

Codifica

Usare il AvroEncoder.encode metodo per codificare il contenuto con lo schema Avro specificato. Il metodo userà uno schema registrato in precedenza nel servizio Registro schemi e manterrà memorizzato nella cache lo schema per un utilizzo futuro della codifica. Per evitare di registrare lo schema nel servizio e registrarlo automaticamente con il encode metodo, l'argomento auto_register=True parola chiave deve essere passato al AvroEncoder costruttore.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Decodifica

Usare il metodo per decodificare il AvroEncoder.decode contenuto con codifica Avro in base a:

  • Passaggio di un oggetto messaggio che è un sottotipo del protocollo MessageType.
  • Passando una dict con chiavi content(byte di tipo) e content_type (stringa di tipo). Il metodo recupera automaticamente lo schema dal servizio Registro schemi e mantiene memorizzato nella cache lo schema per l'utilizzo futuro della decodifica.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Hub eventi che inviano l'integrazione

Integrazione con Hub eventi per inviare un EventData oggetto con body impostato sul contenuto con codifica Avro e corrispondente content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Hub eventi che ricevono l'integrazione

Integrazione con Hub eventi per ricevere un EventData oggetto e decodificare il valore con codifica body Avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Risoluzione dei problemi

Generale

Azure Schema Registry Avro Encoder genera eccezioni definite in Azure Core se vengono rilevati errori durante la comunicazione con il servizio Registro schemi. Gli errori relativi ai tipi di contenuto/contenuto non validi e agli schemi non validi verranno generati rispettivamente come azure.schemaregistry.encoder.avroencoder.InvalidContentError e azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, dove __cause__ conterrà l'eccezione sottostante generata dalla libreria Apache Avro.

Registrazione

Questa libreria usa la libreria di registrazione standard per la registrazione. Le informazioni di base sulle sessioni HTTP (URL, intestazioni e così via) vengono registrate a livello di INFO.

La registrazione dettagliata del livello DEBUG, inclusi i corpi di richiesta/risposta e le intestazioni non attendibili, può essere abilitata in un client con l'argomento logging_enable :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Analogamente, logging_enable può abilitare la registrazione dettagliata per una singola operazione, anche quando non è abilitata per il client:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Passaggi successivi

Altro codice di esempio

Altri esempi che illustrano gli scenari comuni del Registro di sistema di Azure Avro Encoder si trovano nella directory degli esempi .

Contributo

In questo progetto sono benvenuti i contributi e i suggerimenti. Per la maggior parte dei contenuti è necessario sottoscrivere un contratto di licenza di collaborazione (CLA, Contributor License Agreement) che stabilisce che l'utente ha il diritto di concedere, e di fatto concede a Microsoft i diritti d'uso del suo contributo. Per informazioni dettagliate, vedere https://cla.microsoft.com.

Quando si invia una richiesta pull, un bot CLA determina automaticamente se è necessario specificare un contratto CLA e completare la richiesta pull in modo appropriato (ad esempio con un'etichetta e un commento). Seguire le istruzioni specificate dal bot. È sufficiente eseguire questa operazione una sola volta per tutti i repository che usano il contratto CLA Microsoft.

Questo progetto ha adottato il Codice di comportamento di Microsoft per l'open source. Per altre informazioni, vedere Code of Conduct FAQ (Domande frequenti sul Codice di comportamento Open Source di Microsoft) oppure contattare opencode@microsoft.com per eventuali altre domande o commenti.