Azure Event Hubs biblioteka magazynu checkpoint dla języka JavaScript przy użyciu obiektów blob usługi Storage
Rozwiązanie oparte na usłudze Azure Blob Storage do przechowywania punktów kontrolnych i pomoc w równoważeniu obciążenia podczas korzystania EventHubConsumerClient
z biblioteki @azure/event-hubs
Kod | źródłowyPakiet (npm) | Dokumentacja interfejsu | APIPróbki
Wprowadzenie
Instalowanie pakietu
Instalowanie biblioteki obiektów blob magazynu Azure Event Hubs Checkpoint Przy użyciu narzędzia npm
npm install @azure/eventhubs-checkpointstore-blob
Wymagania wstępne: musisz mieć subskrypcję platformy Azure, przestrzeń nazw usługi Event Hubs do korzystania z tego pakietu i konto magazynu
Jeśli używasz tego pakietu w aplikacji Node.js, użyj Node.js 8.x lub nowszej.
Konfigurowanie języka TypeScript
Użytkownicy języka TypeScript muszą mieć zainstalowane definicje typu węzła:
npm install @types/node
Należy również włączyć compilerOptions.allowSyntheticDefaultImports
plik tsconfig.json. Pamiętaj, że jeśli włączono compilerOptions.esModuleInterop
usługę , allowSyntheticDefaultImports
jest domyślnie włączona. Aby uzyskać więcej informacji, zobacz podręcznik dotyczący opcji kompilatora języka TypeScript .
Kluczowe pojęcia
Skali: Utwórz wielu użytkowników, a każdy użytkownik przejmuje własność odczytu z kilku partycji usługi Event Hubs.
Równoważenie obciążenia: Aplikacje obsługujące równoważenie obciążenia składają się z co najmniej jednego wystąpienia
EventHubConsumerClient
skonfigurowanego do korzystania z zdarzeń z tego samego centrum zdarzeń i grupy odbiorców oraz tego samegoCheckpointStore
. Równoważą obciążenie między różnymi wystąpieniami, dystrybuując partycje do przetworzenia między sobą.Tworzenie punktów kontrolnych: Jest to proces, przez który czytelnicy oznaczą lub zatwierdzą swoją pozycję w sekwencji zdarzeń partycji. Odpowiedzialność za tworzenie punktów kontrolnych spoczywa na odbiorcy i odbywa się dla każdej partycji w ramach grupy odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługi, gdy uzna, że strumień danych jest pełny.
Jeśli czytnik rozłączy się od partycji, po swoim ponownym połączeniu rozpoczyna odczyt punktu kontrolnego, który został wcześniej przesłany przez ostatni czytnik tej partycji w danej grupie odbiorców. Gdy czytnik łączy się, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której chcesz rozpocząć odczytywanie. W ten sposób można użyć procesu tworzenia punktów kontrolnych zarówno do oznaczenia zdarzeń jako „ukończone” przez aplikacje podrzędne, jak i zapewnienia odporności zdarzenia na pracę w trybie failover między czytnikami działającymi na różnych komputerach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia od tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych zapewnia zarówno odporność na pracę w trybie failover, jak i powtarzanie strumienia zdarzeń.
Obiekt BlobCheckpointStore to klasa, która implementuje kluczowe metody wymagane przez klasę EventHubConsumerClient w celu równoważenia obciążenia i aktualizowania punktów kontrolnych.
Przykłady
- Tworzenie magazynu CheckpointStore przy użyciu Azure Blob Storage
- Zdarzenia punktu kontrolnego korzystające z usługi Azure Blob Storage
CheckpointStore
Tworzenie przy użyciu Azure Blob Storage
Użyj poniższego fragmentu kodu, aby utworzyć element CheckpointStore
. Musisz podać parametry połączenia na koncie magazynu.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Zdarzenia punktu kontrolnego korzystające z usługi Azure Blob Storage
Aby zdarzenia punktu kontrolnego odebrane przy użyciu Azure Blob Storage, należy przekazać obiekt zgodny z interfejsem SubscriptionEventHandlers wraz z kodem w celu wywołania updateCheckpoint()
metody.
W tym przykładzie SubscriptionHandlers
implementuje moduł SubscriptionEventHandlers , a także obsługuje tworzenie punktów kontrolnych.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Rozwiązywanie problemów
Włączanie dzienników
Zmienną AZURE_LOG_LEVEL
środowiskową można ustawić na jedną z następujących wartości, aby włączyć rejestrowanie na stderr
:
- tryb pełny
- informacje o
- warning
- error
Poziom dziennika można również ustawić programowo, importując pakiet @azure/rejestratora i wywołując setLogLevel
funkcję z jedną z wartości na poziomie dziennika.
Podczas ustawiania poziomu dziennika programowo lub za pośrednictwem zmiennej AZURE_LOG_LEVEL
środowiskowej wszystkie dzienniki zapisywane przy użyciu poziomu dziennika równego lub mniejszego niż wybrany zostanie emitowany.
Na przykład po ustawieniu poziomu dziennika na info
wartość , dzienniki, które są zapisywane dla poziomów warning
i error
są również emitowane.
Ten zestaw SDK jest zgodny z wytycznymi dotyczącymi języka TypeScript dla zestawu Azure SDK podczas określania poziomu, do którego należy się zalogować.
Możesz też ustawić zmienną środowiskową, DEBUG
aby pobierać dzienniki podczas korzystania z tej biblioteki.
Może to być przydatne, jeśli chcesz również emitować dzienniki z zależności rhea-promise
i rhea
jak również.
Uwaga: AZURE_LOG_LEVEL, jeśli ustawiono, ma pierwszeństwo przed debugowaniem.
Nie należy określać żadnych azure
bibliotek za pomocą debugowania podczas określania AZURE_LOG_LEVEL ani wywoływania polecenia setLogLevel.
W przypadku korzystania z tej biblioteki można ustawić następującą zmienną środowiskową, aby pobrać dzienniki debugowania.
- Pobieranie tylko dzienników debugowania na poziomie informacji z obiektu blob magazynu punktów kontrolnych usługi EventHubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Rejestrowanie w pliku
Włącz rejestrowanie, jak pokazano powyżej, a następnie uruchom skrypt testowy w następujący sposób:
Instrukcje rejestrowania ze skryptu testowego przejdź do
out.log
instrukcji rejestrowania z zestawu SDK i przejdź dodebug.log
.node your-test-script.js > out.log 2>debug.log
Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przechodzą do tego samego pliku
out.log
, przekierowując stderr do stdout (&1), a następnie przekierowując stdout do pliku:node your-test-script.js >out.log 2>&1
Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przejdź do tego samego pliku
out.log
.node your-test-script.js &> out.log
Następne kroki
Zapoznaj się z katalogiem przykładów , aby uzyskać szczegółowy przykład.
Współtworzenie
Jeśli chcesz współtworzyć tę bibliotekę, przeczytaj przewodnik współtworzenia , aby dowiedzieć się więcej na temat sposobu kompilowania i testowania kodu.
Azure SDK for JavaScript