Udostępnij za pośrednictwem


Azure Event Hubs biblioteka magazynu checkpoint dla języka JavaScript przy użyciu obiektów blob usługi Storage

Rozwiązanie oparte na usłudze Azure Blob Storage do przechowywania punktów kontrolnych i pomoc w równoważeniu obciążenia podczas korzystania EventHubConsumerClient z biblioteki @azure/event-hubs

Kod | źródłowyPakiet (npm) | Dokumentacja interfejsu | APIPróbki

Wprowadzenie

Instalowanie pakietu

Instalowanie biblioteki obiektów blob magazynu Azure Event Hubs Checkpoint Przy użyciu narzędzia npm

npm install @azure/eventhubs-checkpointstore-blob

Wymagania wstępne: musisz mieć subskrypcję platformy Azure, przestrzeń nazw usługi Event Hubs do korzystania z tego pakietu i konto magazynu

Jeśli używasz tego pakietu w aplikacji Node.js, użyj Node.js 8.x lub nowszej.

Konfigurowanie języka TypeScript

Użytkownicy języka TypeScript muszą mieć zainstalowane definicje typu węzła:

npm install @types/node

Należy również włączyć compilerOptions.allowSyntheticDefaultImports plik tsconfig.json. Pamiętaj, że jeśli włączono compilerOptions.esModuleInteropusługę , allowSyntheticDefaultImports jest domyślnie włączona. Aby uzyskać więcej informacji, zobacz podręcznik dotyczący opcji kompilatora języka TypeScript .

Kluczowe pojęcia

  • Skali: Utwórz wielu użytkowników, a każdy użytkownik przejmuje własność odczytu z kilku partycji usługi Event Hubs.

  • Równoważenie obciążenia: Aplikacje obsługujące równoważenie obciążenia składają się z co najmniej jednego wystąpienia EventHubConsumerClient skonfigurowanego do korzystania z zdarzeń z tego samego centrum zdarzeń i grupy odbiorców oraz tego samego CheckpointStore. Równoważą obciążenie między różnymi wystąpieniami, dystrybuując partycje do przetworzenia między sobą.

  • Tworzenie punktów kontrolnych: Jest to proces, przez który czytelnicy oznaczą lub zatwierdzą swoją pozycję w sekwencji zdarzeń partycji. Odpowiedzialność za tworzenie punktów kontrolnych spoczywa na odbiorcy i odbywa się dla każdej partycji w ramach grupy odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługi, gdy uzna, że strumień danych jest pełny.

    Jeśli czytnik rozłączy się od partycji, po swoim ponownym połączeniu rozpoczyna odczyt punktu kontrolnego, który został wcześniej przesłany przez ostatni czytnik tej partycji w danej grupie odbiorców. Gdy czytnik łączy się, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której chcesz rozpocząć odczytywanie. W ten sposób można użyć procesu tworzenia punktów kontrolnych zarówno do oznaczenia zdarzeń jako „ukończone” przez aplikacje podrzędne, jak i zapewnienia odporności zdarzenia na pracę w trybie failover między czytnikami działającymi na różnych komputerach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia od tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych zapewnia zarówno odporność na pracę w trybie failover, jak i powtarzanie strumienia zdarzeń.

    Obiekt BlobCheckpointStore to klasa, która implementuje kluczowe metody wymagane przez klasę EventHubConsumerClient w celu równoważenia obciążenia i aktualizowania punktów kontrolnych.

Przykłady

CheckpointStore Tworzenie przy użyciu Azure Blob Storage

Użyj poniższego fragmentu kodu, aby utworzyć element CheckpointStore. Musisz podać parametry połączenia na koncie magazynu.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Zdarzenia punktu kontrolnego korzystające z usługi Azure Blob Storage

Aby zdarzenia punktu kontrolnego odebrane przy użyciu Azure Blob Storage, należy przekazać obiekt zgodny z interfejsem SubscriptionEventHandlers wraz z kodem w celu wywołania updateCheckpoint() metody.

W tym przykładzie SubscriptionHandlers implementuje moduł SubscriptionEventHandlers , a także obsługuje tworzenie punktów kontrolnych.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Rozwiązywanie problemów

Włączanie dzienników

Zmienną AZURE_LOG_LEVEL środowiskową można ustawić na jedną z następujących wartości, aby włączyć rejestrowanie na stderr:

  • tryb pełny
  • informacje o
  • warning
  • error

Poziom dziennika można również ustawić programowo, importując pakiet @azure/rejestratora i wywołując setLogLevel funkcję z jedną z wartości na poziomie dziennika.

Podczas ustawiania poziomu dziennika programowo lub za pośrednictwem zmiennej AZURE_LOG_LEVEL środowiskowej wszystkie dzienniki zapisywane przy użyciu poziomu dziennika równego lub mniejszego niż wybrany zostanie emitowany. Na przykład po ustawieniu poziomu dziennika na infowartość , dzienniki, które są zapisywane dla poziomów warning i error są również emitowane. Ten zestaw SDK jest zgodny z wytycznymi dotyczącymi języka TypeScript dla zestawu Azure SDK podczas określania poziomu, do którego należy się zalogować.

Możesz też ustawić zmienną środowiskową, DEBUG aby pobierać dzienniki podczas korzystania z tej biblioteki. Może to być przydatne, jeśli chcesz również emitować dzienniki z zależności rhea-promise i rhea jak również.

Uwaga: AZURE_LOG_LEVEL, jeśli ustawiono, ma pierwszeństwo przed debugowaniem. Nie należy określać żadnych azure bibliotek za pomocą debugowania podczas określania AZURE_LOG_LEVEL ani wywoływania polecenia setLogLevel.

W przypadku korzystania z tej biblioteki można ustawić następującą zmienną środowiskową, aby pobrać dzienniki debugowania.

  • Pobieranie tylko dzienników debugowania na poziomie informacji z obiektu blob magazynu punktów kontrolnych usługi EventHubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Rejestrowanie w pliku

  • Włącz rejestrowanie, jak pokazano powyżej, a następnie uruchom skrypt testowy w następujący sposób:

    • Instrukcje rejestrowania ze skryptu testowego przejdź do out.log instrukcji rejestrowania z zestawu SDK i przejdź do debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przechodzą do tego samego pliku out.log , przekierowując stderr do stdout (&1), a następnie przekierowując stdout do pliku:

      node your-test-script.js >out.log 2>&1
      
    • Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przejdź do tego samego pliku out.log.

      node your-test-script.js &> out.log
      

Następne kroki

Zapoznaj się z katalogiem przykładów , aby uzyskać szczegółowy przykład.

Współtworzenie

Jeśli chcesz współtworzyć tę bibliotekę, przeczytaj przewodnik współtworzenia , aby dowiedzieć się więcej na temat sposobu kompilowania i testowania kodu.

Wrażenia