Azure Event Hubs knihovny Checkpoint Store pro JavaScript s využitím objektů blob služby Storage
Řešení založené na službě Azure Blob Storage pro ukládání kontrolních bodů a pomoc při vyrovnávání zatížení při použití EventHubConsumerClient
knihovny @azure/event-hubs
Zdrojový kód | Balíček (npm) | Referenční dokumentace k | rozhraní APIVzorky
Začínáme
Instalace balíčku
Instalace knihovny objektů blob Azure Event Hubs Checkpoint Store pomocí npm
npm install @azure/eventhubs-checkpointstore-blob
Předpoklady: Abyste mohli tento balíček používat, musíte mít předplatné Azure, obor názvů služby Event Hubs a účet úložiště.
Pokud tento balíček používáte v aplikaci Node.js, použijte Node.js 8.x nebo novější.
Konfigurace TypeScriptu
Uživatelé TypeScriptu musí mít nainstalované definice typů uzlů:
npm install @types/node
Musíte také povolit compilerOptions.allowSyntheticDefaultImports
soubor tsconfig.json. Všimněte si, že pokud jste povolili compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
je ve výchozím nastavení povolená. Další informace najdete v příručce k možnostem kompilátoru TypeScriptu .
Klíčové koncepty
Měřítko: Vytvořte několik příjemců, přičemž každý z nich převezme vlastnictví čtení z několika oddílů služby Event Hubs.
Vyrovnávání zatížení: Aplikace, které podporují vyrovnávání zatížení, se skládají z jedné nebo několika instancí
EventHubConsumerClient
, jejichž konfigurace byla nakonfigurována tak, aby využívala události ze stejného centra událostí a skupiny příjemců a stejnéCheckpointStore
skupiny . Vyrovnávají zatížení mezi různými instancemi rozdělením oddílů, které se mají zpracovat mezi sebe.Vytváření kontrolních bodů: Jedná se o proces, kterým čtenáři označí nebo potvrdí svou pozici v sekvenci událostí oddílu. Za vytváření kontrolních bodů zodpovídá příjemce. Proces probíhá na bázi oddílů ve skupinách příjemců. Taková zodpovědnost znamená, že si každý čtenář oddílu v každé skupině příjemců musí udržovat přehled o své aktuální pozici v datovém proudu událostí a může informovat službu, když bude považovat datový proud za dokončený.
Pokud se čtenář z oddílu odpojí, začne při opětovném připojení číst od kontrolního bodu, který dříve zaslal poslední čtenář daného oddílu z této skupiny příjemců. Když se čtenář připojí, předá posun do centra událostí a určí umístění, ve kterém se má začít číst. Takto můžete vytváření kontrolních bodů použít jak k označování událostí jako „dokončených“, tak k zajištění ochrany pro případ, že nastane selhání u čtenářů spuštěných na různých strojích. Ke starším datům se je možné vrátit tak, že určíte nižší posun od tohoto kontrolního bodu. Díky tomuto mechanismu umožňuje vytváření kontrolních bodů nejen obnovu při selhání, ale i opakované přehrání datového proudu.
BlobCheckpointStore je třída, která implementuje klíčové metody vyžadované EventHubConsumerClient pro vyrovnávání zatížení a aktualizace kontrolních bodů.
Příklady
- Vytvoření checkpointStore pomocí Azure Blob Storage
- Události kontrolního bodu s využitím služby Azure Blob Storage
Vytvoření CheckpointStore
pomocí Azure Blob Storage
Pomocí následujícího fragmentu kódu vytvořte CheckpointStore
. Ke svému účtu úložiště budete muset zadat připojovací řetězec.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Události kontrolního bodu s využitím služby Azure Blob Storage
Pokud chcete zkontrolovat události přijaté pomocí Azure Blob Storage, budete muset předat objekt, který je kompatibilní s rozhraním SubscriptionEventHandlers, spolu s kódem pro volání updateCheckpoint()
metody .
V tomto příkladu SubscriptionHandlers
implementuje SubscriptionEventHandlers a také zpracovává kontrolní body.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Poradce při potížích
Povolení protokolů
Můžete nastavit proměnnou AZURE_LOG_LEVEL
prostředí na jednu z následujících hodnot, abyste povolili protokolování do stderr
:
- verbose
- Info
- upozornění
- error
Úroveň protokolu můžete také programově nastavit importem balíčku @azure/logger a zavoláním setLogLevel
funkce s jednou z hodnot na úrovni protokolu.
Při programovém nastavení úrovně protokolu nebo prostřednictvím AZURE_LOG_LEVEL
proměnné prostředí se vygenerují všechny protokoly zapsané pomocí úrovně protokolu, která je stejná nebo menší než ta, kterou zvolíte.
Pokud například nastavíte úroveň protokolu na info
, protokoly, které jsou zapsány pro úrovně warning
a error
jsou také generovány.
Tato sada SDK při určování úrovně, ke které se má protokolovat, se řídí pokyny sady Azure SDK pro TypeScript.
Případně můžete nastavit proměnnou prostředí pro DEBUG
získání protokolů při použití této knihovny.
To může být užitečné, pokud chcete také generovat protokoly ze závislostí rhea-promise
a rhea
také.
Poznámka: AZURE_LOG_LEVEL, pokud je nastavená, má přednost před laděním.
Při zadávání AZURE_LOG_LEVEL nebo volání setLogLevel nezadávejte žádné azure
knihovny prostřednictvím funkce DEBUG.
Pokud použijete tuto knihovnu, můžete nastavit následující proměnnou prostředí, abyste získali protokoly ladění.
- Získání pouze protokolů ladění na úrovni informací z objektu blob úložiště kontrolních bodů EventHubs
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Protokolování do souboru
Povolte protokolování, jak je znázorněno výše, a pak následujícím způsobem spusťte testovací skript:
Příkazy protokolování z testovacího skriptu přejdou do
out.log
a příkazy protokolování ze sady SDK se přejdou dodebug.log
.node your-test-script.js > out.log 2>debug.log
Příkazy protokolování z testovacího skriptu a sady SDK přejdou do stejného souboru
out.log
přesměrováním stderru na stdout (&1) a pak přesměrují stdout do souboru:node your-test-script.js >out.log 2>&1
Příkazy protokolování z testovacího skriptu a sady SDK přejdou do stejného souboru
out.log
.node your-test-script.js &> out.log
Další kroky
Podrobný příklad najdete v adresáři samples .
Přispívání
Pokud chcete přispívat do této knihovny, přečtěte si příručku pro přispívání , kde najdete další informace o tom, jak sestavit a otestovat kód.
Azure SDK for JavaScript