Azure Event Hubs Checkpoint Store-bibliotek för Javascript med lagringsblobar
En Azure Blob Storage-baserad lösning för att lagra kontrollpunkter och för att underlätta belastningsutjämning när du använder EventHubConsumerClient
från biblioteket @azure/event-hubs
Källkod | Paket (npm) | API-referensdokumentation | Prover
Komma igång
Installera paketet
Installera blobbiblioteket Azure Event Hubs Checkpoint Store med npm
npm install @azure/eventhubs-checkpointstore-blob
Krav: Du måste ha en Azure-prenumeration, ett Event Hubs-namnområde för att kunna använda det här paketet och ett lagringskonto
Om du använder det här paketet i ett Node.js program använder du Node.js 8.x eller senare.
Konfigurera Typescript
TypeScript-användare måste ha definitioner av nodtyp installerade:
npm install @types/node
Du måste också aktivera compilerOptions.allowSyntheticDefaultImports
i din tsconfig.json. Observera att om du har aktiverat compilerOptions.esModuleInterop
allowSyntheticDefaultImports
är aktiverad som standard. Mer information finns i TypeScripts handbok för kompilatoralternativ .
Viktiga begrepp
Skala: Skapa flera konsumenter, där varje konsument äger läsningen från några Event Hubs-partitioner.
Belastningsutjämning: Program som stöder belastningsutjämning består av en eller flera instanser som har konfigurerats
EventHubConsumerClient
för att använda händelser från samma händelsehubb och konsumentgrupp och sammaCheckpointStore
. De balanserar arbetsbelastningen mellan olika instanser genom att distribuera de partitioner som ska bearbetas sinsemellan.Kontrollpunkter: Det är en process där läsarna markerar eller checkar in sin position i en partitionshändelsesekvens. Att skapa kontrollpunkter är konsumentens ansvar och görs för varje partition i en konsumentgrupp. Det här ansvaret innebär att varje läsare i partitionen måste hålla reda på sin nuvarande position i händelseströmmen för varje konsumentgrupp. Läsaren kan sedan informera tjänsten när de anser att dataströmmen är klar.
Om en läsare kopplar från en partition och den sedan återansluts kan han börja läsa vid den kontrollpunkt som tidigare skickades in av den senaste läsaren i den aktuella partitionen inom just den konsumentgruppen. När läsaren ansluter skickas förskjutningen till händelsehubben för att ange den plats där läsningen ska börja. På så sätt kan du använda kontrollpunkter både till att markera händelser som ”klara” i underordnade program och som skydd i händelse av en redundansväxling mellan läsare som körs på olika datorer. Du kan återgå till äldre data genom att ange en lägre offset i den här kontrollpunktsprocessen. Den här mekanismen möjliggör både återhämtning vid redundansväxlingar och återuppspelning av händelseströmmar.
En BlobCheckpointStore är en klass som implementerar viktiga metoder som krävs av EventHubConsumerClient för att balansera belastnings- och uppdateringskontroller.
Exempel
Skapa en CheckpointStore
med Azure Blob Storage
Använd kodfragmentet nedan för att skapa ett CheckpointStore
. Du måste ange anslutningssträng till ditt lagringskonto.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Kontrollpunktshändelser med Azure Blob Storage
Om du vill ta emot kontrollpunktshändelser med Azure Blob Storage måste du skicka ett objekt som är kompatibelt med SubscriptionEventHandlers-gränssnittet tillsammans med kod för att anropa updateCheckpoint()
metoden.
I det här exemplet SubscriptionHandlers
implementerar SubscriptionEventHandlers och hanterar även kontrollpunkter.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Felsökning
Aktivera loggar
Du kan ange AZURE_LOG_LEVEL
miljövariabeln till något av följande värden för att aktivera loggning till stderr
:
- utförlig
- information
- varning
- fel
Du kan också ange loggnivån programmässigt genom att importera paketet @azure/logger och anropa setLogLevel
funktionen med något av loggnivåvärdena.
När du anger en loggnivå antingen programmässigt eller via AZURE_LOG_LEVEL
miljövariabeln genereras alla loggar som skrivs med en loggnivå som är lika med eller mindre än den du väljer.
När du till exempel anger loggnivån till info
genereras loggarna som skrivs för nivåer warning
och error
som också genereras.
Denna SDK följer Riktlinjerna för Azure SDK för TypeScript när du fastställer vilken nivå du ska logga in på.
Du kan också ange DEBUG
miljövariabeln för att hämta loggar när du använder det här biblioteket.
Detta kan vara användbart om du också vill generera loggar från beroendena rhea-promise
och rhea
även.
Observera: AZURE_LOG_LEVEL, om den anges, har företräde framför FELSÖKNING.
Ange azure
inga bibliotek via FELSÖKNING när du också anger AZURE_LOG_LEVEL eller anropar setLogLevel.
Du kan ange följande miljövariabel för att hämta felsökningsloggarna när du använder det här biblioteket.
- Hämta endast felsökningsloggar på informationsnivå från Eventhubs Checkpointstore Blob.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Logga in på en fil
Aktivera loggning enligt ovan och kör sedan testskriptet på följande sätt:
Loggningsuttryck från testskriptet går till
out.log
och loggningsuttryck från sdk:en går tilldebug.log
.node your-test-script.js > out.log 2>debug.log
Loggningsinstruktioner från testskriptet och sdk:en går till samma fil
out.log
genom att omdirigera stderr till stdout (&1) och sedan omdirigera stdout till en fil:node your-test-script.js >out.log 2>&1
Loggningsuttryck från testskriptet och sdk:en går till samma fil
out.log
.node your-test-script.js &> out.log
Nästa steg
Ta en titt på exempelkatalogen för ett detaljerat exempel.
Bidra
Om du vill bidra till det här biblioteket kan du läsa bidragsguiden för att lära dig mer om hur du skapar och testar koden.
Azure SDK for JavaScript