Dela via


Azure Event Hubs Checkpoint Store-bibliotek för Javascript med lagringsblobar

En Azure Blob Storage-baserad lösning för att lagra kontrollpunkter och för att underlätta belastningsutjämning när du använder EventHubConsumerClient från biblioteket @azure/event-hubs

Källkod | Paket (npm) | API-referensdokumentation | Prover

Komma igång

Installera paketet

Installera blobbiblioteket Azure Event Hubs Checkpoint Store med npm

npm install @azure/eventhubs-checkpointstore-blob

Krav: Du måste ha en Azure-prenumeration, ett Event Hubs-namnområde för att kunna använda det här paketet och ett lagringskonto

Om du använder det här paketet i ett Node.js program använder du Node.js 8.x eller senare.

Konfigurera Typescript

TypeScript-användare måste ha definitioner av nodtyp installerade:

npm install @types/node

Du måste också aktivera compilerOptions.allowSyntheticDefaultImports i din tsconfig.json. Observera att om du har aktiverat compilerOptions.esModuleInteropallowSyntheticDefaultImports är aktiverad som standard. Mer information finns i TypeScripts handbok för kompilatoralternativ .

Viktiga begrepp

  • Skala: Skapa flera konsumenter, där varje konsument äger läsningen från några Event Hubs-partitioner.

  • Belastningsutjämning: Program som stöder belastningsutjämning består av en eller flera instanser som har konfigurerats EventHubConsumerClient för att använda händelser från samma händelsehubb och konsumentgrupp och samma CheckpointStore. De balanserar arbetsbelastningen mellan olika instanser genom att distribuera de partitioner som ska bearbetas sinsemellan.

  • Kontrollpunkter: Det är en process där läsarna markerar eller checkar in sin position i en partitionshändelsesekvens. Att skapa kontrollpunkter är konsumentens ansvar och görs för varje partition i en konsumentgrupp. Det här ansvaret innebär att varje läsare i partitionen måste hålla reda på sin nuvarande position i händelseströmmen för varje konsumentgrupp. Läsaren kan sedan informera tjänsten när de anser att dataströmmen är klar.

    Om en läsare kopplar från en partition och den sedan återansluts kan han börja läsa vid den kontrollpunkt som tidigare skickades in av den senaste läsaren i den aktuella partitionen inom just den konsumentgruppen. När läsaren ansluter skickas förskjutningen till händelsehubben för att ange den plats där läsningen ska börja. På så sätt kan du använda kontrollpunkter både till att markera händelser som ”klara” i underordnade program och som skydd i händelse av en redundansväxling mellan läsare som körs på olika datorer. Du kan återgå till äldre data genom att ange en lägre offset i den här kontrollpunktsprocessen. Den här mekanismen möjliggör både återhämtning vid redundansväxlingar och återuppspelning av händelseströmmar.

    En BlobCheckpointStore är en klass som implementerar viktiga metoder som krävs av EventHubConsumerClient för att balansera belastnings- och uppdateringskontroller.

Exempel

Skapa en CheckpointStore med Azure Blob Storage

Använd kodfragmentet nedan för att skapa ett CheckpointStore. Du måste ange anslutningssträng till ditt lagringskonto.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Kontrollpunktshändelser med Azure Blob Storage

Om du vill ta emot kontrollpunktshändelser med Azure Blob Storage måste du skicka ett objekt som är kompatibelt med SubscriptionEventHandlers-gränssnittet tillsammans med kod för att anropa updateCheckpoint() metoden.

I det här exemplet SubscriptionHandlers implementerar SubscriptionEventHandlers och hanterar även kontrollpunkter.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Felsökning

Aktivera loggar

Du kan ange AZURE_LOG_LEVEL miljövariabeln till något av följande värden för att aktivera loggning till stderr:

  • utförlig
  • information
  • varning
  • fel

Du kan också ange loggnivån programmässigt genom att importera paketet @azure/logger och anropa setLogLevel funktionen med något av loggnivåvärdena.

När du anger en loggnivå antingen programmässigt eller via AZURE_LOG_LEVEL miljövariabeln genereras alla loggar som skrivs med en loggnivå som är lika med eller mindre än den du väljer. När du till exempel anger loggnivån till infogenereras loggarna som skrivs för nivåer warning och error som också genereras. Denna SDK följer Riktlinjerna för Azure SDK för TypeScript när du fastställer vilken nivå du ska logga in på.

Du kan också ange DEBUG miljövariabeln för att hämta loggar när du använder det här biblioteket. Detta kan vara användbart om du också vill generera loggar från beroendena rhea-promise och rhea även.

Observera: AZURE_LOG_LEVEL, om den anges, har företräde framför FELSÖKNING. Ange azure inga bibliotek via FELSÖKNING när du också anger AZURE_LOG_LEVEL eller anropar setLogLevel.

Du kan ange följande miljövariabel för att hämta felsökningsloggarna när du använder det här biblioteket.

  • Hämta endast felsökningsloggar på informationsnivå från Eventhubs Checkpointstore Blob.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Logga in på en fil

  • Aktivera loggning enligt ovan och kör sedan testskriptet på följande sätt:

    • Loggningsuttryck från testskriptet går till out.log och loggningsuttryck från sdk:en går till debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Loggningsinstruktioner från testskriptet och sdk:en går till samma fil out.log genom att omdirigera stderr till stdout (&1) och sedan omdirigera stdout till en fil:

      node your-test-script.js >out.log 2>&1
      
    • Loggningsuttryck från testskriptet och sdk:en går till samma fil out.log.

      node your-test-script.js &> out.log
      

Nästa steg

Ta en titt på exempelkatalogen för ett detaljerat exempel.

Bidra

Om du vill bidra till det här biblioteket kan du läsa bidragsguiden för att lära dig mer om hur du skapar och testar koden.

Visningar