Biblioteca de clientes do Avro Serializer do Registro de Esquema do Azure para Python – versão 1.0.0b4
O Registro de Esquema do Azure é um serviço de repositório de esquema hospedado por Hubs de Eventos do Azure, fornecendo armazenamento de esquema, controle de versão e gerenciamento. Esse pacote fornece um serializador Avro capaz de serializar e desserializar cargas que contêm identificadores de esquema do Registro de Esquema e dados codificados em Avro.
Código-fonte | Pacote (PyPi) | Documentação | de referência da APIAmostras | Changelog
Aviso de isenção de responsabilidade
O suporte a pacotes do Python do SDK do Azure para o Python 2.7 vai terminar em 1º de janeiro de 2022. Para obter mais informações e tirar dúvidas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691
Introdução
Instalar o pacote
Instale a biblioteca de clientes do Avro Serializer do Registro de Esquema do Azure e a biblioteca de clientes do Azure Identity para Python com pip:
pip install azure-schemaregistry-avroserializer azure-identity
Pré-requisitos:
Para usar esse pacote, você deve ter:
- Assinatura do Azure – Criar uma conta gratuita
- Registro de Esquema do Azure
- Python 2.7, 3.6 ou posterior – Instalar o Python
Autenticar o cliente
A interação com o Serializador Avro do Registro de Esquema começa com uma instância da classe AvroSerializer, que usa o nome do grupo de esquemas e a classe de Cliente do Registro de Esquema . O construtor do cliente usa o namespace totalmente qualificado dos Hubs de Eventos e a credencial do Azure Active Directory:
O namespace totalmente qualificado da instância do Registro de Esquema deve seguir o formato:
<yournamespace>.servicebus.windows.net
.Uma credencial do AAD que implementa o protocolo TokenCredential deve ser passada para o construtor. Há implementações do
TokenCredential
protocolo disponível no pacote azure-identity. Para usar os tipos de credencial fornecidos peloazure-identity
, instale a biblioteca de clientes da Identidade do Azure para Python com pip:
pip install azure-identity
- Além disso, para usar a API assíncrona com suporte no Python 3.6+, primeiro você deve instalar um transporte assíncrono, como aiohttp:
pip install aiohttp
Crie o AvroSerializer usando a biblioteca azure-schemaregistry:
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Principais conceitos
AvroSerializer
Fornece a API para serializar e desserializar da Codificação Binária do Avro mais um cabeçalho com ID de esquema. Usa SchemaRegistryClient para obter IDs de esquema do conteúdo do esquema ou vice-versa.
Formato da mensagem
O mesmo formato é usado por serializadores de registro de esquema em linguagens do SDK do Azure.
As mensagens são codificadas da seguinte maneira:
4 bytes: Formatar Indicador
- Atualmente, sempre zero para indicar o formato abaixo.
32 bytes: ID do esquema
- Representação hexadecimal UTF-8 do GUID.
- 32 dígitos hexadecimais, sem hifens.
- Mesmo formato e ordem de bytes que a cadeia de caracteres do serviço registro de esquema.
Bytes restantes: conteúdo Avro (em geral, conteúdo específico do formato)
- Codificação binária do Avro
- NOT Avro Object Container File, que inclui o esquema e derrota a finalidade desse serialzer de mover o esquema para fora do conteúdo da mensagem e para o registro de esquema.
Exemplos
As seções a seguir fornecem vários snippets de código que abrangem algumas das tarefas mais comuns do Registro de Esquema, incluindo:
- Serialização
- Desserialização
- Integração de envio de Hubs de Eventos
- Integração de recebimento dos Hubs de Eventos
Serialização
Use AvroSerializer.serialize
o método para serializar dados de ditado com o esquema avro fornecido.
O método usaria um esquema registrado anteriormente no serviço registro de esquema e manteria o esquema armazenado em cache para uso futuro de serialização. Também é possível evitar o pré-registro do esquema no serviço e registrar-se automaticamente com o serialize
método instanciando o AvroSerializer
com o argumento auto_register_schemas=True
de palavra-chave .
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Desserialização
Use AvroSerializer.deserialize
o método para desserializar bytes brutos em dados de ditado.
O método recupera automaticamente o esquema do Serviço de Registro de Esquema e mantém o esquema armazenado em cache para uso futuro de desserialização.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Integração de envio de Hubs de Eventos
Integração com os Hubs de Eventos para enviar dados de dict avro serializados como o corpo de EventData.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Integração de recebimento dos Hubs de Eventos
Integração com os Hubs de Eventos para receber EventData
e desserializar bytes brutos em dados de dict avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Solução de problemas
Geral
O Serializador Avro do Registro de Esquema do Azure gera exceções definidas no Azure Core.
Log
Essa biblioteca usa a biblioteca de log padrão para registro em log. As informações básicas sobre sessões HTTP (URLs, cabeçalhos etc.) são registradas no nível info.
O log detalhado no nível de DEBUG, incluindo corpos de solicitação/resposta e cabeçalhos não redigidos, pode ser habilitado em um cliente com o logging_enable
argumento :
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
Da mesma forma, logging_enable
pode habilitar o log detalhado para uma operação individual, mesmo quando ela não está habilitada para o cliente:
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Próximas etapas
Mais códigos de exemplo
Encontre mais exemplos no diretório de exemplos que demonstram cenários comuns do Serializador Avro do Registro de Esquema do Azure.
Contribuição
Este projeto aceita contribuições e sugestões. A maioria das contribuições exige que você concorde com um CLA (Contrato de Licença do Colaborador) declarando que você tem o direito de nos conceder, e de fato concede, os direitos de usar sua contribuição. Para obter detalhes, visite https://cla.microsoft.com.
Quando você envia uma solicitação de pull, um bot do CLA determina automaticamente se você precisa fornecer um CLA e preencher a PR corretamente (por exemplo, rótulo, comentário). Basta seguir as instruções fornecidas pelo bot. Você só precisará fazer isso uma vez em todos os repositórios que usam nosso CLA.
Este projeto adotou o Código de Conduta de Software Livre da Microsoft. Para obter mais informações, confira as Perguntas frequentes sobre o Código de Conduta ou contate opencode@microsoft.com para enviar outras perguntas ou comentários.
Azure SDK for Python