Partilhar via


Hubs de Eventos do Azure biblioteca de cliente para Python - versão 5.11.5

Hubs de Eventos do Azure é um serviço de publicação-subscrição altamente dimensionável que pode ingerir milhões de eventos por segundo e transmiti-los em fluxo para vários consumidores. Isto permite-lhe processar e analisar as enormes quantidades de dados produzidos pelos seus dispositivos e aplicações ligados. Assim que os Hubs de Eventos recolherem os dados, pode recuperá-los, transformá-los e armazená-los utilizando qualquer fornecedor de análise em tempo real ou com adaptadores de armazenamento/lotes. Se quiser saber mais sobre Hubs de Eventos do Azure, poderá querer rever: O que são os Hubs de Eventos?

A biblioteca de cliente dos Hubs de Eventos do Azure permite a publicação e o consumo de eventos dos Hubs de Eventos do Azure e pode ser utilizada para:

  • Emitir telemetria sobre a aplicação para fins de diagnóstico e business intelligence.
  • Publicar factos sobre o estado da aplicação que as partes interessadas podem observar e utilizar como acionador para tomar medidas.
  • Observar operações e interações interessantes que ocorrem na sua empresa ou noutro ecossistema, o que permite aos sistemas acoplados interagir sem terem de estar associados.
  • Receber eventos de um ou mais publicadores, transformá-los para satisfazer melhor as necessidades do seu ecossistema e, em seguida, publicar os eventos transformados num novo fluxo para os consumidores observarem.

Código fonte | Pacote (PyPi) | Pacote (Conda) | Documentação | de referência da APIDocumentação do | produtoExemplos

Introdução

Pré-requisitos

  • Python 3.7 ou posterior.

  • Subscrição do Microsoft Azure: Para utilizar os serviços do Azure, incluindo Hubs de Eventos do Azure, precisará de uma subscrição. Se não tiver uma conta do Azure existente, poderá inscrever-se numa avaliação gratuita ou utilizar os benefícios do subscritor do MSDN quando criar uma conta.

  • Espaço de nomes dos Hubs de Eventos com um Hub de Eventos: Para interagir com Hubs de Eventos do Azure, também terá de ter um espaço de nomes e o Hub de Eventos disponíveis. Se não estiver familiarizado com a criação de recursos do Azure, poderá querer seguir o guia passo a passo para criar um Hub de Eventos com o portal do Azure. Aí, também pode encontrar instruções detalhadas para utilizar os modelos da CLI do Azure, Azure PowerShell ou do Azure Resource Manager (ARM) para criar um Hub de Eventos.

Instalar o pacote

Instale a biblioteca de cliente Hubs de Eventos do Azure para Python com pip:

$ pip install azure-eventhub

Autenticar o cliente

A interação com os Hubs de Eventos começa com uma instância da classe EventHubConsumerClient ou EventHubProducerClient. Precisa do nome do anfitrião, da credencial SAS/AAD e do nome do hub de eventos ou de um cadeia de ligação para instanciar o objeto de cliente.

Criar cliente a partir de cadeia de ligação:

Para que a biblioteca de cliente dos Hubs de Eventos interaja com um Hub de Eventos, o meio mais fácil é utilizar um cadeia de ligação, que é criado automaticamente ao criar um espaço de nomes dos Hubs de Eventos. Se não estiver familiarizado com as políticas de acesso partilhado no Azure, poderá querer seguir o guia passo a passo para obter uma cadeia de ligação dos Hubs de Eventos.

  • O from_connection_string método utiliza o cadeia de ligação do formulário Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> e do nome da entidade para a instância do Hub de Eventos. Pode obter o cadeia de ligação do portal do Azure.

Criar cliente com a biblioteca de identidades do azure:

Em alternativa, pode-se utilizar um objeto Credencial para autenticar através do AAD com o pacote azure-identity.

  • Este construtor demonstrado no exemplo associado acima utiliza o nome do anfitrião e o nome da entidade da instância do Hub de Eventos e a credencial que implementa o protocolo TokenCredential . Existem implementações do TokenCredential protocolo disponíveis no pacote azure-identity. O nome do anfitrião é do formato <yournamespace.servicebus.windows.net>.
  • Para utilizar os tipos de credenciais fornecidos pelo azure-identity, instale o pacote: pip install azure-identity
  • Além disso, para utilizar a API assíncrona, primeiro tem de instalar um transporte assíncrono, como aiohttp: pip install aiohttp
  • Ao utilizar o Azure Active Directory, tem de lhe ser atribuída uma função principal que permita o acesso aos Hubs de Eventos, como a função de Proprietário de Dados Hubs de Eventos do Azure. Para obter mais informações sobre como utilizar a autorização do Azure Active Directory com os Hubs de Eventos, veja a documentação associada.

