Compartilhar via


Biblioteca de clientes do Repositório de Ponto de Verificação do Azure EventHubs para Python – versão 1.1.4

usando Blobs de Armazenamento

O Repositório de Ponto de Verificação do Azure EventHubs é usado para armazenar pontos de verificação durante o processamento de eventos de Hubs de Eventos do Azure. Esse pacote do Checkpoint Store funciona como um pacote de plug-in para EventHubConsumerClient. Ele usa o Blob de Armazenamento do Azure como o repositório persistente para manter pontos de verificação e informações de propriedade de partição.

Observe que esta é uma biblioteca assíncrona, para ver a versão de sincronização da biblioteca de clientes do Repositório de Pontos de Verificação do Azure EventHubs, consulte azure-eventhub-checkpointstoreblob.

Código-fonte | Pacote (PyPi) | Documentação | de referência da API Documentação | do Azure EventhubsDocumentação do Armazenamento do Azure

Introdução

Pré-requisitos

  • Python 3.6 ou posterior.

  • Assinatura do Microsoft Azure: Para usar os serviços do Azure, incluindo Hubs de Eventos do Azure, você precisará de uma assinatura. Se você não tiver uma conta existente do Azure, poderá se inscrever para uma avaliação gratuita ou usar os benefícios do assinante do MSDN ao criar uma conta.

  • Namespace dos Hubs de Eventos com um Hub de Eventos: Para interagir com Hubs de Eventos do Azure, você também precisará ter um namespace e o Hub de Eventos disponíveis. Se você não estiver familiarizado com a criação de recursos do Azure, convém seguir o guia passo a passo para criar um Hub de Eventos usando o portal do Azure. Lá, você também pode encontrar instruções detalhadas para usar os modelos da CLI do Azure, Azure PowerShell ou ARM (Azure Resource Manager) para criar um Hub de Eventos.

  • Conta de Armazenamento do Azure: Você precisará ter uma Conta de Armazenamento do Azure e criar uma Armazenamento de Blobs do Azure Bloquear Contêiner para armazenar os dados de ponto de verificação com blobs. Você pode seguir o guia criando uma Conta de Armazenamento de Blobs de Blocos do Azure.

Instalar o pacote

$ pip install azure-eventhub-checkpointstoreblob-aio

Principais conceitos

Definindo o ponto de verificação

Ponto de verificação é um processo pelo qual os leitores marcam ou confirmam sua posição em uma sequência de eventos da partição. O ponto de verificação é responsabilidade do consumidor e ocorre em uma base por partição dentro de um grupo de consumidores. Essa responsabilidade significa que, para cada grupo de consumidores, cada leitor de partição deve manter o controle da sua posição atual no fluxo de eventos e pode informar o serviço quando considerar o fluxo de dados concluído. Se um leitor se desconecta de uma partição, ao se reconectar, ele começa a ler no ponto de verificação que foi anteriormente enviado pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se conecta, ele passa esse deslocamento para o hub de eventos para especificar o local para começar a ler. Assim, você pode usar o ponto de verificação para marcar eventos como "concluídos" por aplicativos de downstream e oferecer resiliência caso ocorra um failover entre leitores em execução em máquinas diferentes. É possível retornar aos dados mais antigos, especificando um deslocamento inferior desse processo de ponto de verificação. Por meio desse mecanismo, o ponto de verificação permite resiliência de failover e reprodução de fluxo de eventos.

Números de sequência de deslocamentos &

Ambos os números de sequência de deslocamento & referem-se à posição de um evento dentro de uma partição. Você pode pensar neles como um cursor do lado do cliente. O deslocamento é uma numeração em bytes do evento. O número de deslocamento/sequência permite que um consumidor de evento (leitor) especifique um ponto no fluxo de eventos do qual deseja começar a ler eventos. Você pode especificar um carimbo de data/hora de modo que receba eventos enfileirados somente após o carimbo de data/hora fornecido. Os consumidores são responsáveis por armazenar seu próprios valores de deslocamento fora do serviço de Hubs de Evento. Dentro de uma partição, cada evento inclui um deslocamento, um número de sequência e o carimbo de data/hora de quando foi enfileirado.

Exemplos

Criar uma EventHubConsumerClient

A maneira mais fácil de criar um EventHubConsumerClient é usar uma cadeia de conexão.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Para obter outras maneiras de criar um EventHubConsumerClient, consulte a biblioteca EventHubs para obter mais detalhes.

Consumir eventos usando um BlobCheckpointStore para fazer ponto de verificação

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Usar BlobCheckpointStore com uma versão diferente da API do Serviço de Armazenamento do Azure

Alguns ambientes têm versões diferentes da API do Serviço de Armazenamento do Azure. BlobCheckpointStore por padrão, usa a API do Serviço de Armazenamento versão 2019-07-07. Para usá-lo em uma versão diferente, especifique api_version quando você criar o BlobCheckpointStore objeto.

Solução de problemas

Geral

Habilitar o registro em log será útil para solucionar problemas.

Log

  • Habilite azure.eventhub.extensions.checkpointstoreblobaio o agente para coletar rastreamentos da biblioteca.
  • Habilite azure.eventhub o agente para coletar rastreamentos da biblioteca principal do azure-eventhub.
  • Habilite o azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage agente para coletar rastreamentos da biblioteca de blobs de armazenamento do Azure.
  • Habilite uamqp o agente para coletar rastreamentos da biblioteca uAMQP subjacente.
  • Habilite o rastreamento no nível do quadro AMQP definindo logging_enable=True ao criar o cliente.

Próximas etapas

Mais códigos de exemplo

Introdução aos nossos exemplos assíncronos do Repositório de Ponto de Verificação do EventHubs.

Documentação

A documentação de referência está disponível aqui.

Forneça comentários

Se você encontrar bugs ou tiver sugestões, registre um problema na seção Problemas do projeto.

Contribuição

Este projeto aceita contribuições e sugestões. A maioria das contribuições exige que você concorde com um CLA (Contrato de Licença do Colaborador) declarando que você tem o direito de nos conceder, e de fato concede, os direitos de usar sua contribuição. Para obter detalhes, visite https://cla.microsoft.com.

Quando você envia uma solicitação de pull, um bot do CLA determina automaticamente se você precisa fornecer um CLA e preencher a PR corretamente (por exemplo, rótulo, comentário). Basta seguir as instruções fornecidas pelo bot. Você só precisará fazer isso uma vez em todos os repositórios que usam nosso CLA.

Este projeto adotou o Código de Conduta de Software Livre da Microsoft. Para obter mais informações, confira as Perguntas frequentes sobre o Código de Conduta ou contate opencode@microsoft.com para enviar outras perguntas ou comentários.

Impressões