Biblioteca cliente do Registo Azure Schema Avro Encoder para Python - versão 1.0.0
Azure Schema Registry é um serviço de repositório de esquemas hospedado por Hubs de Eventos do Azure, fornecendo armazenamento de esquemas, versão e gestão. Este pacote fornece um codificadora Avro capaz de codificar e descodificar cargas que contenham identificadores de schema do registo de Schema e conteúdo codificado pela Avro.
Código fonte | Pacote (PyPi) | Documentação de | referência da API Amostras | Alterlog
Exclusão de Responsabilidade
O apoio aos pacotes Azure SDK Python para python 2.7 terminou em 01 de janeiro de 2022. Para mais informações e perguntas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691
Introdução
Instale o pacote
Instale a biblioteca cliente do Registo Azure Schema Avro Encoder para Python com pip:
pip install azure-schemaregistry-avroencoder
Pré-requisitos:
Para utilizar este pacote, você deve ter:
- Subscrição do Azure - Criar uma conta gratuita
- Registo - Azure SchemaAqui está o guia de arranque rápido para criar um grupo de registo de Schema usando o portal do Azure.
- Python 3.6 ou mais tarde - Instalar Python
Autenticar o cliente
A interação com o Schema Registry Avro Encoder começa com uma instância da classe AvroEncoder, que tem o nome de grupo de schema e a classe De Cliente de Registo De Schema . O construtor cliente leva o espaço de nome totalmente qualificado e a credencial de Diretório Ativo Azure:
O espaço de nome totalmente qualificado da instância do registo de Schema deve seguir o formato:
<yournamespace>.servicebus.windows.net
.Uma credencial AAD que implementa o protocolo TokenCredential deve ser passada ao construtor. Existem implementações do
TokenCredential
protocolo disponível no pacote de identidade azul. Para utilizar os tipos de credenciais fornecidos por, porazure-identity
favor instale a biblioteca cliente Azure Identity para Python com pip:
pip install azure-identity
- Além disso, para utilizar a API assínc, deve primeiro instalar um transporte de async, como aiohttp:
pip install aiohttp
Criar AvroEncoder utilizando a biblioteca de esquemas de azure:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Conceitos-chave
AvroEncoder
Fornece a API para codificar e descodificar da Avro Binary Encoding mais um tipo de conteúdo com identificação de esquema. Usa SchemaRegistryClient para obter IDs de esquema do conteúdo do esquema ou vice-versa.
Modelos de mensagens suportadas
O suporte foi adicionado a certas classes de modelos Azure Messaging SDK para interoperabilidade com o AvroEncoder
. Estes modelos são subtipos do MessageType
protocolo definido no espaço de azure.schemaregistry.encoder.avroencoder
nome. Atualmente, as classes de modelos suportados são:
azure.eventhub.EventData
paraazure-eventhub>=5.9.0
Formato de mensagem
Se um tipo de mensagem que segue o protocolo MessageType for fornecido ao codificar para codificação, definirá as propriedades correspondentes do conteúdo e do tipo de conteúdo, onde:
content
: Carga útil avro (em geral, carga útil específica do formato)- Avro Binary Encoding
- NÃO Arquivo do Contentor de Objetos Avro, que inclui o esquema e derrota o propósito deste codificante de mover o esquema para fora da carga útil da mensagem e para o registo de esquemas.
content type
: uma cadeia do formatoavro/binary+<schema ID>
, onde:avro/binary
é o indicador de formato<schema ID>
é a representação hexadecimal do GUID, mesmo formato e ordem byte que a cadeia do serviço de registo de Schema.
Se EventData
for passado como o tipo de mensagem, as seguintes propriedades serão definidas no EventData
objeto:
- A
body
propriedade será definida ao valor do conteúdo. - A
content_type
propriedade será definida para o valor do tipo de conteúdo.
Se o tipo de mensagem não for fornecido e, por defeito, o codificar criará o seguinte dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Exemplos
As seguintes secções fornecem vários fragmentos de código que cobrem algumas das tarefas mais comuns do Registo de Schema, incluindo:
- Encoding (Codificação)
- Descodição
- Centros de Eventos Enviando Integração
- Centros de Eventos Que Recebem Integração
Encoding
Utilize o AvroEncoder.encode
método para codificar o conteúdo com o esquema avro dado.
O método utilizará um esquema previamente registado no serviço de registo de Schema e manterá o esquema em cache para futura utilização da codificação. Para evitar o pré-registo do esquema ao serviço e registá-lo automaticamente com o encode
método, o argumento auto_register=True
da palavra-chave deve ser passado para o AvroEncoder
construtor.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Descodição
Utilize o AvroEncoder.decode
método para descodificar o conteúdo codificado por Avro:
- Passar um objeto de mensagem que é um subtipo do protocolo MessageType.
- Passando num dict com teclas
content
(tipo bytes) econtent_type
(tipo de corda). O método recupera automaticamente o esquema do Serviço de Registo de Schema e mantém o esquema em cache para futura utilização da descodagem.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Centros de Eventos Enviando Integração
Integração com os Centros de Eventos para enviar um EventData
objeto com body
conteúdo codificado pela Avro e correspondente content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Centros de Eventos Que Recebem Integração
Integração com os Centros de Eventos para receber um EventData
objeto e descodificar o valor codificado body
por Avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Resolução de problemas
Geral
Azure Schema Registry Avro Encoder levanta exceções definidas no Núcleo Azure se forem encontrados erros ao comunicar com o serviço de registo de Schema. Os erros relacionados com os tipos de conteúdo/conteúdo inválido e os esquemas inválidos serão levantados à medida azure.schemaregistry.encoder.avroencoder.InvalidContentError
que, azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
respectivamente, __cause__
conterão a exceção subjacente levantada pela biblioteca Apache Avro.
Registo
Esta biblioteca utiliza a biblioteca de registos padrão para registar registos. Informações básicas sobre sessões HTTP (URLs, cabeçalhos, etc.) são registadas ao nível info.
A registo detalhado do nível DEBUG, incluindo os órgãos de pedido/resposta e os cabeçalhos não redigidos, pode ser ativado num cliente com o logging_enable
argumento:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Da mesma forma, logging_enable
pode permitir a registo detalhado para uma única operação, mesmo quando não está habilitado para o cliente:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Passos seguintes
Mais código de amostra
Outros exemplos que demonstram cenários comuns do Registo Azure Schema Avro Encoder estão no diretório de amostras .
Contribuir
Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.
Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.
Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para mais informações consulte o Código de Conduta FAQ ou contacte opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.
Azure SDK for Python