Conceitos-chave

  • Um EventHubProducerClient é uma origem de dados telemétricos, informações de diagnóstico, registos de utilização ou outros dados de registo, como parte de uma solução de dispositivo incorporado, uma aplicação de dispositivo móvel, um título de jogo em execução numa consola ou outro dispositivo, uma solução empresarial baseada em cliente ou servidor ou um site.

  • Um EventHubConsumerClient recolhe essas informações do Hub de Eventos e processa-as. O processamento pode envolver agregação, computação complexa e filtragem. O processamento também pode envolver a distribuição ou armazenamento das informações de forma não processada ou transformada. Os consumidores do Hub de Eventos são muitas vezes peças de infraestrutura de plataforma robustas e de grande escala com capacidades de análise incorporadas, como o Azure Stream Analytics, o Apache Spark ou o Apache Storm.

  • Uma partição é uma sequência ordenada de eventos que é realizada num Hub de Eventos. Hubs de Eventos do Azure fornece a transmissão em fluxo de mensagens através de um padrão de consumidor particionado no qual cada consumidor lê apenas um subconjunto específico, ou partição, do fluxo de mensagens. À medida que chegam novos eventos, eles são adicionados ao final desta sequência. O número de partições é especificado no momento em que um Hub de Eventos é criado e não pode ser alterado.

  • Um grupo de consumidores é uma vista de um Hub de Eventos inteiro. Os grupos de consumidores permitem que várias aplicações de consumo tenham uma vista separada do fluxo de eventos e leiam o fluxo de forma independente ao seu próprio ritmo e da sua própria posição. Pode haver, no máximo, 5 leitores simultâneos numa partição por grupo de consumidores; no entanto, recomenda-se que exista apenas um consumidor ativo para uma determinada partição e emparelhamento de grupos de consumidores. Cada leitor ativo recebe todos os eventos da sua partição; se existirem vários leitores na mesma partição, receberão eventos duplicados.

Para obter mais conceitos e uma discussão mais aprofundada, consulte: Funcionalidades dos Hubs de Eventos. Além disso, os conceitos para AMQP estão bem documentados na Versão 1.0 do PROTOCOLO DE Mensagens Avançadas do OASIS (AMQP).

Segurança de threads

Não garantimos que o EventHubProducerClient ou EventHubConsumerClient sejam seguros para threads. Não recomendamos a reutilização destas instâncias entre threads. Cabe à aplicação em execução utilizar estas classes de forma segura para threads.

O tipo EventDataBatch de modelo de dados não é seguro para threads. Não deve ser partilhado entre threads nem utilizado em simultâneo com métodos de cliente.

Exemplos

As secções seguintes fornecem vários fragmentos de código que abrangem algumas das tarefas mais comuns dos Hubs de Eventos, incluindo:

Inspecionar um Hub de Eventos

Obtenha os IDs de partição de um Hub de Eventos.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Publicar eventos num Hub de Eventos

Utilize o create_batch método ativado EventHubProducerClient para criar um EventDataBatch objeto que pode ser enviado com o send_batch método . Os eventos podem ser adicionados ao através do EventDataBatchadd método até que o limite máximo de tamanho do lote em bytes tenha sido atingido.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Consumir eventos a partir de um Hub de Eventos

Existem várias formas de consumir eventos a partir de um EventHub. Para simplesmente acionar uma chamada de retorno quando um evento é recebido, o EventHubConsumerClient.receive método será utilizado da seguinte forma:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Consumir eventos de um Hub de Eventos em lotes

Enquanto o exemplo acima aciona a chamada de retorno para cada mensagem à medida que é recebida, o exemplo seguinte aciona a chamada de retorno num lote de eventos, tentando receber um número de cada vez.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Publicar eventos num Hub de Eventos de forma assíncrona

Utilize o create_batch método ativado EventHubProducer para criar um EventDataBatch objeto que pode ser enviado com o send_batch método . Os eventos podem ser adicionados ao através do EventDataBatchadd método até que o limite máximo de tamanho do lote em bytes tenha sido atingido.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Consumir eventos de um Hub de Eventos de forma assíncrona

