Compartilhar via


Hubs de Eventos do Azure biblioteca de clientes para Python – versão 5.11.5

Hubs de Eventos do Azure é um serviço de publicação-assinatura altamente escalonável que pode ingerir milhões de eventos por segundo e transmiti-los para vários consumidores. Isso permite processar e analisar as grandes quantidades de dados produzidos por seus dispositivos e aplicativos conectados. Depois que os Hubs de Eventos coletarem os dados, você poderá recuperá-los, transformá-los e armazená-los usando qualquer provedor de análise em tempo real ou com adaptadores de armazenamento/envio em lote. Se você quiser saber mais sobre Hubs de Eventos do Azure, talvez queira examinar: O que são Hubs de Eventos?

A biblioteca de clientes dos Hubs de Eventos do Azure permite publicar e consumir eventos dos Hubs de Eventos do Azure e pode ser usada para:

  • Emitir telemetria sobre seu aplicativo para fins de diagnóstico e business intelligence.
  • Publicar fatos sobre o estado do seu aplicativo, que as partes interessadas podem observar e usar como gatilho para tomar medidas.
  • Observar operações interessantes e interações que ocorrem na sua empresa ou em outro ecossistema, permitindo que sistemas flexíveis interajam sem a necessidade de vinculá-los.
  • Receber eventos de um ou mais editores, transformá-los para atender melhor às necessidades do ecossistema e publicar os eventos transformados em um novo fluxo para que os consumidores observem.

Código-fonte | Pacote (PyPi) | Pacote (Conda) | Documentação | de referência da APIDocumentação do produto | Amostras

Introdução

Pré-requisitos

  • Python 3.7 ou posterior.

  • Assinatura do Microsoft Azure: Para usar os serviços do Azure, incluindo Hubs de Eventos do Azure, você precisará de uma assinatura. Se você não tiver uma conta existente do Azure, poderá se inscrever para uma avaliação gratuita ou usar os benefícios do assinante do MSDN ao criar uma conta.

  • Namespace dos Hubs de Eventos com um Hub de Eventos: Para interagir com Hubs de Eventos do Azure, você também precisará ter um namespace e um Hub de Eventos disponíveis. Se você não estiver familiarizado com a criação de recursos do Azure, convém seguir o guia passo a passo para criar um Hub de Eventos usando o portal do Azure. Lá, você também pode encontrar instruções detalhadas para usar os modelos da CLI do Azure, Azure PowerShell ou ARM (Azure Resource Manager) para criar um Hub de Eventos.

Instalar o pacote

Instale a biblioteca de clientes do Hubs de Eventos do Azure para Python com pip:

$ pip install azure-eventhub

Autenticar o cliente

A interação com os Hubs de Eventos começa com uma instância da classe EventHubConsumerClient ou EventHubProducerClient. Você precisa do nome do host, da credencial SAS/AAD e do nome do hub de eventos ou de um cadeia de conexão para instanciar o objeto cliente.

Crie um cliente do cadeia de conexão:

Para que a biblioteca de clientes dos Hubs de Eventos interaja com um Hub de Eventos, o meio mais fácil é usar um cadeia de conexão, que é criado automaticamente ao criar um namespace dos Hubs de Eventos. Se você não estiver familiarizado com políticas de acesso compartilhado no Azure, convém seguir o guia passo a passo para obter uma cadeia de conexão dos Hubs de Eventos.

  • O from_connection_string método leva o cadeia de conexão do formulário Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> e do nome da entidade para sua instância do Hub de Eventos. Você pode obter a cadeia de conexão no portal do Azure.

Crie um cliente usando a biblioteca azure-identity:

Como alternativa, é possível usar um objeto Credential para autenticar por meio do AAD com o pacote azure-identity.

  • Esse construtor demonstrado no exemplo vinculado acima usa o nome do host e o nome da entidade da instância e da credencial do Hub de Eventos que implementa o protocolo TokenCredential . Há implementações do TokenCredential protocolo disponível no pacote azure-identity. O nome do host é do formato <yournamespace.servicebus.windows.net>.
  • Para usar os tipos de credencial fornecidos pelo azure-identity, instale o pacote: pip install azure-identity
  • Além disso, para usar a API assíncrona, primeiro você deve instalar um transporte assíncrono, como aiohttp: pip install aiohttp
  • Ao usar o Azure Active Directory, sua entidade de segurança deve receber uma função que permita o acesso aos Hubs de Eventos, como a função proprietário de dados do Hubs de Eventos do Azure. Para obter mais informações sobre como usar a autorização do Azure Active Directory com hubs de eventos, consulte a documentação associada.

