Hub eventi di Azure libreria dell'archivio checkpoint per Javascript con BLOB di archiviazione
Una soluzione basata sull'archiviazione BLOB di Azure per archiviare i checkpoint e per facilitare il bilanciamento del carico quando si usa EventHubConsumerClient
dalla libreria @azure/hub eventi
Codice | sorgentePacchetto (npm) | Documentazione di | riferimento sulle APICampioni
Introduzione
Installare il pacchetto
Installare la libreria BLOB dell'archivio checkpoint Hub eventi di Azure tramite npm
npm install @azure/eventhubs-checkpointstore-blob
Prerequisiti: è necessario avere una sottoscrizione di Azure, uno spazio dei nomi di Hub eventi per usare questo pacchetto e un account di archiviazione
Se si usa questo pacchetto in un'applicazione Node.js, usare Node.js 8.x o versione successiva.
Configurare Typescript
Gli utenti typeScript devono avere le definizioni dei tipi di nodo installate:
npm install @types/node
È anche necessario abilitare compilerOptions.allowSyntheticDefaultImports
nel file tsconfig.json. Si noti che se è stato abilitato , allowSyntheticDefaultImports
è abilitato compilerOptions.esModuleInterop
per impostazione predefinita. Per altre informazioni, vedere il manuale delle opzioni del compilatore di TypeScript .
Concetti chiave
Scalabilità: creare diversi consumer, ciascuno tra questi che dispone di proprietà di lettura derivanti da alcune partizioni di Hub eventi.
Bilanciamento del carico: Le applicazioni che supportano il bilanciamento del carico sono costituite da una o più istanze di che sono state configurate per l'utilizzo di
EventHubConsumerClient
eventi dello stesso hub eventi e del gruppo di consumer e dello stessoCheckpointStore
. Bilanciano il carico di lavoro tra istanze diverse distribuendo le partizioni da elaborare tra loro.Checkpointing: Si tratta di un processo in base al quale i lettori contrassegnano o eseguono il commit della posizione all'interno di una sequenza di eventi di partizione. Il checkpoint è responsabilità del consumer e si verifica per partizione all'interno di un gruppo di consumer. Questa responsabilità significa che per ogni gruppo di consumer, ogni lettore di partizione deve tenere traccia della posizione corrente nel flusso di eventi e può informare il servizio quando considera completo il flusso di dati.
Se un lettore si disconnette da una partizione, quando riconnette inizia a leggere in corrispondenza del checkpoint inviato in precedenza dall’ulitimo lettore di tale partizione in tale gruppo di consumer. Quando il lettore si connette, passa l'offset all'hub eventi per specificare la posizione da cui iniziare la lettura. In questo modo è possibile usare la funzionalità di checkpoint sia per contrassegnare gli eventi come "completi" dalle applicazioni a valle sia per fornire la resilienza in caso di failover tra i lettori in esecuzione in computer diversi. È possibile tornare a dati precedenti specificando un offset inferiore da questo processo di checkpoint. Tramite questo meccanismo il checkpoint consente sia la resilienza del failover che la riproduzione del flusso di eventi.
BlobCheckpointStore è una classe che implementa i metodi chiave richiesti da EventHubConsumerClient per bilanciare il carico e aggiornare i checkpoint.
Esempio
- Creare un checkpointstore usando Archiviazione BLOB di Azure
- Eventi di checkpoint con l'archiviazione BLOB di Azure
Creare un CheckpointStore
oggetto usando Archiviazione BLOB di Azure
Usare il frammento di codice seguente per creare un oggetto CheckpointStore
. Sarà necessario fornire il stringa di connessione all'account di archiviazione.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Eventi di checkpoint con l'archiviazione BLOB di Azure
Per inviare eventi di checkpoint ricevuti tramite Archiviazione BLOB di Azure, è necessario passare un oggetto compatibile con l'interfaccia SubscriptionEventHandlers insieme al codice per chiamare il updateCheckpoint()
metodo .
In questo esempio implementa SubscriptionHandlers
SubscriptionEventHandlers e gestisce anche il checkpoint.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Risoluzione dei problemi
Abilitare i log
È possibile impostare la AZURE_LOG_LEVEL
variabile di ambiente su uno dei valori seguenti per abilitare la registrazione in stderr
:
- verbose
- info
- warning
- error
È anche possibile impostare il livello di log a livello di log a livello di codice importando il pacchetto @azure/logger e chiamando la setLogLevel
funzione con uno dei valori del livello di log.
Quando si imposta un livello di log a livello programmato o tramite la AZURE_LOG_LEVEL
variabile di ambiente, tutti i log scritti usando un livello di log uguale o inferiore a quello scelto verranno generati.
Ad esempio, quando si imposta il livello di log su info
, vengono generati anche i log scritti per i livelli warning
e error
.
Questo SDK segue le linee guida di Azure SDK per TypeScript per determinare a quale livello accedere.
In alternativa, è possibile impostare la DEBUG
variabile di ambiente per ottenere i log quando si usa questa libreria.
Ciò può essere utile anche se si vogliono generare log dalle dipendenze rhea-promise
e rhea
.
Nota: AZURE_LOG_LEVEL, se impostato, ha la precedenza su DEBUG.
Non specificare librerie azure
tramite DEBUG quando si specificano anche AZURE_LOG_LEVEL o chiamando setLogLevel.
È possibile impostare la variabile di ambiente seguente per ottenere i log di debug quando si usa questa libreria.
- Ottenere solo i log di debug a livello di informazioni dal BLOB dell'archivio checkpoint eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Registrazione in un file
Abilitare la registrazione come illustrato in precedenza e quindi eseguire lo script di test come indicato di seguito:
Le istruzioni di registrazione dello script di test passano alle
out.log
istruzioni di registrazione e dall'SDK passano adebug.log
.node your-test-script.js > out.log 2>debug.log
Le istruzioni di registrazione dallo script di test e l'SDK passano allo stesso file
out.log
reindirizzando stderr a stdout (&1) e quindi reindirizzare stdout a un file:node your-test-script.js >out.log 2>&1
Le istruzioni di registrazione dallo script di test e l'SDK passano allo stesso file
out.log
.node your-test-script.js &> out.log
Passaggi successivi
Esaminare la directory degli esempi per un esempio dettagliato.
Contributo
Per contribuire a questa libreria, leggere la guida ai contributi per altre informazioni su come compilare e testare il codice.
Azure SDK for JavaScript