Este SDK suporta código síncrono e assíncio. Para receber como demonstrado nos exemplos acima, mas dentro do aio, seria necessário o seguinte:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Consumir eventos de um Hub de Eventos em lotes de forma assíncrona

Todas as funções síncronas também são suportadas no aio. Conforme demonstrado acima para o recibo de lote síncrono, pode-se realizar o mesmo no assíncio da seguinte forma:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Consumir eventos e guardar pontos de verificação com um arquivo de pontos de verificação

EventHubConsumerClient é uma construção de alto nível que lhe permite receber eventos de várias partições ao mesmo tempo e o balanceamento de carga com outros consumidores através do mesmo Hub de Eventos e grupo de consumidores.

Isto também permite ao utilizador controlar o progresso quando os eventos são processados através de pontos de verificação.

Um ponto de verificação destina-se a representar o último evento processado com êxito pelo utilizador a partir de uma partição específica de um grupo de consumidores numa instância do Hub de Eventos. A EventHubConsumerClient utiliza uma instância de para atualizar pontos de CheckpointStore verificação e para armazenar as informações relevantes necessárias pelo algoritmo de balanceamento de carga.

Pesquise pypi com o prefixo azure-eventhub-checkpointstore para encontrar pacotes que o suportem e utilize a CheckpointStore implementação de um desses pacotes. Tenha em atenção que são fornecidas bibliotecas de sincronização e assíncronas.

No exemplo abaixo, criamos uma instância de EventHubConsumerClient e utilizamos um BlobCheckpointStore. Tem de criar uma conta de Armazenamento do Azure e um Contentor de Blobs para executar o código.

Armazenamento de Blobs do Azure Assíncrona do Arquivo de Pontos de Verificação e Armazenamento de Blobs do Azure a Sincronização do Arquivo de Pontos de Verificação são uma das CheckpointStore implementações que fornecemos que se aplica Armazenamento de Blobs do Azure como o arquivo persistente.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Utilizar o EventHubConsumerClient para trabalhar com Hub IoT

Também pode utilizar EventHubConsumerClient para trabalhar com Hub IoT. Isto é útil para receber dados telemétricos de Hub IoT do EventHub ligado. O cadeia de ligação associado não terá afirmações de envio, pelo que o envio de eventos não é possível.

Tenha em atenção que o cadeia de ligação tem de ser para um ponto final compatível com o Hub de Eventos, por exemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Existem duas formas de obter o ponto final compatível com os Hubs de Eventos:

  • Obtenha manualmente os "pontos finais incorporados" do Hub IoT no Portal do Azure e receba-os do mesmo.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Resolução de problemas

Veja o azure-eventhubsguia de resolução de problemas para obter detalhes sobre como diagnosticar vários cenários de falha.

Passos seguintes

Mais código de exemplo

Veja o diretório de exemplos para obter exemplos detalhados sobre como utilizar esta biblioteca para enviar e receber eventos de/para os Hubs de Eventos.

Documentação

A documentação de referência está disponível aqui.

Registo de Esquemas e Codificador Avro

O SDK do EventHubs integra-se corretamente com o Serviço de Registo de Esquemas e o Avro. Para obter mais informações, veja SDK do Registo de Esquemas e SDK do Codificador avro do Registo de Esquemas.

Suporte de Transporte e Retrocompatibilidade do Python Puro

A biblioteca de cliente Hubs de Eventos do Azure baseia-se agora numa implementação amQP de Python pura. uAMQP foi removido como dependência necessária.

Para utilizar uAMQP como transporte subjacente:

  1. Instale uamqp com pip.
$ pip install uamqp 
  1. Passe uamqp_transport=True durante a construção do cliente.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Nota: o message atributo em EventData/EventDataBatch, que expôs anteriormente o uamqp.Message, foi preterido. Os objetos "Legados" devolvidos por EventData.message/EventDataBatch.message foram introduzidos para ajudar a facilitar a transição.

Criar roda uAMQP a partir da origem

Se o uAMQP se destinar a ser utilizado como a implementação do protocolo AMQP subjacente para azure-eventhub, as rodas uAMQP podem ser encontradas para a maioria dos principais sistemas operativos.

Se pretender utilizar uAMQP e estiver a executar numa plataforma para a qual não são fornecidas rodas uAMQP, siga a documentação de orientação de instalação uAMQP para instalar a partir da origem.

Enviar Comentários

Se encontrar erros ou tiver sugestões, submeta um problema na secção Problemas do projeto.

Contribuir

Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.

Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.

Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para obter mais informações, consulte as FAQ do Código de Conduta ou o contacto opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.

Impressões