Principais conceitos

  • Um EventHubProducerClient é uma fonte de dados de telemetria, diagnóstico informações, logs de uso ou outros dados de log, como parte de uma solução de dispositivo inserido, um aplicativo de dispositivo móvel, um título de jogo em execução em um console ou outro dispositivo, alguma solução de negócios baseada em cliente ou servidor ou um site.

  • Um EventHubConsumerClient pega essas informações do Hub de Eventos e as processa. O processamento pode envolver agregação, computação complexa e filtragem. O processamento também pode envolver a distribuição ou o armazenamento das informações de maneira bruta ou transformada. Os consumidores do Hub de Eventos geralmente são partes de infraestrutura de plataforma robustas e de alta escala com recursos de análise integrados, como Azure Stream Analytics, Apache Spark ou Apache Storm.

  • Uma partição é uma sequência ordenada de eventos que é mantida em um Hub de Eventos. Os Hubs de Eventos do Azure fornecem streaming de mensagens por meio de um padrão de consumidor particionado no qual cada consumidor lê apenas um subconjunto específico, ou partição, do fluxo de mensagens. À medida que novos eventos chegam, eles são adicionados ao final dessa sequência. O número de partições é especificado no momento em que um Hub de Eventos é criado e não pode ser alterado.

  • Um grupo de consumidores é uma exibição de um Hub de Eventos inteiro. Os grupos de consumidores habilitam vários aplicativos de consumo para que cada um tenha um modo de exibição do fluxo de evento separado e para ler o fluxo de forma independente em seu próprio ritmo e com seus próprio deslocamentos. Pode haver no máximo cinco leitores simultâneos em uma partição por grupo de consumidores; no entanto, é recomendável que haja apenas um consumidor ativo para um determinado emparelhamento de partição e grupo de consumidores. Cada leitor ativo recebe todos os eventos de sua partição; se houver vários leitores na mesma partição, eles receberão eventos duplicados.

Para obter mais conceitos e uma discussão mais profunda, consulte: Recursos dos Hubs de Eventos. Além disso, os conceitos para AMQP estão bem documentados no OASIS ADVANCED Messaging Queuing Protocol (AMQP) versão 1.0.

Acesso thread-safe

Não garantimos que EventHubProducerClient ou EventHubConsumerClient sejam thread-safe. Não recomendamos reutilizações dessas instâncias entre threads. Cabe ao aplicativo em execução usar essas classes de maneira thread-safe.

O tipo EventDataBatch de modelo de dados não é thread-safe. Ele não deve ser compartilhado entre threads nem usado simultaneamente com métodos de cliente.

Exemplos

As seções a seguir fornecem vários snippets de código que abrangem algumas das tarefas mais comuns dos Hubs de Eventos, incluindo:

Inspecione um Hub de Eventos

Obtenha as IDs de partição de um Hub de Eventos.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Envio de eventos para um Hub de Eventos

Use o create_batch método em EventHubProducerClient para criar um EventDataBatch objeto que pode ser enviado usando o send_batch método . Os eventos podem ser adicionados ao EventDataBatch usando o add método até que o limite máximo de tamanho do lote em bytes seja atingido.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Consumir eventos de um Hub de Eventos

Há várias maneiras de consumir eventos de um EventHub. Para simplesmente disparar um retorno de chamada quando um evento for recebido, o EventHubConsumerClient.receive método será usado da seguinte maneira:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Consumir eventos de um Hub de Eventos em lotes

Enquanto o exemplo acima dispara o retorno de chamada para cada mensagem conforme ela é recebida, o exemplo a seguir dispara o retorno de chamada em um lote de eventos, tentando receber um número de cada vez.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Publicar eventos em um Hub de Eventos de forma assíncrona

Use o create_batch método em EventHubProducer para criar um EventDataBatch objeto que pode ser enviado usando o send_batch método . Os eventos podem ser adicionados ao EventDataBatch usando o add método até que o limite máximo de tamanho do lote em bytes seja atingido.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Consumir eventos de um Hub de Eventos de forma assíncrona

Esse SDK dá suporte a código síncrono e assíncrono. Para receber conforme demonstrado nos exemplos acima, mas dentro do aio, seria necessário o seguinte:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Consumir eventos de um Hub de Eventos em lotes de forma assíncrona

Todas as funções síncronas também têm suporte no aio. Conforme demonstrado acima para o recebimento em lote síncrono, é possível realizar o mesmo dentro do asyncio da seguinte maneira:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Consumir eventos e salvar pontos de verificação usando um repositório de ponto de verificação

EventHubConsumerClient é um constructo de alto nível que permite que você receba eventos de várias partições ao mesmo tempo e balancee a carga com outros consumidores usando o mesmo Hub de Eventos e grupo de consumidores.

