Biblioteca cliente do Azure Schema Registry Avro Serializer para Python - versão 1.0.0b4
Azure Schema Registry é um serviço de repositório de esquemas hospedado por Hubs de Eventos do Azure, fornecendo armazenamento de esquemas, versão e gestão. Este pacote fornece um serializador Avro capaz de serializar e deserizar cargas que contenham identificadores de esquemas de schema e dados codificados pela Avro.
Código fonte | Pacote (PyPi) | Documentação de | referência da API Amostras | Alterlog
Exclusão de Responsabilidade
O suporte de pacotes Azure SDK Python para Python 2.7 termina a 01 de janeiro de 2022. Para mais informações e perguntas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691
Introdução
Instale o pacote
Instale a biblioteca de clientes Azure Schema Registry Avro Serializer e a biblioteca de clientes Azure Identity para Python com pip:
pip install azure-schemaregistry-avroserializer azure-identity
Pré-requisitos:
Para utilizar este pacote, você deve ter:
- Subscrição do Azure - Criar uma conta gratuita
- Registo Azure Schema
- Python 2.7, 3.6 ou mais tarde - Instalar Python
Autenticar o cliente
A interação com o Schema Registry Avro Serializer começa com uma instância da classe AvroSerializer, que tem o nome de grupo de schema e a classe Schema Registry Client . O construtor cliente leva o espaço de nome totalmente qualificado e a credencial de Diretório Ativo Azure:
O espaço de nome totalmente qualificado da instância do registo de Schema deve seguir o formato:
<yournamespace>.servicebus.windows.net
.Uma credencial AAD que implementa o protocolo TokenCredential deve ser passada ao construtor. Existem implementações do
TokenCredential
protocolo disponível no pacote de identidade azul. Para utilizar os tipos de credenciais fornecidos por, porazure-identity
favor instale a biblioteca cliente Azure Identity para Python com pip:
pip install azure-identity
- Além disso, para utilizar a API async suportada no Python 3.6+, deve primeiro instalar um transporte de async, como aiohttp:
pip install aiohttp
Criar o AvroSerializer utilizando a biblioteca de esquemas azure::
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Conceitos-chave
AvroSerializador
Fornece a API para serializar e deserializar da Avro Binary Encoding mais um cabeçalho com identificação de esquema. Usa SchemaRegistryClient para obter IDs de esquema do conteúdo do esquema ou vice-versa.
Formato de mensagem
O mesmo formato é utilizado por serializadores de registo de esquemas em línguas Azure SDK.
As mensagens são codificadas da seguinte forma:
4 bytes: Indicador de Formato
- Atualmente sempre zero para indicar o formato abaixo.
32 bytes: Schema ID
- UTF-8 representação hexadecimal da GUID.
- 32 dígitos de hexáxinho, sem hífenes.
- O mesmo formato e encomenda de byte como string do serviço de registo de Schema.
Bytes restantes: Carga útil avro (em geral, carga útil específica do formato)
- Avro Binary Encoding
- NÃO Arquivo de Contentores de Objetos Avro, que inclui o esquema e derrota o propósito deste serialzer de mover o esquema para fora da carga útil da mensagem e para o registo de esquemas.
Exemplos
As seguintes secções fornecem vários fragmentos de código que cobrem algumas das tarefas mais comuns do Registo de Schema, incluindo:
- Serialização
- Deserialização
- Centros de Eventos Enviando Integração
- Centros de Eventos Que Recebem Integração
Serialização
Utilize AvroSerializer.serialize
o método para serializar dados dict com o esquema avro dado.
O método utilizaria um esquema previamente registado no serviço de registo de Schema e manteria o esquema em cache para futura utilização da serialização. Também é possível evitar o pré-registo do esquema ao serviço e registar-se automaticamente com o serialize
método, através da AvroSerializer
instantânea com o argumento auto_register_schemas=True
da palavra-chave .
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Deserialização
Utilize AvroSerializer.deserialize
o método para deserializar bytes crus em dados dict.
O método recupera automaticamente o esquema do Serviço de Registo de Schema e mantém o esquema em cache para futura utilização da deserialização.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Centros de Eventos Enviando Integração
Integração com os Centros de Eventos para enviar dados avro dict serializados como o corpo do EventData.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Centros de Eventos Que Recebem Integração
Integração com os Centros de Eventos para receber EventData
e deserizar bytes crus em dados avro dict.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Resolução de problemas
Geral
Azure Schema Registry Avro Serializer levanta exceções definidas em Azure Core.
Registo
Esta biblioteca utiliza a biblioteca de registos padrão para registar registos. Informações básicas sobre sessões HTTP (URLs, cabeçalhos, etc.) são registadas ao nível info.
A registo detalhado do nível DEBUG, incluindo os órgãos de pedido/resposta e os cabeçalhos não redigidos, pode ser ativado num cliente com o logging_enable
argumento:
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
Da mesma forma, logging_enable
pode permitir a registo detalhado para uma única operação, mesmo quando não está habilitado para o cliente:
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Passos seguintes
Mais código de amostra
Por favor, encontre mais exemplos no diretório de amostras que demonstre cenários comuns de Azure Schema Registry Avro Serializer.
Contribuir
Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.
Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.
Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para mais informações consulte o Código de Conduta FAQ ou contacte opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.
Azure SDK for Python