Hubs de Eventos do Azure biblioteca do Repositório de Ponto de Verificação para Javascript usando Blobs de Armazenamento
Uma solução baseada no Armazenamento de Blobs do Azure para armazenar pontos de verificação e auxiliar no balanceamento de carga ao usar EventHubConsumerClient
da biblioteca @azure/hubs de eventos
Código-fonte | Pacote (npm) | Documentação | de referência da APIAmostras
Introdução
Instalar o pacote
Instalar a biblioteca de Blobs do Repositório de Ponto de Verificação do Hubs de Eventos do Azure usando o npm
npm install @azure/eventhubs-checkpointstore-blob
Pré-requisitos: você deve ter uma assinatura do Azure, um namespace dos Hubs de Eventos para usar esse pacote e uma conta de armazenamento
Se você estiver usando esse pacote em um aplicativo Node.js, use Node.js 8.x ou superior.
Configurar Typescript
Os usuários do TypeScript precisam ter definições de tipo de nó instaladas:
npm install @types/node
Você também precisa habilitar compilerOptions.allowSyntheticDefaultImports
em seu tsconfig.json. Observe que, se você tiver habilitado compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
será habilitado por padrão. Confira o manual de opções do compilador do TypeScript para obter mais informações.
Principais conceitos
Escala: criar vários consumidores, com cada consumidor assumindo a propriedade de leitura de algumas partições de Hubs de eventos.
Balanceamento de carga: Os aplicativos que dão suporte ao balanceamento de carga consistem em uma ou mais instâncias das
EventHubConsumerClient
quais foram configuradas para consumir eventos do mesmo Hub de Eventos e grupo de consumidores e o mesmoCheckpointStore
. Eles equilibram a carga de trabalho em diferentes instâncias distribuindo as partições a serem processadas entre si.Verificação: É um processo pelo qual os leitores marcam ou confirmam sua posição dentro de uma sequência de eventos de partição. O ponto de verificação é responsabilidade do consumidor e ocorre em uma base por partição dentro de um grupo de consumidores. Essa responsabilidade significa que, para cada grupo de consumidores, cada leitor de partição deve manter o controle da sua posição atual no fluxo de eventos e pode informar o serviço quando considerar o fluxo de dados concluído.
Se um leitor se desconecta de uma partição, ao se reconectar, ele começa a ler no ponto de verificação que foi anteriormente enviado pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se conecta, ele passa esse deslocamento para o hub de eventos para especificar o local para começar a ler. Assim, você pode usar o ponto de verificação para marcar eventos como "concluídos" por aplicativos de downstream e oferecer resiliência caso ocorra um failover entre leitores em execução em máquinas diferentes. É possível retornar aos dados mais antigos, especificando um deslocamento inferior desse processo de ponto de verificação. Por meio desse mecanismo, o ponto de verificação permite resiliência de failover e reprodução de fluxo de eventos.
Um BlobCheckpointStore é uma classe que implementa os principais métodos exigidos pelo EventHubConsumerClient para balancear a carga e atualizar pontos de verificação.
Exemplos
- Criar um CheckpointStore usando Armazenamento de Blobs do Azure
- Eventos de ponto de verificação usando o Armazenamento de Blobs do Azure
Criar um CheckpointStore
usando Armazenamento de Blobs do Azure
Use o snippet de código abaixo para criar um CheckpointStore
. Você precisará fornecer o cadeia de conexão à sua conta de armazenamento.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Eventos de ponto de verificação usando o Armazenamento de Blobs do Azure
Para eventos de ponto de verificação recebidos usando Armazenamento de Blobs do Azure, você precisará passar um objeto compatível com a interface SubscriptionEventHandlers junto com o código para chamar o updateCheckpoint()
método .
Neste exemplo, SubscriptionHandlers
implementa SubscriptionEventHandlers e também manipula o ponto de verificação.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Solução de problemas
Habilitar logs
Você pode definir a AZURE_LOG_LEVEL
variável de ambiente como um dos seguintes valores para habilitar o registro em log como stderr
:
- verbose
- informações
- warning
- error
Você também pode definir o nível de log programaticamente importando o pacote @azure/agente e chamando a setLogLevel
função com um dos valores de nível de log.
Ao definir um nível de log programaticamente ou por meio da variável de ambiente, todos os AZURE_LOG_LEVEL
logs gravados usando um nível de log igual ou menor que o escolhido serão emitidos.
Por exemplo, quando você define o nível de log como info
, os logs gravados para níveis warning
e error
também são emitidos.
Esse SDK segue as diretrizes do SDK do Azure para TypeScript ao determinar em qual nível fazer logon.
Como alternativa, você pode definir a DEBUG
variável de ambiente para obter logs ao usar essa biblioteca.
Isso pode ser útil se você também quiser emitir logs das dependências rhea-promise
e rhea
também.
Nota: AZURE_LOG_LEVEL, se definido, tem precedência sobre DEBUG.
Não especifique nenhuma azure
biblioteca por meio de DEBUG ao especificar também AZURE_LOG_LEVEL ou chamar setLogLevel.
Você pode definir a variável de ambiente a seguir para obter os logs de depuração ao usar essa biblioteca.
- Obtendo apenas logs de depuração no nível de informações do Blob do Checkpointstore do Eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Registrando em log em um arquivo
Habilite o registro em log conforme mostrado acima e execute o script de teste da seguinte maneira:
As instruções de log do script de teste vão para
out.log
e as instruções de registro em log do sdk vão paradebug.log
.node your-test-script.js > out.log 2>debug.log
As instruções de log do script de teste e do sdk vão para o mesmo arquivo
out.log
redirecionando stderr para stdout (&1) e redirecionando stdout para um arquivo:node your-test-script.js >out.log 2>&1
As instruções de log do script de teste e do sdk vão para o mesmo arquivo
out.log
.node your-test-script.js &> out.log
Próximas etapas
Dê uma olhada no diretório de exemplos para obter um exemplo detalhado.
Contribuição
Se você quiser contribuir com essa biblioteca, leia o guia de contribuição para saber como criar e testar o código.
Azure SDK for JavaScript