Isso também permite que o usuário acompanhe o progresso quando os eventos são processados usando pontos de verificação.

Um ponto de verificação deve representar o último evento processado com êxito pelo usuário de uma partição específica de um grupo de consumidores em uma instância do Hub de Eventos. O EventHubConsumerClient usa uma instância do para atualizar pontos de CheckpointStore verificação e armazenar as informações relevantes exigidas pelo algoritmo de balanceamento de carga.

Pesquise pypi com o prefixo azure-eventhub-checkpointstore para localizar pacotes que dão suporte a isso e usem a CheckpointStore implementação de um desses pacotes. Observe que as bibliotecas de sincronização e assíncrona são fornecidas.

No exemplo abaixo, criamos uma instância de EventHubConsumerClient e usamos um BlobCheckpointStore. Você precisa criar uma conta de Armazenamento do Azure e um Contêiner de Blobs para executar o código.

Armazenamento de Blobs do Azure o Repositório de Ponto de Verificação Assíncrono e a Sincronização do Repositório de Ponto de Verificação Armazenamento de Blobs do Azure são uma das CheckpointStore implementações que fornecemos que se aplica Armazenamento de Blobs do Azure como o repositório persistente.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Use EventHubConsumerClient para trabalhar com Hub IoT

Você também pode usar EventHubConsumerClient para trabalhar com Hub IoT. Isso é útil para receber dados de telemetria de Hub IoT do EventHub vinculado. O cadeia de conexão associado não terá declarações de envio, portanto, o envio de eventos não é possível.

Observe que o cadeia de conexão precisa ser para um ponto de extremidade compatível com o Hub de Eventos, por exemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Há duas maneiras de obter o ponto de extremidade compatível com os Hubs de Eventos:

  • Obtenha manualmente os "pontos de extremidade internos" do Hub IoT no Portal do Azure e receba dele.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Solução de problemas

Consulte o azure-eventhubsguia de solução de problemas para obter detalhes sobre como diagnosticar vários cenários de falha.

Próximas etapas

Mais códigos de exemplo

Examine o diretório de exemplos para obter exemplos detalhados de como usar essa biblioteca para enviar e receber eventos de/para Hubs de Eventos.

Documentação

A documentação de referência está disponível aqui.

Registro de esquema e codificador Avro

O SDK do EventHubs se integra bem ao serviço do Registro de Esquema e ao Avro. Para obter mais informações, consulte SDK do Registro de Esquema e SDK do Codificador Avro do Registro de Esquema.

Suporte à compatibilidade com versões anteriores e transporte AMQP do Python puro

A biblioteca de clientes Hubs de Eventos do Azure agora é baseada em uma implementação amqp de Python pura. uAMQP foi removido como dependência necessária.

Para usar uAMQP como o transporte subjacente:

  1. Instale uamqp com pip.
$ pip install uamqp 
  1. Passe uamqp_transport=True durante a construção do cliente.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Observação: o message atributo em EventData/EventDataBatch, que anteriormente expôs o uamqp.Message, foi preterido. Os objetos "Legacy" retornados por EventData.message/EventDataBatch.message foram introduzidos para ajudar a facilitar a transição.

Compilando a roda uAMQP da origem

Se uAMQP for destinado a ser usado como a implementação de protocolo AMQP subjacente para azure-eventhub, as rodas uAMQP poderão ser encontradas para a maioria dos principais sistemas operacionais.

Se você pretende usar uAMQP e está executando em uma plataforma para a qual as rodas uAMQP não são fornecidas, siga as diretrizes de instalação do uAMQP para instalar a partir da origem.

Forneça comentários

Se você encontrar bugs ou tiver sugestões, registre um problema na seção Problemas do projeto.

Contribuição

Este projeto aceita contribuições e sugestões. A maioria das contribuições exige que você concorde com um CLA (Contrato de Licença do Colaborador) declarando que você tem o direito de nos conceder, e de fato concede, os direitos de usar sua contribuição. Para obter detalhes, visite https://cla.microsoft.com.

Quando você envia uma solicitação de pull, um bot do CLA determina automaticamente se você precisa fornecer um CLA e preencher a PR corretamente (por exemplo, rótulo, comentário). Basta seguir as instruções fornecidas pelo bot. Você só precisará fazer isso uma vez em todos os repositórios que usam nosso CLA.

Este projeto adotou o Código de Conduta de Software Livre da Microsoft. Para obter mais informações, confira as Perguntas frequentes sobre o Código de Conduta ou contate opencode@microsoft.com para enviar outras perguntas ou comentários.

Impressões