Biblioteca cliente da Azure EventHubs Checkpoint Store para Python - versão 1.1.4
usando bolhas de armazenamento
A Azure EventHubs Checkpoint Store é utilizada para armazenar postos de controlo durante o processamento de eventos a partir de Hubs de Eventos do Azure.
Este pacote checkpoint Store funciona como um pacote plug-in para EventHubConsumerClient
. Utiliza a Azure Storage Blob como loja persistente para manter os pontos de verificação e informações sobre a propriedade das divisórias.
Por favor, note que esta é uma biblioteca async, para a versão sincronizada da biblioteca de clientes Azure EventHubs Checkpoint Store, consulte a loja de pontos de verificação azure-eventhub.
Código fonte | Pacote (PyPi) | Documentação de | referência da API Documentação | do Azure EventhubsDocumentação de armazenamento azul
Introdução
Pré-requisitos
Python 3.6 ou mais tarde.
Subscrição do Microsoft Azure: Para utilizar os serviços Azure, incluindo Hubs de Eventos do Azure, você precisará de uma subscrição. Se não tiver uma conta Azure existente, poderá inscrever-se para um teste gratuito ou utilizar os benefícios do seu assinante MSDN quando criar uma conta.
Espaço de nomes do Event Hubs com um Centro de Eventos: Para interagir com Hubs de Eventos do Azure, também terá de ter um espaço de nome e um Centro de Eventos disponíveis. Se não estiver familiarizado com a criação de recursos Azure, poderá desejar seguir o guia passo a passo para criar um Centro de Eventos utilizando o portal do Azure. Lá, também pode encontrar instruções detalhadas para a utilização dos modelos Azure CLI, Azure PowerShell ou Azure Resource Manager (ARM) para criar um Centro de Eventos.
Conta de Armazenamento Azure: Você precisará ter uma conta de armazenamento Azure e criar um Armazenamento de Blobs do Azure Bloco de Contentores para armazenar os dados de verificação com bolhas. Pode seguir o guia criando uma conta de armazenamento Azure Block Blob.
Instale o pacote
$ pip install azure-eventhub-checkpointstoreblob-aio
Conceitos-chave
Pontos de verificação
O ponto de verificação é um processo pelo qual os leitores marcam ou confirmam a respetiva posição dentro de uma sequência de eventos da partição. O ponto de verificação é da responsabilidade do consumidor e ocorre numa base por partição dentro de um grupo de consumidores. Esta responsabilidade significa que para cada grupo de consumidores, cada leitor da partição tem de manter um controlo da respetiva posição atual no fluxo de eventos e pode informar o serviço quando considera o fluxo de dados completo. Se um leitor for desligado de uma partição, quando voltar a ser ligado, começa a leitura no ponto de verificação que foi previamente submetido pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se conecta, passa a offset para o centro do evento para especificar o local onde começar a ler. Desta forma, pode utilizar o ponto de verificação para marcar os eventos como “concluídos” pelas aplicações a jusante e para fornecer resiliência se ocorrer uma ativação pós-falha entre os leitores em execução em computadores diferentes. É possível devolver dados mais antigos ao especificar um desvio inferior a partir deste processo de ponto de verificação. Através deste mecanismo, o ponto de verificação ativa a resiliência pós-falha e a repetição do fluxo de eventos.
Compensa números de & sequência
Ambos os números de sequência offset & referem-se à posição de um evento dentro de uma partição. Pode pensar neles como um cursor do lado do cliente. O desvio é uma numeração de bytes do evento. O número de compensação/sequência permite que um consumidor de eventos (leitor) especifique um ponto no fluxo de eventos a partir do qual pretende começar a ler eventos. Pode especificar um tempotando de modo a receber eventos apenas após a marca de tempo dada. Os consumidores são responsáveis por armazenarem os seus próprios valores de desvio fora do serviço dos Event Hubs. Dentro de uma partição, cada evento inclui um offset, número de sequência e o tempo de data de quando foi enraedido.
Exemplos
Criar uma EventHubConsumerClient
A maneira mais fácil de criar um EventHubConsumerClient
é usar uma cadeia de ligação.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Para outras formas de criar um EventHubConsumerClient
, consulte a biblioteca do EventHubs para obter mais detalhes.
Consumir eventos usando um BlobCheckpointStore
para fazer checkpoint
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Utilizar BlobCheckpointStore
com uma versão diferente da Azure Storage Service API
Alguns ambientes têm versões diferentes da Azure Storage Service API.
BlobCheckpointStore
por predefinição utiliza a versão API do Serviço de Armazenamento 2019-07-07. Para usá-lo contra uma versão diferente, especifique api_version
quando cria o BlobCheckpointStore
objeto.
Resolução de problemas
Geral
Permitir a exploração madeireira será útil para causar problemas de tiro.
Registo
- Permita ao
azure.eventhub.extensions.checkpointstoreblobaio
madeireiros recolher vestígios da biblioteca. - Permita que
azure.eventhub
o madeiriro recolha vestígios da biblioteca principal azure-eventhub. - Permita que
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
o madeireiro recolha vestígios da biblioteca de bolhas de armazenamento azul. - Permita que
uamqp
o madeireiro recolha vestígios da biblioteca uAMQP subjacente. - Ativar o traço do nível de quadro AMQP definindo
logging_enable=True
ao criar o cliente.
Passos seguintes
Mais código de amostra
Começa com as nossas amostras de async da EventHubs Checkpoint Store.
- receive_events_using_checkpoint_store_async.py - EventHubConsumerClient com exemplo de loja de ponto de verificação blob
- receive_events_using_checkpoint_store_storage_api_version_async.py - EventHubConsumerClient com loja de ponto de verificação blob e exemplo de versão de armazenamento
Documentação
A documentação de referência está disponível aqui.
Enviar Comentários
Se encontrar algum bug ou tiver sugestões, por favor preencha um problema na secção Questões do projeto.
Contribuir
Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.
Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.
Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para mais informações consulte o Código de Conduta FAQ ou contacte opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.
Azure SDK for Python