Freigeben über


Azure Event Hubs Prüfpunktspeicherbibliothek für JavaScript mithilfe von Speicherblobs

Eine Azure Blob Storage-basierte Lösung zum Speichern von Prüfpunkten und zur Unterstützung des Lastenausgleichs bei Verwendung EventHubConsumerClient aus der @azure/Event Hubs-Bibliothek

Quellcode | Paket (npm) | API-Referenzdokumentation | Proben

Erste Schritte

Installieren des Pakets

Installieren der Azure Event Hubs Checkpoint Store-Blobbibliothek mithilfe von npm

npm install @azure/eventhubs-checkpointstore-blob

Voraussetzungen: Sie benötigen ein Azure-Abonnement, einen Event Hubs-Namespace, um dieses Paket verwenden zu können, und ein Speicherkonto.

Wenn Sie dieses Paket in einer Node.js-Anwendung verwenden, verwenden Sie Node.js 8.x oder höher.

Konfigurieren von Typescript

TypeScript-Benutzer müssen Knotentypdefinitionen installiert haben:

npm install @types/node

Sie müssen auch in Ihrer tsconfig.json-Datei aktivieren compilerOptions.allowSyntheticDefaultImports . Beachten Sie, dass, wenn Sie aktiviert compilerOptions.esModuleInterophaben, allowSyntheticDefaultImports standardmäßig aktiviert ist. Weitere Informationen finden Sie im Handbuch für Compileroptionen von TypeScript .

Wichtige Begriffe

  • Skalierung: Erstellen mehrerer Consumer, wobei jeder Consumer die Besitzrechte für das Lesen einiger Event Hubs-Partitionen übernimmt.

  • Lastenausgleich: Anwendungen, die den Lastenausgleich unterstützen, bestehen aus einer oder mehreren Instanzen, von EventHubConsumerClient denen für die Nutzung von Ereignissen aus demselben Event Hub und derselben Consumergruppe und derselben CheckpointStorekonfiguriert wurde. Sie verteilen die Workload auf verschiedene Instanzen, indem sie die zu verarbeitenden Partitionen untereinander verteilen.

  • Prüfpunkte: Dabei handelt es sich um einen Prozess, mit dem Leser ihre Position innerhalb einer Partitionsereignissequenz markieren oder committen. Dies liegt in der Verantwortung des Consumers und erfolgt auf Partitionsbasis innerhalb einer Consumergruppe. Das bedeutet, dass jeder Partitionsleser für jede Consumergruppe seine aktuelle Position im Ereignisstream nachverfolgen muss und den Dienst informieren kann, wenn er den Datenstrom als abgeschlossen betrachtet.

    Wenn ein Leser die Verbindung zu eine Partition trennt, beginnt nach dem erneuten Herstellen der Verbindung das Lesen bei dem Prüfpunkt, der zuvor durch den letzten Leser dieser Partition in dieser Consumergruppe übermittelt wurde. Wenn der Leser verbunden ist, übergibt er diesen Offset an den Event Hub, um die Position für den nächsten Lesevorgang anzugeben. Auf diese Weise können mithilfe von Prüfpunkten Ereignisse von Downstreamanwendungen als abgeschlossen markiert werden. Darüber hinaus sorgen Prüfpunkte für Resilienz bei einem Failover zwischen Lesern, die auf unterschiedlichen Computern ausgeführt werden. Sie können ältere Daten zurückgeben, indem Sie einen niedrigeren Offset aus diesem Prüfpunktprozess angeben. Durch diesen Mechanismus ermöglicht das Setzen von Prüfpunkten Failoverstabilität und eine erneute Wiedergabe des Ereignisstreams.

    Ein BlobCheckpointStore ist eine Klasse, die schlüsselbasierte Methoden implementiert, die vom EventHubConsumerClient zum Ausgleich von Last- und Aktualisierungsprüfpunkten erforderlich sind.

Beispiele

Erstellen eines CheckpointStore using-Azure Blob Storage

Verwenden Sie den folgenden Codeausschnitt, um einen CheckpointStorezu erstellen. Sie müssen die Verbindungszeichenfolge für Ihr Speicherkonto angeben.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Prüfpunktereignisse mithilfe von Azure Blob Storage

Um Mit Azure Blob Storage empfangene Prüfpunktereignisse zu erhalten, müssen Sie ein Objekt übergeben, das mit der SubscriptionEventHandlers-Schnittstelle kompatibel ist, zusammen mit Code zum Aufrufen der updateCheckpoint() -Methode.

In diesem Beispiel SubscriptionHandlers implementiert SubscriptionEventHandlers und behandelt auch prüfpunkte.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Problembehandlung

Aktivieren von Protokollen

Sie können die Umgebungsvariable AZURE_LOG_LEVEL auf einen der folgenden Werte festlegen, um die Protokollierung in zu stderraktivieren:

  • Ausführlich
  • info
  • warning
  • error

Sie können die Protokollebene auch programmgesteuert festlegen, indem Sie das @azure-/Protokollierungspaket importieren und die setLogLevel Funktion mit einem der Protokollebenenwerte aufrufen.

Wenn Sie eine Protokollebene entweder programmgesteuert oder über die Umgebungsvariable AZURE_LOG_LEVEL festlegen, werden alle Protokolle ausgegeben, die mit einer Protokollebene geschrieben werden, die gleich oder kleiner als der von Ihnen ausgewählten ist. Wenn Sie z. B. die Protokollebene auf infofestlegen, werden die Protokolle, die für Ebenen warning geschrieben und error auch ausgegeben werden, festgelegt. Dieses SDK folgt den Richtlinien des Azure SDK für TypeScript, wenn bestimmt wird, auf welcher Ebene die Anmeldung erfolgt.

Alternativ können Sie die Umgebungsvariable DEBUG festlegen, um Protokolle abzurufen, wenn Sie diese Bibliothek verwenden. Dies kann hilfreich sein, wenn Sie auch Protokolle aus den Abhängigkeiten rhea-promise und rhea ausgeben möchten.

Hinweis: AZURE_LOG_LEVEL hat, sofern festgelegt, Vorrang vor DEBUG. Geben Sie keine azure Bibliotheken über DEBUG an, wenn Sie auch AZURE_LOG_LEVEL angeben oder setLogLevel aufrufen.

Sie können die folgende Umgebungsvariable festlegen, um Debugprotokolle anzuzeigen, wenn Sie diese Bibliothek verwenden.

  • Nur Debugprotokolle auf Infoebene aus dem Eventhubs Checkpointstore-Blob werden abgerufen.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Protokollierung in einer Datei

  • Aktivieren Sie die Protokollierung wie oben gezeigt, und führen Sie dann Ihr Testskript wie folgt aus:

    • Protokollierungsanweisungen aus Ihrem Testskript wechseln zu out.log , und Protokollierungsanweisungen aus dem SDK wechseln zu debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Protokollierungsanweisungen aus Ihrem Testskript und dem SDK wechseln zu derselben Datei out.log , indem sie stderr zu stdout (&1) umleiten und dann stdout in eine Datei umleiten:

      node your-test-script.js >out.log 2>&1
      
    • Protokollierungsanweisungen aus Ihrem Testskript und dem SDK wechseln zu derselben Datei out.log.

      node your-test-script.js &> out.log
      

Nächste Schritte

Ein ausführliches Beispiel finden Sie im Beispielverzeichnis .

Mitwirken

Wenn Sie an dieser Bibliothek mitwirken möchten, lesen Sie die Anleitung für Mitwirkende, um mehr darüber zu erfahren, wie Sie den Code erstellen und testen können.

Aufrufe