Partilhar via


Biblioteca cliente da Azure EventHubs Checkpoint Store para Python - versão 1.1.4

usando bolhas de armazenamento

A Azure EventHubs Checkpoint Store é utilizada para armazenar postos de controlo durante o processamento de eventos a partir de Hubs de Eventos do Azure. Este pacote checkpoint Store funciona como um pacote plug-in para EventHubConsumerClient. Utiliza a Azure Storage Blob como loja persistente para manter os pontos de verificação e informações sobre a propriedade das divisórias.

Por favor, note que esta é uma biblioteca async, para a versão sincronizada da biblioteca de clientes Azure EventHubs Checkpoint Store, consulte a loja de pontos de verificação azure-eventhub.

Código fonte | Pacote (PyPi) | Documentação de | referência da API Documentação | do Azure EventhubsDocumentação de armazenamento azul

Introdução

Pré-requisitos

  • Python 3.6 ou mais tarde.

  • Subscrição do Microsoft Azure: Para utilizar os serviços Azure, incluindo Hubs de Eventos do Azure, você precisará de uma subscrição. Se não tiver uma conta Azure existente, poderá inscrever-se para um teste gratuito ou utilizar os benefícios do seu assinante MSDN quando criar uma conta.

  • Espaço de nomes do Event Hubs com um Centro de Eventos: Para interagir com Hubs de Eventos do Azure, também terá de ter um espaço de nome e um Centro de Eventos disponíveis. Se não estiver familiarizado com a criação de recursos Azure, poderá desejar seguir o guia passo a passo para criar um Centro de Eventos utilizando o portal do Azure. Lá, também pode encontrar instruções detalhadas para a utilização dos modelos Azure CLI, Azure PowerShell ou Azure Resource Manager (ARM) para criar um Centro de Eventos.

  • Conta de Armazenamento Azure: Você precisará ter uma conta de armazenamento Azure e criar um Armazenamento de Blobs do Azure Bloco de Contentores para armazenar os dados de verificação com bolhas. Pode seguir o guia criando uma conta de armazenamento Azure Block Blob.

Instale o pacote

$ pip install azure-eventhub-checkpointstoreblob-aio

Conceitos-chave

Pontos de verificação

O ponto de verificação é um processo pelo qual os leitores marcam ou confirmam a respetiva posição dentro de uma sequência de eventos da partição. O ponto de verificação é da responsabilidade do consumidor e ocorre numa base por partição dentro de um grupo de consumidores. Esta responsabilidade significa que para cada grupo de consumidores, cada leitor da partição tem de manter um controlo da respetiva posição atual no fluxo de eventos e pode informar o serviço quando considera o fluxo de dados completo. Se um leitor for desligado de uma partição, quando voltar a ser ligado, começa a leitura no ponto de verificação que foi previamente submetido pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se conecta, passa a offset para o centro do evento para especificar o local onde começar a ler. Desta forma, pode utilizar o ponto de verificação para marcar os eventos como “concluídos” pelas aplicações a jusante e para fornecer resiliência se ocorrer uma ativação pós-falha entre os leitores em execução em computadores diferentes. É possível devolver dados mais antigos ao especificar um desvio inferior a partir deste processo de ponto de verificação. Através deste mecanismo, o ponto de verificação ativa a resiliência pós-falha e a repetição do fluxo de eventos.

Compensa números de & sequência

Ambos os números de sequência offset & referem-se à posição de um evento dentro de uma partição. Pode pensar neles como um cursor do lado do cliente. O desvio é uma numeração de bytes do evento. O número de compensação/sequência permite que um consumidor de eventos (leitor) especifique um ponto no fluxo de eventos a partir do qual pretende começar a ler eventos. Pode especificar um tempotando de modo a receber eventos apenas após a marca de tempo dada. Os consumidores são responsáveis por armazenarem os seus próprios valores de desvio fora do serviço dos Event Hubs. Dentro de uma partição, cada evento inclui um offset, número de sequência e o tempo de data de quando foi enraedido.

Exemplos

Criar uma EventHubConsumerClient

A maneira mais fácil de criar um EventHubConsumerClient é usar uma cadeia de ligação.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Para outras formas de criar um EventHubConsumerClient, consulte a biblioteca do EventHubs para obter mais detalhes.

Consumir eventos usando um BlobCheckpointStore para fazer checkpoint

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Utilizar BlobCheckpointStore com uma versão diferente da Azure Storage Service API

Alguns ambientes têm versões diferentes da Azure Storage Service API. BlobCheckpointStore por predefinição utiliza a versão API do Serviço de Armazenamento 2019-07-07. Para usá-lo contra uma versão diferente, especifique api_version quando cria o BlobCheckpointStore objeto.

Resolução de problemas

Geral

Permitir a exploração madeireira será útil para causar problemas de tiro.

Registo

  • Permita ao azure.eventhub.extensions.checkpointstoreblobaio madeireiros recolher vestígios da biblioteca.
  • Permita que azure.eventhub o madeiriro recolha vestígios da biblioteca principal azure-eventhub.
  • Permita que azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage o madeireiro recolha vestígios da biblioteca de bolhas de armazenamento azul.
  • Permita que uamqp o madeireiro recolha vestígios da biblioteca uAMQP subjacente.
  • Ativar o traço do nível de quadro AMQP definindo logging_enable=True ao criar o cliente.

Passos seguintes

Mais código de amostra

Começa com as nossas amostras de async da EventHubs Checkpoint Store.

Documentação

A documentação de referência está disponível aqui.

Enviar Comentários

Se encontrar algum bug ou tiver sugestões, por favor preencha um problema na secção Questões do projeto.

Contribuir

Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.

Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.

Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para mais informações consulte o Código de Conduta FAQ ou contacte opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.

Impressões