EventHubConsumerClient Classe
A classe EventHubConsumerClient define uma interface de alto nível para receber eventos do serviço Hubs de Eventos do Azure.
A meta main do EventHubConsumerClient é receber eventos de todas as partições de um EventHub com balanceamento de carga e ponto de verificação.
Quando várias instâncias EventHubConsumerClient estiverem em execução no mesmo hub de eventos, grupo de consumidores e local de ponto de verificação, as partições serão distribuídas uniformemente entre elas.
Para habilitar o balanceamento de carga e os pontos de verificação persistentes, checkpoint_store deve ser definido ao criar o EventHubConsumerClient. Se um repositório de ponto de verificação não for fornecido, o ponto de verificação será mantido internamente na memória.
Um EventHubConsumerClient também pode receber de uma partição específica quando você chama seu método receive() ou receive_batch() e especifica o partition_id. O balanceamento de carga não funcionará no modo de partição única. Mas os usuários ainda poderão salvar pontos de verificação se o checkpoint_store estiver definido.
- Herança
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Construtor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parâmetros
- fully_qualified_namespace
- str
O nome do host totalmente qualificado para o namespace dos Hubs de Eventos. O formato do namespace é: .servicebus.windows.net.
- credential
- TokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
O objeto de credencial usado para autenticação que implementa uma interface específica para obter tokens. Ele aceita EventHubSharedKeyCredential, ou objetos de credencial gerados pela biblioteca azure-identity e objetos que implementam o método *get_token(self, scopes).
- logging_enable
- bool
Se os logs de rastreamento de rede devem ser gerados para o agente. O padrão é False.
- auth_timeout
- float
O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor padrão é 60 segundos. Se definido como 0, nenhum tempo limite será imposto do cliente.
- user_agent
- str
Se especificado, isso será adicionado na frente da cadeia de caracteres do agente do usuário.
- retry_total
- int
O número total de tentativas de refazer uma operação com falha quando ocorre um erro. O valor padrão é 3. O contexto de retry_total no recebimento é especial: o método receive é implementado por um método de recebimento interno de chamada de loop while em cada iteração. No caso de recebimento , retry_total especifica os números de repetição após o erro gerado pelo método de recebimento interno no loop while. Se as tentativas de repetição forem esgotadas, o on_error retorno de chamada será chamado (se fornecido) com as informações de erro. O consumidor de partição interna com falha será fechado (on_partition_close será chamado se fornecido) e o novo consumidor de partição interna será criado (on_partition_initialize será chamado se fornecido) para retomar o recebimento.
- retry_backoff_factor
- float
Um fator de retirada a ser aplicado entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem atraso). No modo fixo, a política de repetição sempre será suspensa para {fator de retirada}. No modo 'exponencial', a política de repetição será suspensa para: {fator de retirada} * (2 ** ({número de tentativas totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição será suspensa para [0.0s, 0.2s, 0.4s, ...] entre as repetições. O valor padrão é 0,8.
- retry_backoff_max
- float
O tempo máximo de retirada. O valor padrão é 120 segundos (2 minutos).
- retry_mode
- str
O comportamento de atraso entre tentativas de repetição. Os valores com suporte são 'fixo' ou 'exponencial', em que o padrão é 'exponencial'.
- idle_timeout
- float
Tempo limite, em segundos, após o qual esse cliente fechará a conexão subjacente se não houver mais atividade. Por padrão, o valor é None, o que significa que o cliente não será desligado devido à inatividade, a menos que seja iniciado pelo serviço.
- transport_type
- TransportType
O tipo de protocolo de transporte que será usado para se comunicar com o serviço de Hubs de Eventos. O padrão é TransportType.Amqp , caso em que a porta 5671 é usada. Se a porta 5671 não estiver disponível/bloqueada no ambiente de rede, TransportType.AmqpOverWebsocket poderá ser usado, em vez disso, que usa a porta 443 para comunicação.
Configurações de proxy HTTP. Deve ser um dicionário com as seguintes chaves: 'proxy_hostname' (valor str) e 'proxy_port' (valor int). Além disso, as seguintes chaves também podem estar presentes: 'username', 'password'.
- checkpoint_store
- CheckpointStore ou None
Um gerenciador que armazena os dados de balanceamento de carga e ponto de verificação de partição ao receber eventos. O repositório de ponto de verificação será usado em ambos os casos de recebimento de todas as partições ou de uma única partição. No último caso, o balanceamento de carga não se aplica. Se um repositório de ponto de verificação não for fornecido, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.
- load_balancing_interval
- float
Quando o balanceamento de carga é inicial. Esse é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. O padrão é 30 segundos.
- partition_ownership_expiration_interval
- float
Uma propriedade de partição expirará após esse número de segundos. Cada avaliação de balanceamento de carga estenderá automaticamente o tempo de expiração da propriedade. O padrão é 6 * load_balancing_interval, ou seja, 180 segundos ao usar o load_balancing_interval padrão de 30 segundos.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Quando o balanceamento de carga iniciar, ele usará essa estratégia para reivindicar e equilibrar a propriedade da partição. Use "greedy" ou LoadBalancingStrategy.GREEDY para a estratégia greedy, que, para cada avaliação de balanceamento de carga, pegará quantas partições não reclamadas forem necessárias para equilibrar a carga. Use "balanced" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada, que, para cada avaliação de balanceamento de carga, declara apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outro EventHubConsumerClient e esse cliente tiver reivindicado poucas partições, esse cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia greedy é usada por padrão.
O endereço do ponto de extremidade personalizado a ser usado para estabelecer uma conexão com o serviço de Hubs de Eventos, permitindo que as solicitações de rede sejam roteadas por meio de quaisquer gateways de aplicativo ou outros caminhos necessários para o ambiente de host. O padrão é None. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, por padrão, a porta 443 será usada.
Caminho para o arquivo de CA_BUNDLE personalizado do certificado SSL que é usado para autenticar a identidade do ponto de extremidade de conexão. O padrão é None, caso em que certifi.where() será usado.
- uamqp_transport
- bool
Se a biblioteca uamqp deve ser usada como o transporte subjacente. O valor padrão é False e a biblioteca AMQP pura do Python será usada como o transporte subjacente.
- socket_timeout
- float
O tempo em segundos que o soquete subjacente na conexão deve aguardar ao enviar e receber dados antes de atingir o tempo limite. O valor padrão é 0,2 para TransportType.Amqp e 1 para TransportType.AmqpOverWebsocket. Se os erros eventHubsConnectionError estiverem ocorrendo devido ao tempo limite de gravação, talvez seja necessário passar um valor maior que o padrão. Isso é para cenários de uso avançado e, normalmente, o valor padrão deve ser suficiente.
Exemplos
Crie uma nova instância do EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Métodos
close |
Pare de recuperar eventos do Hub de Eventos e feche a conexão e os links amqp subjacentes. |
from_connection_string |
Crie um EventHubConsumerClient de um cadeia de conexão. |
get_eventhub_properties |
Obter propriedades do Hub de Eventos. As chaves no dicionário retornado incluem:
|
get_partition_ids |
Obter IDs de partição do Hub de Eventos. |
get_partition_properties |
Obter propriedades da partição especificada. As chaves no dicionário de propriedades incluem:
|
receive |
Receber eventos de partições, com balanceamento de carga e ponto de verificação opcionais. |
receive_batch |
Receber eventos de partições, com balanceamento de carga e ponto de verificação opcionais. |
close
Pare de recuperar eventos do Hub de Eventos e feche a conexão e os links amqp subjacentes.
close() -> None
Tipo de retorno
Exemplos
Feche o cliente.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Crie um EventHubConsumerClient de um cadeia de conexão.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parâmetros
- eventhub_name
- str
O caminho do Hub de Eventos específico ao qual conectar o cliente.
- logging_enable
- bool
Se os logs de rastreamento de rede devem ser gerados para o agente. O padrão é False.
- auth_timeout
- float
O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor padrão é 60 segundos. Se definido como 0, nenhum tempo limite será imposto do cliente.
- user_agent
- str
Se especificado, isso será adicionado na frente da cadeia de caracteres do agente do usuário.
- retry_total
- int
O número total de tentativas de refazer uma operação com falha quando ocorre um erro. O valor padrão é 3. O contexto de retry_total no recebimento é especial: o método receive é implementado por um método de recebimento interno de chamada de loop while em cada iteração. No caso de recebimento , retry_total especifica os números de repetição após o erro gerado pelo método de recebimento interno no loop while. Se as tentativas de repetição forem esgotadas, o on_error retorno de chamada será chamado (se fornecido) com as informações de erro. O consumidor de partição interna com falha será fechado (on_partition_close será chamado se fornecido) e o novo consumidor de partição interna será criado (on_partition_initialize será chamado se fornecido) para retomar o recebimento.
- retry_backoff_factor
- float
Um fator de retirada a ser aplicado entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem atraso). No modo fixo, a política de repetição sempre será suspensa para {fator de retirada}. No modo 'exponencial', a política de repetição será suspensa para: {fator de retirada} * (2 ** ({número de tentativas totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição será suspensa para [0.0s, 0.2s, 0.4s, ...] entre as repetições. O valor padrão é 0,8.
- retry_backoff_max
- float
O tempo máximo de retirada. O valor padrão é 120 segundos (2 minutos).
- retry_mode
- str
O comportamento de atraso entre tentativas de repetição. Os valores com suporte são 'fixo' ou 'exponencial', em que o padrão é 'exponencial'.
- idle_timeout
- float
Tempo limite, em segundos, após o qual esse cliente fechará a conexão subjacente se não houver nenhuma atividade furthur. Por padrão, o valor é None, o que significa que o cliente não será desligado devido à inatividade, a menos que seja iniciado pelo serviço.
- transport_type
- TransportType
O tipo de protocolo de transporte que será usado para se comunicar com o serviço de Hubs de Eventos. O padrão é TransportType.Amqp , caso em que a porta 5671 é usada. Se a porta 5671 não estiver disponível/bloqueada no ambiente de rede, TransportType.AmqpOverWebsocket poderá ser usado, em vez disso, que usa a porta 443 para comunicação.
- http_proxy
- dict
Configurações de proxy HTTP. Deve ser um dicionário com as seguintes chaves: 'proxy_hostname' (valor str) e 'proxy_port' (valor int). Além disso, as seguintes chaves também podem estar presentes: 'username', 'password'.
- checkpoint_store
- CheckpointStore ou None
Um gerenciador que armazena os dados de balanceamento de carga e ponto de verificação de partição ao receber eventos. O repositório de ponto de verificação será usado em ambos os casos de recebimento de todas as partições ou de uma única partição. No último caso, o balanceamento de carga não se aplica. Se um repositório de ponto de verificação não for fornecido, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.
- load_balancing_interval
- float
Quando o balanceamento de carga é inicial. Esse é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. O padrão é 10 segundos.
- partition_ownership_expiration_interval
- float
Uma propriedade de partição expirará após esse número de segundos. Cada avaliação de balanceamento de carga estenderá automaticamente o tempo de expiração da propriedade. O padrão é 6 * load_balancing_interval, ou seja, 60 segundos ao usar o load_balancing_interval padrão de 30 segundos.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Quando o balanceamento de carga iniciar, ele usará essa estratégia para reivindicar e equilibrar a propriedade da partição. Use "greedy" ou LoadBalancingStrategy.GREEDY para a estratégia greedy, que, para cada avaliação de balanceamento de carga, pegará quantas partições não reclamadas forem necessárias para equilibrar a carga. Use "balanced" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada, que, para cada avaliação de balanceamento de carga, declara apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outro EventHubConsumerClient e esse cliente tiver reivindicado poucas partições, esse cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia greedy é usada por padrão.
O endereço do ponto de extremidade personalizado a ser usado para estabelecer uma conexão com o serviço de Hubs de Eventos, permitindo que as solicitações de rede sejam roteadas por meio de quaisquer gateways de aplicativo ou outros caminhos necessários para o ambiente de host. O padrão é None. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, por padrão, a porta 443 será usada.
Caminho para o arquivo de CA_BUNDLE personalizado do certificado SSL que é usado para autenticar a identidade do ponto de extremidade de conexão. O padrão é None, caso em que certifi.where() será usado.
- uamqp_transport
- bool
Se a biblioteca uamqp deve ser usada como o transporte subjacente. O valor padrão é False e a biblioteca AMQP pura do Python será usada como o transporte subjacente.
Tipo de retorno
Exemplos
Crie uma nova instância do EventHubConsumerClient do cadeia de conexão.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Obter propriedades do Hub de Eventos.
As chaves no dicionário retornado incluem:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Retornos
Um dicionário que contém informações sobre o Hub de Eventos.
Tipo de retorno
Exceções
get_partition_ids
Obter IDs de partição do Hub de Eventos.
get_partition_ids() -> List[str]
Retornos
Uma lista de IDs de partição.
Tipo de retorno
Exceções
get_partition_properties
Obter propriedades da partição especificada.
As chaves no dicionário de propriedades incluem:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parâmetros
Retornos
Um dicionário que contém propriedades de partição.
Tipo de retorno
Exceções
receive
Receber eventos de partições, com balanceamento de carga e ponto de verificação opcionais.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parâmetros
- on_event
- callable[PartitionContext, EventData ou None]
A função de retorno de chamada para lidar com um evento recebido. O retorno de chamada usa dois parâmetros: partition_context que contém o contexto de partição e o evento que é o evento recebido. A função de retorno de chamada deve ser definida como: on_event(partition_context, evento). Para obter informações detalhadas de contexto de partição, consulte PartitionContext.
- max_wait_time
- float
O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar o retorno de chamada. Se nenhum evento for recebido dentro desse intervalo, o retorno de chamada on_event será chamado com Nenhum. Se esse valor for definido como Nenhum ou 0 (o padrão), o retorno de chamada não será chamado até que um evento seja recebido.
- partition_id
- str
Se especificado, o cliente receberá somente dessa partição. Caso contrário, o cliente receberá de todas as partições.
- owner_level
- int
A prioridade para um consumidor exclusivo. Um consumidor exclusivo será criado se owner_level estiver definido. Um consumidor com um owner_level mais alto tem maior prioridade exclusiva. O nível de proprietário também é conhecido como o 'valor de época' do consumidor.
- prefetch
- int
O número de eventos a serem pré-buscados do serviço para processamento. O padrão é 300.
- track_last_enqueued_event_properties
- bool
Indica se o consumidor deve solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos. Quando informações sobre o último evento enfileirado de partições estiverem sendo rastreadas, cada evento recebido do serviço hubs de eventos carregará metadados sobre a partição. Isso resulta em uma pequena quantidade de consumo adicional de largura de banda de rede que geralmente é uma compensação favorável quando considerada para fazer periodicamente solicitações de propriedades de partição usando o cliente do Hub de Eventos. Ele é definido como False por padrão.
Comece a receber dessa posição de evento se não houver dados de ponto de verificação para uma partição. Os dados de ponto de verificação serão usados se disponíveis. Isso pode ser um dict com a ID da partição como a chave e a posição como o valor para partições individuais ou um único valor para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também há suporte para os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos. O valor padrão é "@latest".
Determine se o starting_position determinado é inclusivo(>=) ou não (>). True para inclusive e False para exclusivo. Isso pode ser um dict com a ID de partição como a chave e bool como o valor que indica se o starting_position para uma partição específica é inclusivo ou não. Isso também pode ser um único valor bool para todos os starting_position. O valor padrão é False.
- on_error
- callable[[PartitionContext, Exception]]
A função de retorno de chamada que será chamada quando um erro for gerado durante o recebimento após o esgotamento das tentativas de repetição ou durante o processo de balanceamento de carga. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e erro sendo a exceção. partition_context poderá ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. O retorno de chamada deve ser definido como: on_error(partition_context, error). O retorno de chamada on_error também será chamado se uma exceção sem tratamento for gerada durante o retorno de chamada on_event .
- on_partition_initialize
- callable[[PartitionContext]]
A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição concluir a inicialização. Ele também seria chamado quando um novo consumidor de partição interna é criado para assumir o processo de recebimento de um consumidor de partição interna com falha e fechado. O retorno de chamada usa um único parâmetro: partition_context que contém as informações de partição. O retorno de chamada deve ser definido como: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição for fechado. Ele também seria chamado quando o erro é gerado durante o recebimento após o esgotamento das tentativas de repetição. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e motivo para o fechamento. O retorno de chamada deve ser definido como: on_partition_close(partition_context, reason). CloseReason Consulte os vários motivos finais.
Tipo de retorno
Exemplos
Receber eventos do EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Receber eventos de partições, com balanceamento de carga e ponto de verificação opcionais.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parâmetros
- on_event_batch
- callable[PartitionContext, list[EventData]]
A função de retorno de chamada para lidar com um lote de eventos recebidos. O retorno de chamada usa dois parâmetros: partition_context que contém contexto de partição e event_batch, que são os eventos recebidos. A função de retorno de chamada deve ser definida como: on_event_batch(partition_context, event_batch). event_batch poderá ser uma lista vazia se max_wait_time não for Nenhum nem 0 e nenhum evento for recebido após max_wait_time. Para obter informações detalhadas de contexto de partição, consulte PartitionContext.
- max_batch_size
- int
O número máximo de eventos em um lote passado para o retorno de chamada on_event_batch. Se o número real de eventos recebidos for maior que max_batch_size, os eventos recebidos serão divididos em lotes e chamarão o retorno de chamada para cada lote com até max_batch_size eventos.
- max_wait_time
- float
O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar o retorno de chamada. Se nenhum evento for recebido nesse intervalo, o retorno de chamada on_event_batch será chamado com uma lista vazia.
- partition_id
- str
Se especificado, o cliente receberá somente dessa partição. Caso contrário, o cliente receberá de todas as partições.
- owner_level
- int
A prioridade para um consumidor exclusivo. Um consumidor exclusivo será criado se owner_level estiver definido. Um consumidor com uma owner_level mais alta tem prioridade exclusiva mais alta. O nível de proprietário também é conhecido como o "valor de época" do consumidor.
- prefetch
- int
O número de eventos a serem pré-buscados do serviço para processamento. O padrão é 300.
- track_last_enqueued_event_properties
- bool
Indica se o consumidor deve solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos. Quando informações sobre o evento enfileirado de partições estiverem sendo controladas, cada evento recebido do serviço hubs de eventos levará metadados sobre a partição. Isso resulta em uma pequena quantidade de consumo adicional de largura de banda de rede que geralmente é uma compensação favorável quando considerada para fazer periodicamente solicitações de propriedades de partição usando o cliente do Hub de Eventos. Ele é definido como False por padrão.
Comece a receber dessa posição de evento se não houver dados de ponto de verificação para uma partição. Os dados de ponto de verificação serão usados se disponíveis. Isso pode ser um ditado com ID de partição como a chave e a posição como o valor para partições individuais ou um único valor para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também há suporte para os valores "-1" para recebimento do início do fluxo e "@latest" para receber apenas novos eventos. O valor padrão é "@latest".
Determine se o starting_position determinado é inclusivo(>=) ou não (>). True para inclusive e False para exclusivo. Isso pode ser um ditado com ID de partição como a chave e bool como o valor que indica se o starting_position para uma partição específica é inclusivo ou não. Isso também pode ser um único valor bool para todos os starting_position. O valor padrão é False.
- on_error
- callable[[PartitionContext, Exception]]
A função de retorno de chamada que será chamada quando um erro for gerado durante o recebimento após o esgotamento das tentativas de repetição ou durante o processo de balanceamento de carga. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. O retorno de chamada deve ser definido como: on_error(partition_context, error). O retorno de chamada on_error também será chamado se uma exceção sem tratamento for gerada durante o retorno de chamada on_event .
- on_partition_initialize
- callable[[PartitionContext]]
A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição concluir a inicialização. Ele também seria chamado quando um novo consumidor de partição interna é criado para assumir o processo de recebimento de um consumidor de partição interno com falha e fechado. O retorno de chamada usa um único parâmetro: partition_context que contém as informações de partição. O retorno de chamada deve ser definido como: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição for fechado. Ele também seria chamado quando o erro for gerado durante o recebimento após o esgotamento das tentativas de repetição. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e motivo para o fechamento. O retorno de chamada deve ser definido como: on_partition_close(partition_context, reason). CloseReason Consulte os vários motivos finais.
Tipo de retorno
Exemplos
Receber eventos em lotes do EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python