Condividi tramite


Hub eventi di Azure libreria dell'archivio checkpoint per Javascript con BLOB di archiviazione

Una soluzione basata sull'archiviazione BLOB di Azure per archiviare i checkpoint e per facilitare il bilanciamento del carico quando si usa EventHubConsumerClient dalla libreria @azure/hub eventi

Codice | sorgentePacchetto (npm) | Documentazione di | riferimento sulle APICampioni

Introduzione

Installare il pacchetto

Installare la libreria BLOB dell'archivio checkpoint Hub eventi di Azure tramite npm

npm install @azure/eventhubs-checkpointstore-blob

Prerequisiti: è necessario avere una sottoscrizione di Azure, uno spazio dei nomi di Hub eventi per usare questo pacchetto e un account di archiviazione

Se si usa questo pacchetto in un'applicazione Node.js, usare Node.js 8.x o versione successiva.

Configurare Typescript

Gli utenti typeScript devono avere le definizioni dei tipi di nodo installate:

npm install @types/node

È anche necessario abilitare compilerOptions.allowSyntheticDefaultImports nel file tsconfig.json. Si noti che se è stato abilitato , allowSyntheticDefaultImports è abilitato compilerOptions.esModuleInteropper impostazione predefinita. Per altre informazioni, vedere il manuale delle opzioni del compilatore di TypeScript .

Concetti chiave

  • Scalabilità: creare diversi consumer, ciascuno tra questi che dispone di proprietà di lettura derivanti da alcune partizioni di Hub eventi.

  • Bilanciamento del carico: Le applicazioni che supportano il bilanciamento del carico sono costituite da una o più istanze di che sono state configurate per l'utilizzo di EventHubConsumerClient eventi dello stesso hub eventi e del gruppo di consumer e dello stesso CheckpointStore. Bilanciano il carico di lavoro tra istanze diverse distribuendo le partizioni da elaborare tra loro.

  • Checkpointing: Si tratta di un processo in base al quale i lettori contrassegnano o eseguono il commit della posizione all'interno di una sequenza di eventi di partizione. Il checkpoint è responsabilità del consumer e si verifica per partizione all'interno di un gruppo di consumer. Questa responsabilità significa che per ogni gruppo di consumer, ogni lettore di partizione deve tenere traccia della posizione corrente nel flusso di eventi e può informare il servizio quando considera completo il flusso di dati.

    Se un lettore si disconnette da una partizione, quando riconnette inizia a leggere in corrispondenza del checkpoint inviato in precedenza dall’ulitimo lettore di tale partizione in tale gruppo di consumer. Quando il lettore si connette, passa l'offset all'hub eventi per specificare la posizione da cui iniziare la lettura. In questo modo è possibile usare la funzionalità di checkpoint sia per contrassegnare gli eventi come "completi" dalle applicazioni a valle sia per fornire la resilienza in caso di failover tra i lettori in esecuzione in computer diversi. È possibile tornare a dati precedenti specificando un offset inferiore da questo processo di checkpoint. Tramite questo meccanismo il checkpoint consente sia la resilienza del failover che la riproduzione del flusso di eventi.

    BlobCheckpointStore è una classe che implementa i metodi chiave richiesti da EventHubConsumerClient per bilanciare il carico e aggiornare i checkpoint.

Esempio

Creare un CheckpointStore oggetto usando Archiviazione BLOB di Azure

Usare il frammento di codice seguente per creare un oggetto CheckpointStore. Sarà necessario fornire il stringa di connessione all'account di archiviazione.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Eventi di checkpoint con l'archiviazione BLOB di Azure

Per inviare eventi di checkpoint ricevuti tramite Archiviazione BLOB di Azure, è necessario passare un oggetto compatibile con l'interfaccia SubscriptionEventHandlers insieme al codice per chiamare il updateCheckpoint() metodo .

In questo esempio implementa SubscriptionHandlersSubscriptionEventHandlers e gestisce anche il checkpoint.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Risoluzione dei problemi

Abilitare i log

È possibile impostare la AZURE_LOG_LEVEL variabile di ambiente su uno dei valori seguenti per abilitare la registrazione in stderr:

  • verbose
  • info
  • warning
  • error

È anche possibile impostare il livello di log a livello di log a livello di codice importando il pacchetto @azure/logger e chiamando la setLogLevel funzione con uno dei valori del livello di log.

Quando si imposta un livello di log a livello programmato o tramite la AZURE_LOG_LEVEL variabile di ambiente, tutti i log scritti usando un livello di log uguale o inferiore a quello scelto verranno generati. Ad esempio, quando si imposta il livello di log su info, vengono generati anche i log scritti per i livelli warning e error . Questo SDK segue le linee guida di Azure SDK per TypeScript per determinare a quale livello accedere.

In alternativa, è possibile impostare la DEBUG variabile di ambiente per ottenere i log quando si usa questa libreria. Ciò può essere utile anche se si vogliono generare log dalle dipendenze rhea-promise e rhea .

Nota: AZURE_LOG_LEVEL, se impostato, ha la precedenza su DEBUG. Non specificare librerie azure tramite DEBUG quando si specificano anche AZURE_LOG_LEVEL o chiamando setLogLevel.

È possibile impostare la variabile di ambiente seguente per ottenere i log di debug quando si usa questa libreria.

  • Ottenere solo i log di debug a livello di informazioni dal BLOB dell'archivio checkpoint eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Registrazione in un file

  • Abilitare la registrazione come illustrato in precedenza e quindi eseguire lo script di test come indicato di seguito:

    • Le istruzioni di registrazione dello script di test passano alle out.log istruzioni di registrazione e dall'SDK passano a debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Le istruzioni di registrazione dallo script di test e l'SDK passano allo stesso file out.log reindirizzando stderr a stdout (&1) e quindi reindirizzare stdout a un file:

      node your-test-script.js >out.log 2>&1
      
    • Le istruzioni di registrazione dallo script di test e l'SDK passano allo stesso file out.log.

      node your-test-script.js &> out.log
      

Passaggi successivi

Esaminare la directory degli esempi per un esempio dettagliato.

Contributo

Per contribuire a questa libreria, leggere la guida ai contributi per altre informazioni su come compilare e testare il codice.

Impression