Azure Event Hubs Checkpoint Store-bibliotheek voor Javascript met behulp van storage-blobs
Een op Azure Blob Storage gebaseerde oplossing voor het opslaan van controlepunten en om te helpen bij het verdelen van de taakverdeling bij gebruik EventHubConsumerClient
vanuit de bibliotheek @azure/event-hubs
Broncode | Pakket (npm) | API-referentiedocumentatie | Monsters
Aan de slag
Het pakket installeren
De blobbibliotheek van Azure Event Hubs Checkpoint Store installeren met npm
npm install @azure/eventhubs-checkpointstore-blob
Vereisten: u moet een Azure-abonnement, een Event Hubs-naamruimte hebben om dit pakket te kunnen gebruiken en een opslagaccount
Als u dit pakket in een Node.js-toepassing gebruikt, gebruikt u Node.js 8.x of hoger.
Typescript configureren
TypeScript-gebruikers moeten knooppunttypedefinities hebben geïnstalleerd:
npm install @types/node
U moet ook inschakelen compilerOptions.allowSyntheticDefaultImports
in uw tsconfig.json. Houd er rekening mee dat als u hebt ingeschakeld compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
standaard is ingeschakeld. Zie het handboek voor compileropties van TypeScript voor meer informatie.
Belangrijkste concepten
Schaal: Maak meerdere consumenten, waarbij elke consument eigenaar wordt van het lezen van een paar Event Hubs-partities.
Taakverdeling: Toepassingen die ondersteuning bieden voor taakverdeling bestaan uit een of meer exemplaren
EventHubConsumerClient
waarvan is geconfigureerd om gebeurtenissen te gebruiken van dezelfde Event Hub en consumentengroep en dezelfdeCheckpointStore
. Ze verdelen de workload over verschillende exemplaren door de partities die moeten worden verwerkt onderling te verdelen.Controlepunten: Het is een proces waarmee lezers hun positie binnen een partitie-gebeurtenisreeks markeren of doorvoeren. Het plaatsen van controlepunten is de verantwoordelijkheid van de consumer en vindt plaats per partitie binnen een consumergroep. Deze verantwoordelijkheid houdt in dat elke partitielezer voor elke consumergroep de huidige positie in de gebeurtenisstroom moet bijhouden en de service kan informeren wanneer de gegevensstroom is voltooid.
Als een lezer van een partitie is losgekoppeld en er vervolgens weer verbinding wordt gemaakt, begint het lezen bij het controlepunt dat eerder is verzonden door de laatste lezer van de betreffende partitie in de consumergroep. Wanneer de lezer verbinding maakt, wordt de offset doorgegeven aan de Event Hub om de locatie op te geven waar moet worden gelezen. Op deze manier kunt u het plaatsen van controlepunten gebruiken om gebeurtenissen te markeren als 'voltooid' door downstream-toepassingen. Bovendien beschikt u met controlepunten over tolerantie bij een failover tussen lezers die op verschillende apparaten worden uitgevoerd. Het is mogelijk om terug te keren naar de oudere gegevens door een lagere offset van dit controlepuntproces op te geven. Via dit mechanisme zorgt het plaatsen van controlepunten voor failover-tolerantie en voor herhaling van gebeurtenisstromen.
Een BlobCheckpointStore is een klasse die belangrijke methoden implementeert die nodig zijn voor de EventHubConsumerClient om taken te verdelen en controlepunten bij te werken.
Voorbeelden
- Een CheckpointStore maken met behulp van Azure Blob Storage
- Controlepunt gebeurtenissen met Behulp van Azure Blob Storage
CheckpointStore
Een maken met behulp van Azure Blob Storage
Gebruik het onderstaande codefragment om een CheckpointStore
te maken. U moet de verbindingsreeks opgeven voor uw opslagaccount.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Controlepunt gebeurtenissen met Behulp van Azure Blob Storage
Als u controlepuntgebeurtenissen wilt ontvangen met behulp van Azure Blob Storage, moet u een object doorgeven dat compatibel is met de interface SubscriptionEventHandlers, samen met code om de updateCheckpoint()
methode aan te roepen.
In dit voorbeeld SubscriptionHandlers
implementeert u SubscriptionEventHandlers en verwerkt u ook controlepunten.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Problemen oplossen
Logboeken inschakelen
U kunt de omgevingsvariabele AZURE_LOG_LEVEL
instellen op een van de volgende waarden om logboekregistratie in te schakelen voor stderr
:
- verbose
- Info
- waarschuwing
- fout
U kunt het logboekniveau ook programmatisch instellen door het pakket @azure/logger te importeren en de setLogLevel
functie aan te roepen met een van de waarden op logboekniveau.
Bij het instellen van een logboekniveau via een programma of via de AZURE_LOG_LEVEL
omgevingsvariabele, worden logboeken die zijn geschreven met een logboekniveau dat gelijk is aan of kleiner is dan het niveau dat u kiest, verzonden.
Wanneer u bijvoorbeeld het logboekniveau instelt op info
, worden de logboeken die zijn geschreven voor niveaus warning
en error
ook verzonden.
Deze SDK volgt de Azure SDK voor TypeScript-richtlijnen bij het bepalen bij welk niveau moet worden aangemeld.
U kunt ook de DEBUG
omgevingsvariabele instellen om logboeken op te halen wanneer u deze bibliotheek gebruikt.
Dit kan handig zijn als u ook logboeken van de afhankelijkheden rhea-promise
rhea
en wilt verzenden.
Opmerking: AZURE_LOG_LEVEL, indien ingesteld, heeft voorrang op FOUTOPSPORING.
Geef geen azure
bibliotheken op via DEBUG wanneer u ook AZURE_LOG_LEVEL opgeeft of setLogLevel aanroept.
U kunt de volgende omgevingsvariabele instellen om de logboeken voor foutopsporing op te halen wanneer u deze bibliotheek gebruikt.
- Alleen foutopsporingslogboeken op informatieniveau ophalen uit de Eventhubs Checkpointstore-blob.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Logboekregistratie bij een bestand
Schakel logboekregistratie in zoals hierboven wordt weergegeven en voer het testscript als volgt uit:
Logboekinstructies van uw testscript gaan naar
out.log
en logboekregistratieinstructies van de SDK gaan naardebug.log
.node your-test-script.js > out.log 2>debug.log
Logboekinstructies van uw testscript en de SDK gaan naar hetzelfde bestand
out.log
door stderr om te leiden naar stdout (&1) en vervolgens stdout omleiden naar een bestand:node your-test-script.js >out.log 2>&1
Logboekregistratie-instructies van uw testscript en de SDK gaan naar hetzelfde bestand
out.log
.node your-test-script.js &> out.log
Volgende stappen
Bekijk de map met voorbeelden voor een gedetailleerd voorbeeld.
Bijdragen
Als u een bijdrage wilt leveren aan deze bibliotheek, leest u de handleiding voor bijdragen voor meer informatie over het bouwen en testen van de code.
Azure SDK for JavaScript