Compartilhar via


Hubs de Eventos do Azure biblioteca do Repositório de Ponto de Verificação para Javascript usando Blobs de Armazenamento

Uma solução baseada no Armazenamento de Blobs do Azure para armazenar pontos de verificação e auxiliar no balanceamento de carga ao usar EventHubConsumerClient da biblioteca @azure/hubs de eventos

Código-fonte | Pacote (npm) | Documentação | de referência da APIAmostras

Introdução

Instalar o pacote

Instalar a biblioteca de Blobs do Repositório de Ponto de Verificação do Hubs de Eventos do Azure usando o npm

npm install @azure/eventhubs-checkpointstore-blob

Pré-requisitos: você deve ter uma assinatura do Azure, um namespace dos Hubs de Eventos para usar esse pacote e uma conta de armazenamento

Se você estiver usando esse pacote em um aplicativo Node.js, use Node.js 8.x ou superior.

Configurar Typescript

Os usuários do TypeScript precisam ter definições de tipo de nó instaladas:

npm install @types/node

Você também precisa habilitar compilerOptions.allowSyntheticDefaultImports em seu tsconfig.json. Observe que, se você tiver habilitado compilerOptions.esModuleInterop, allowSyntheticDefaultImports será habilitado por padrão. Confira o manual de opções do compilador do TypeScript para obter mais informações.

Principais conceitos

  • Escala: criar vários consumidores, com cada consumidor assumindo a propriedade de leitura de algumas partições de Hubs de eventos.

  • Balanceamento de carga: Os aplicativos que dão suporte ao balanceamento de carga consistem em uma ou mais instâncias das EventHubConsumerClient quais foram configuradas para consumir eventos do mesmo Hub de Eventos e grupo de consumidores e o mesmo CheckpointStore. Eles equilibram a carga de trabalho em diferentes instâncias distribuindo as partições a serem processadas entre si.

  • Verificação: É um processo pelo qual os leitores marcam ou confirmam sua posição dentro de uma sequência de eventos de partição. O ponto de verificação é responsabilidade do consumidor e ocorre em uma base por partição dentro de um grupo de consumidores. Essa responsabilidade significa que, para cada grupo de consumidores, cada leitor de partição deve manter o controle da sua posição atual no fluxo de eventos e pode informar o serviço quando considerar o fluxo de dados concluído.

    Se um leitor se desconecta de uma partição, ao se reconectar, ele começa a ler no ponto de verificação que foi anteriormente enviado pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se conecta, ele passa esse deslocamento para o hub de eventos para especificar o local para começar a ler. Assim, você pode usar o ponto de verificação para marcar eventos como "concluídos" por aplicativos de downstream e oferecer resiliência caso ocorra um failover entre leitores em execução em máquinas diferentes. É possível retornar aos dados mais antigos, especificando um deslocamento inferior desse processo de ponto de verificação. Por meio desse mecanismo, o ponto de verificação permite resiliência de failover e reprodução de fluxo de eventos.

    Um BlobCheckpointStore é uma classe que implementa os principais métodos exigidos pelo EventHubConsumerClient para balancear a carga e atualizar pontos de verificação.

Exemplos

Criar um CheckpointStore usando Armazenamento de Blobs do Azure

Use o snippet de código abaixo para criar um CheckpointStore. Você precisará fornecer o cadeia de conexão à sua conta de armazenamento.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Eventos de ponto de verificação usando o Armazenamento de Blobs do Azure

Para eventos de ponto de verificação recebidos usando Armazenamento de Blobs do Azure, você precisará passar um objeto compatível com a interface SubscriptionEventHandlers junto com o código para chamar o updateCheckpoint() método .

Neste exemplo, SubscriptionHandlers implementa SubscriptionEventHandlers e também manipula o ponto de verificação.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Solução de problemas

Habilitar logs

Você pode definir a AZURE_LOG_LEVEL variável de ambiente como um dos seguintes valores para habilitar o registro em log como stderr:

  • verbose
  • informações
  • warning
  • error

Você também pode definir o nível de log programaticamente importando o pacote @azure/agente e chamando a setLogLevel função com um dos valores de nível de log.

Ao definir um nível de log programaticamente ou por meio da variável de ambiente, todos os AZURE_LOG_LEVEL logs gravados usando um nível de log igual ou menor que o escolhido serão emitidos. Por exemplo, quando você define o nível de log como info, os logs gravados para níveis warning e error também são emitidos. Esse SDK segue as diretrizes do SDK do Azure para TypeScript ao determinar em qual nível fazer logon.

Como alternativa, você pode definir a DEBUG variável de ambiente para obter logs ao usar essa biblioteca. Isso pode ser útil se você também quiser emitir logs das dependências rhea-promise e rhea também.

Nota: AZURE_LOG_LEVEL, se definido, tem precedência sobre DEBUG. Não especifique nenhuma azure biblioteca por meio de DEBUG ao especificar também AZURE_LOG_LEVEL ou chamar setLogLevel.

Você pode definir a variável de ambiente a seguir para obter os logs de depuração ao usar essa biblioteca.

  • Obtendo apenas logs de depuração no nível de informações do Blob do Checkpointstore do Eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Registrando em log em um arquivo

  • Habilite o registro em log conforme mostrado acima e execute o script de teste da seguinte maneira:

    • As instruções de log do script de teste vão para out.log e as instruções de registro em log do sdk vão para debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • As instruções de log do script de teste e do sdk vão para o mesmo arquivo out.log redirecionando stderr para stdout (&1) e redirecionando stdout para um arquivo:

      node your-test-script.js >out.log 2>&1
      
    • As instruções de log do script de teste e do sdk vão para o mesmo arquivo out.log.

      node your-test-script.js &> out.log
      

Próximas etapas

Dê uma olhada no diretório de exemplos para obter um exemplo detalhado.

Contribuição

Se você quiser contribuir com essa biblioteca, leia o guia de contribuição para saber como criar e testar o código.

Impressões