Azure Event Hubs Prüfpunktspeicherbibliothek für JavaScript mithilfe von Speicherblobs
Eine Azure Blob Storage-basierte Lösung zum Speichern von Prüfpunkten und zur Unterstützung des Lastenausgleichs bei Verwendung EventHubConsumerClient
aus der @azure/Event Hubs-Bibliothek
Quellcode | Paket (npm) | API-Referenzdokumentation | Proben
Erste Schritte
Installieren des Pakets
Installieren der Azure Event Hubs Checkpoint Store-Blobbibliothek mithilfe von npm
npm install @azure/eventhubs-checkpointstore-blob
Voraussetzungen: Sie benötigen ein Azure-Abonnement, einen Event Hubs-Namespace, um dieses Paket verwenden zu können, und ein Speicherkonto.
Wenn Sie dieses Paket in einer Node.js-Anwendung verwenden, verwenden Sie Node.js 8.x oder höher.
Konfigurieren von Typescript
TypeScript-Benutzer müssen Knotentypdefinitionen installiert haben:
npm install @types/node
Sie müssen auch in Ihrer tsconfig.json-Datei aktivieren compilerOptions.allowSyntheticDefaultImports
. Beachten Sie, dass, wenn Sie aktiviert compilerOptions.esModuleInterop
haben, allowSyntheticDefaultImports
standardmäßig aktiviert ist. Weitere Informationen finden Sie im Handbuch für Compileroptionen von TypeScript .
Wichtige Begriffe
Skalierung: Erstellen mehrerer Consumer, wobei jeder Consumer die Besitzrechte für das Lesen einiger Event Hubs-Partitionen übernimmt.
Lastenausgleich: Anwendungen, die den Lastenausgleich unterstützen, bestehen aus einer oder mehreren Instanzen, von
EventHubConsumerClient
denen für die Nutzung von Ereignissen aus demselben Event Hub und derselben Consumergruppe und derselbenCheckpointStore
konfiguriert wurde. Sie verteilen die Workload auf verschiedene Instanzen, indem sie die zu verarbeitenden Partitionen untereinander verteilen.Prüfpunkte: Dabei handelt es sich um einen Prozess, mit dem Leser ihre Position innerhalb einer Partitionsereignissequenz markieren oder committen. Dies liegt in der Verantwortung des Consumers und erfolgt auf Partitionsbasis innerhalb einer Consumergruppe. Das bedeutet, dass jeder Partitionsleser für jede Consumergruppe seine aktuelle Position im Ereignisstream nachverfolgen muss und den Dienst informieren kann, wenn er den Datenstrom als abgeschlossen betrachtet.
Wenn ein Leser die Verbindung zu eine Partition trennt, beginnt nach dem erneuten Herstellen der Verbindung das Lesen bei dem Prüfpunkt, der zuvor durch den letzten Leser dieser Partition in dieser Consumergruppe übermittelt wurde. Wenn der Leser verbunden ist, übergibt er diesen Offset an den Event Hub, um die Position für den nächsten Lesevorgang anzugeben. Auf diese Weise können mithilfe von Prüfpunkten Ereignisse von Downstreamanwendungen als abgeschlossen markiert werden. Darüber hinaus sorgen Prüfpunkte für Resilienz bei einem Failover zwischen Lesern, die auf unterschiedlichen Computern ausgeführt werden. Sie können ältere Daten zurückgeben, indem Sie einen niedrigeren Offset aus diesem Prüfpunktprozess angeben. Durch diesen Mechanismus ermöglicht das Setzen von Prüfpunkten Failoverstabilität und eine erneute Wiedergabe des Ereignisstreams.
Ein BlobCheckpointStore ist eine Klasse, die schlüsselbasierte Methoden implementiert, die vom EventHubConsumerClient zum Ausgleich von Last- und Aktualisierungsprüfpunkten erforderlich sind.
Beispiele
- Erstellen eines CheckpointStore mit Azure Blob Storage
- Prüfpunktereignisse mithilfe von Azure Blob Storage
Erstellen eines CheckpointStore
using-Azure Blob Storage
Verwenden Sie den folgenden Codeausschnitt, um einen CheckpointStore
zu erstellen. Sie müssen die Verbindungszeichenfolge für Ihr Speicherkonto angeben.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Prüfpunktereignisse mithilfe von Azure Blob Storage
Um Mit Azure Blob Storage empfangene Prüfpunktereignisse zu erhalten, müssen Sie ein Objekt übergeben, das mit der SubscriptionEventHandlers-Schnittstelle kompatibel ist, zusammen mit Code zum Aufrufen der updateCheckpoint()
-Methode.
In diesem Beispiel SubscriptionHandlers
implementiert SubscriptionEventHandlers und behandelt auch prüfpunkte.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Problembehandlung
Aktivieren von Protokollen
Sie können die Umgebungsvariable AZURE_LOG_LEVEL
auf einen der folgenden Werte festlegen, um die Protokollierung in zu stderr
aktivieren:
- Ausführlich
- info
- warning
- error
Sie können die Protokollebene auch programmgesteuert festlegen, indem Sie das @azure-/Protokollierungspaket importieren und die setLogLevel
Funktion mit einem der Protokollebenenwerte aufrufen.
Wenn Sie eine Protokollebene entweder programmgesteuert oder über die Umgebungsvariable AZURE_LOG_LEVEL
festlegen, werden alle Protokolle ausgegeben, die mit einer Protokollebene geschrieben werden, die gleich oder kleiner als der von Ihnen ausgewählten ist.
Wenn Sie z. B. die Protokollebene auf info
festlegen, werden die Protokolle, die für Ebenen warning
geschrieben und error
auch ausgegeben werden, festgelegt.
Dieses SDK folgt den Richtlinien des Azure SDK für TypeScript, wenn bestimmt wird, auf welcher Ebene die Anmeldung erfolgt.
Alternativ können Sie die Umgebungsvariable DEBUG
festlegen, um Protokolle abzurufen, wenn Sie diese Bibliothek verwenden.
Dies kann hilfreich sein, wenn Sie auch Protokolle aus den Abhängigkeiten rhea-promise
und rhea
ausgeben möchten.
Hinweis: AZURE_LOG_LEVEL hat, sofern festgelegt, Vorrang vor DEBUG.
Geben Sie keine azure
Bibliotheken über DEBUG an, wenn Sie auch AZURE_LOG_LEVEL angeben oder setLogLevel aufrufen.
Sie können die folgende Umgebungsvariable festlegen, um Debugprotokolle anzuzeigen, wenn Sie diese Bibliothek verwenden.
- Nur Debugprotokolle auf Infoebene aus dem Eventhubs Checkpointstore-Blob werden abgerufen.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Protokollierung in einer Datei
Aktivieren Sie die Protokollierung wie oben gezeigt, und führen Sie dann Ihr Testskript wie folgt aus:
Protokollierungsanweisungen aus Ihrem Testskript wechseln zu
out.log
, und Protokollierungsanweisungen aus dem SDK wechseln zudebug.log
.node your-test-script.js > out.log 2>debug.log
Protokollierungsanweisungen aus Ihrem Testskript und dem SDK wechseln zu derselben Datei
out.log
, indem sie stderr zu stdout (&1) umleiten und dann stdout in eine Datei umleiten:node your-test-script.js >out.log 2>&1
Protokollierungsanweisungen aus Ihrem Testskript und dem SDK wechseln zu derselben Datei
out.log
.node your-test-script.js &> out.log
Nächste Schritte
Ein ausführliches Beispiel finden Sie im Beispielverzeichnis .
Mitwirken
Wenn Sie an dieser Bibliothek mitwirken möchten, lesen Sie die Anleitung für Mitwirkende, um mehr darüber zu erfahren, wie Sie den Code erstellen und testen können.
Azure SDK for JavaScript