Partilhar via


Biblioteca cliente do Registo Azure Schema Avro Encoder para Python - versão 1.0.0

Azure Schema Registry é um serviço de repositório de esquemas hospedado por Hubs de Eventos do Azure, fornecendo armazenamento de esquemas, versão e gestão. Este pacote fornece um codificadora Avro capaz de codificar e descodificar cargas que contenham identificadores de schema do registo de Schema e conteúdo codificado pela Avro.

Código fonte | Pacote (PyPi) | Documentação de | referência da API Amostras | Alterlog

Exclusão de Responsabilidade

O apoio aos pacotes Azure SDK Python para python 2.7 terminou em 01 de janeiro de 2022. Para mais informações e perguntas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691

Introdução

Instale o pacote

Instale a biblioteca cliente do Registo Azure Schema Avro Encoder para Python com pip:

pip install azure-schemaregistry-avroencoder

Pré-requisitos:

Para utilizar este pacote, você deve ter:

Autenticar o cliente

A interação com o Schema Registry Avro Encoder começa com uma instância da classe AvroEncoder, que tem o nome de grupo de schema e a classe De Cliente de Registo De Schema . O construtor cliente leva o espaço de nome totalmente qualificado e a credencial de Diretório Ativo Azure:

  • O espaço de nome totalmente qualificado da instância do registo de Schema deve seguir o formato: <yournamespace>.servicebus.windows.net.

  • Uma credencial AAD que implementa o protocolo TokenCredential deve ser passada ao construtor. Existem implementações do TokenCredential protocolo disponível no pacote de identidade azul. Para utilizar os tipos de credenciais fornecidos por, por azure-identityfavor instale a biblioteca cliente Azure Identity para Python com pip:

pip install azure-identity
  • Além disso, para utilizar a API assínc, deve primeiro instalar um transporte de async, como aiohttp:
pip install aiohttp

Criar AvroEncoder utilizando a biblioteca de esquemas de azure:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Conceitos-chave

AvroEncoder

Fornece a API para codificar e descodificar da Avro Binary Encoding mais um tipo de conteúdo com identificação de esquema. Usa SchemaRegistryClient para obter IDs de esquema do conteúdo do esquema ou vice-versa.

Modelos de mensagens suportadas

O suporte foi adicionado a certas classes de modelos Azure Messaging SDK para interoperabilidade com o AvroEncoder. Estes modelos são subtipos do MessageType protocolo definido no espaço de azure.schemaregistry.encoder.avroencoder nome. Atualmente, as classes de modelos suportados são:

  • azure.eventhub.EventData para azure-eventhub>=5.9.0

Formato de mensagem

Se um tipo de mensagem que segue o protocolo MessageType for fornecido ao codificar para codificação, definirá as propriedades correspondentes do conteúdo e do tipo de conteúdo, onde:

  • content: Carga útil avro (em geral, carga útil específica do formato)

    • Avro Binary Encoding
    • NÃO Arquivo do Contentor de Objetos Avro, que inclui o esquema e derrota o propósito deste codificante de mover o esquema para fora da carga útil da mensagem e para o registo de esquemas.
  • content type: uma cadeia do formato avro/binary+<schema ID>, onde:

    • avro/binary é o indicador de formato
    • <schema ID> é a representação hexadecimal do GUID, mesmo formato e ordem byte que a cadeia do serviço de registo de Schema.

Se EventData for passado como o tipo de mensagem, as seguintes propriedades serão definidas no EventData objeto:

  • A body propriedade será definida ao valor do conteúdo.
  • A content_type propriedade será definida para o valor do tipo de conteúdo.

Se o tipo de mensagem não for fornecido e, por defeito, o codificar criará o seguinte dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Exemplos

As seguintes secções fornecem vários fragmentos de código que cobrem algumas das tarefas mais comuns do Registo de Schema, incluindo:

Encoding

Utilize o AvroEncoder.encode método para codificar o conteúdo com o esquema avro dado. O método utilizará um esquema previamente registado no serviço de registo de Schema e manterá o esquema em cache para futura utilização da codificação. Para evitar o pré-registo do esquema ao serviço e registá-lo automaticamente com o encode método, o argumento auto_register=True da palavra-chave deve ser passado para o AvroEncoder construtor.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Descodição

Utilize o AvroEncoder.decode método para descodificar o conteúdo codificado por Avro:

  • Passar um objeto de mensagem que é um subtipo do protocolo MessageType.
  • Passando num dict com teclas content(tipo bytes) e content_type (tipo de corda). O método recupera automaticamente o esquema do Serviço de Registo de Schema e mantém o esquema em cache para futura utilização da descodagem.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Centros de Eventos Enviando Integração

Integração com os Centros de Eventos para enviar um EventData objeto com body conteúdo codificado pela Avro e correspondente content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Centros de Eventos Que Recebem Integração

Integração com os Centros de Eventos para receber um EventData objeto e descodificar o valor codificado body por Avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Resolução de problemas

Geral

Azure Schema Registry Avro Encoder levanta exceções definidas no Núcleo Azure se forem encontrados erros ao comunicar com o serviço de registo de Schema. Os erros relacionados com os tipos de conteúdo/conteúdo inválido e os esquemas inválidos serão levantados à medida azure.schemaregistry.encoder.avroencoder.InvalidContentError que, azure.schemaregistry.encoder.avroencoder.InvalidSchemaErrorrespectivamente, __cause__ conterão a exceção subjacente levantada pela biblioteca Apache Avro.

Registo

Esta biblioteca utiliza a biblioteca de registos padrão para registar registos. Informações básicas sobre sessões HTTP (URLs, cabeçalhos, etc.) são registadas ao nível info.

A registo detalhado do nível DEBUG, incluindo os órgãos de pedido/resposta e os cabeçalhos não redigidos, pode ser ativado num cliente com o logging_enable argumento:

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Da mesma forma, logging_enable pode permitir a registo detalhado para uma única operação, mesmo quando não está habilitado para o cliente:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Passos seguintes

Mais código de amostra

Outros exemplos que demonstram cenários comuns do Registo Azure Schema Avro Encoder estão no diretório de amostras .

Contribuir

Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.

Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.

Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para mais informações consulte o Código de Conduta FAQ ou contacte opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.