Udostępnij za pośrednictwem


Optymalizacja wydajności klastrów Apache Kafka HDInsight

Ten artykuł zawiera sugestie dotyczące optymalizacji wydajności obciążeń platformy Apache Kafka w usłudze HDInsight. Koncentruje się on na dostosowaniu konfiguracji producenta, brokera i konsumenta. Czasami trzeba również dostosować ustawienia systemu operacyjnego, aby dostosować wydajność z dużym obciążeniem. Istnieją różne sposoby mierzenia wydajności, a stosowane optymalizacje zależą od potrzeb biznesowych.

Omówienie architektury

Tematy platformy Kafka służą do organizowania rekordów. Producenci tworzą rekordy, a konsumenci je zużywają. Producenci wysyłają rekordy do brokerów platformy Kafka, które następnie przechowują dane. Każdy węzeł procesu roboczego w klastrze usługi HDInsight jest brokerem platformy Kafka.

Tematy dzielą rekordy między brokerami. Podczas korzystania z rekordów można użyć maksymalnie jednego odbiorcy na jedną partycję, aby osiągnąć równoległe przetwarzanie danych.

Replikacja służy do duplikowania partycji między węzłami. Ta partycja chroni przed awariami węzłów (brokera). Pojedyncza partycja między grupą replik jest wyznaczona jako lider partycji. Ruch producenta jest kierowany do partycji wiodącej w każdym węźle przy użyciu stanu zarządzanego przez usługę ZooKeeper.

Identyfikowanie własnego scenariusza

Wydajność platformy Apache Kafka ma dwa główne aspekty — przepływność i opóźnienie. Przepływność to maksymalna szybkość przetwarzania danych. Wyższa przepływność jest lepsza. Opóźnienie to czas przechowywania lub pobierania danych. Mniejsze opóźnienie jest lepsze. Znalezienie właściwej równowagi między przepływnością, opóźnieniem i kosztem infrastruktury aplikacji może być trudne. Wymagania dotyczące wydajności powinny być zgodne z jedną z następujących trzech typowych sytuacji, w zależności od tego, czy wymagana jest wysoka przepływność, małe opóźnienia, czy oba:

  • Wysoka przepływność, małe opóźnienia. Ten scenariusz wymaga zarówno wysokiej przepływności, jak i małych opóźnień (ok. 100 milisekund). Przykładem tego typu aplikacji jest monitorowanie dostępności usługi.
  • Wysoka przepływność, duże opóźnienie. Ten scenariusz wymaga wysokiej przepływności (ok. 1,5 GB/s), ale może tolerować większe opóźnienie (< 250 ms). Przykładem tego typu aplikacji jest pozyskiwanie danych telemetrycznych dla procesów niemal w czasie rzeczywistym, takich jak aplikacje do wykrywania zabezpieczeń i nieautoryzowanego dostępu.
  • Niska przepływność, małe opóźnienie. Ten scenariusz wymaga małego opóźnienia (< 10 ms) na potrzeby przetwarzania w czasie rzeczywistym, ale może tolerować niższą przepływność. Przykładem tej aplikacji jest sprawdzanie pisowni online i gramatyki.

Konfiguracje producenta

W poniższych sekcjach przedstawiono niektóre z najważniejszych ogólnych właściwości konfiguracji w celu zoptymalizowania wydajności producentów platformy Kafka. Aby uzyskać szczegółowe wyjaśnienie wszystkich właściwości konfiguracji, zobacz dokumentację platformy Apache Kafka dotyczącą konfiguracji producenta.

Rozmiar partii

Producenci platformy Apache Kafka zbierają grupy komunikatów (nazywane partiami), które są wysyłane jako jednostka do przechowywania w jednej partycji magazynu. Rozmiar partii oznacza liczbę bajtów, które muszą być obecne przed przesłaniem tej grupy. Zwiększenie parametru batch.size może zwiększyć przepływność, ponieważ zmniejsza obciążenie związane z przetwarzaniem żądań sieci i operacji we/wy. W przypadku lekkiego obciążenia zwiększone rozmiary partii mogą zwiększyć opóźnienie wysyłania platformy Kafka, ponieważ producent czeka na przygotowanie partii. W przypadku dużego obciążenia zaleca się zwiększenie rozmiaru partii w celu zwiększenia przepływności i opóźnień.

Potwierdzenie wymagane przez producenta

Wymagana konfiguracja producenta acks określa liczbę potwierdzenia wymaganych przez lidera partycji, zanim żądanie zapisu zostanie uznane za ukończone. To ustawienie ma wpływ na niezawodność danych i przyjmuje wartości 0, 1lub -1. Wartość -1 oznacza, że potwierdzenie musi zostać odebrane ze wszystkich replik przed ukończeniem zapisu. Ustawienie acks = -1 zapewnia silniejsze gwarancje dotyczące utraty danych, ale powoduje również większe opóźnienie i niższą przepływność. Jeśli wymagania aplikacji wymagają wyższej przepływności, spróbuj ustawić wartość acks = 0 lub acks = 1. Należy pamiętać, że nie potwierdzanie, że wszystkie repliki mogą zmniejszyć niezawodność danych.

Kompresja

Producent platformy Kafka można skonfigurować do kompresowania komunikatów przed wysłaniem ich do brokerów. Ustawienie compression.type określa koder kompresji, który ma być używany. Obsługiwane koderi kompresji to "gzip", "snappy" i "lz4". Kompresja jest korzystna i powinna być brana pod uwagę, jeśli występuje ograniczenie pojemności dysku.

Wśród dwóch powszechnie używanych koderów gzip kompresji i snappy, gzip ma wyższy współczynnik kompresji, co skutkuje niższym użyciem dysku kosztem wyższego obciążenia procesora CPU. snappy Koder koderowy zapewnia mniejszą kompresję z mniejszym obciążeniem procesora CPU. Możesz zdecydować, którego kodera używać na podstawie ograniczeń dysku brokera lub procesora CPU producenta. gzip może kompresować dane z szybkością pięć razy wyższą niż snappy.

Kompresja danych zwiększa liczbę rekordów, które mogą być przechowywane na dysku. Może również zwiększyć obciążenie procesora CPU w przypadkach, gdy występuje niezgodność między formatami kompresji używanymi przez producenta i brokera. ponieważ dane muszą zostać skompresowane przed wysłaniem, a następnie zdekompresowane przed przetworzeniem.

Ustawienia brokera

W poniższych sekcjach przedstawiono niektóre z najważniejszych ustawień optymalizacji wydajności brokerów platformy Kafka. Aby uzyskać szczegółowe wyjaśnienie wszystkich ustawień brokera, zobacz dokumentację platformy Apache Kafka dotyczącą konfiguracji brokera.

Liczba dysków

Dyski magazynu mają ograniczoną liczbę operacji we/wy na sekundę (operacje wejścia/wyjścia) i bajty odczytu/zapisu na sekundę. Podczas tworzenia nowych partycji platforma Kafka przechowuje każdą nową partycję na dysku z najmniej istniejącymi partycjami, aby zrównoważyć je na dostępnych dyskach. Pomimo strategii magazynowania podczas przetwarzania setek replik partycji na każdym dysku platforma Kafka może łatwo sytować dostępną przepływność dysku. Kompromis polega na tym, że między przepływnością a kosztami. Jeśli aplikacja wymaga większej przepływności, utwórz klaster z większymi dyskami zarządzanymi na brokera. Usługa HDInsight obecnie nie obsługuje dodawania dysków zarządzanych do uruchomionego klastra. Aby uzyskać więcej informacji na temat konfigurowania liczby dysków zarządzanych, zobacz Konfigurowanie magazynu i skalowalności dla platformy Apache Kafka w usłudze HDInsight. Zapoznaj się z kosztami zwiększania miejsca do magazynowania dla węzłów w klastrze.

Liczba tematów i partycji

Producenci platformy Kafka piszą do tematów. Użytkownicy platformy Kafka czytają tematy. Temat jest skojarzony z dziennikiem, który jest strukturą danych na dysku. Platforma Kafka dołącza rekordy od producentów na końcu dziennika tematu. Dziennik tematów składa się z wielu partycji, które są rozłożone na wiele plików. Te pliki są z kolei rozłożone na wiele węzłów klastra platformy Kafka. Konsumenci czytają tematy platformy Kafka pod ich okresem i mogą wybrać pozycję (przesunięcie) w dzienniku tematów.

Każda partycja platformy Kafka jest plikiem dziennika w systemie, a wątki producenta mogą zapisywać w wielu dziennikach jednocześnie. Podobnie, ponieważ każdy wątek odbiorcy odczytuje komunikaty z jednej partycji, korzystanie z wielu partycji jest obsługiwane równolegle.

Zwiększenie gęstości partycji (liczba partycji na brokera) powoduje zwiększenie obciążenia związanego z operacjami metadanych i żądaniem/odpowiedzią partycji między liderem partycji a jego obserwatorami. Nawet w przypadku braku danych przesyłanych przez repliki partycji nadal pobierają dane z liderów, co powoduje dodatkowe przetwarzanie żądań wysyłania i odbierania przez sieć.

W przypadku klastrów Apache Kafka w wersji 2.1 i 2.4 oraz jak wspomniano wcześniej w usłudze HDInsight, zalecamy posiadanie maksymalnie 2000 partycji na brokera, w tym repliki. Zwiększenie liczby partycji na brokera zmniejsza przepływność i może również powodować niedostępność tematu. Aby uzyskać więcej informacji na temat obsługi partycji platformy Kafka, zobacz oficjalny wpis w blogu platformy Apache Kafka dotyczący zwiększenia liczby obsługiwanych partycji w wersji 1.1.0. Aby uzyskać szczegółowe informacje na temat modyfikowania tematów, zobacz Apache Kafka: modyfikowanie tematów.

Liczba replik

Wyższy współczynnik replikacji powoduje dodatkowe żądania między liderem partycji a obserwatorami. W związku z tym wyższy współczynnik replikacji zużywa więcej dysku i procesora CPU do obsługi dodatkowych żądań, zwiększając opóźnienie zapisu i zmniejszając przepływność.

Zalecamy użycie co najmniej 3x replikacji dla platformy Kafka w usłudze Azure HDInsight. Większość regionów platformy Azure ma trzy domeny błędów, ale w regionach z tylko dwiema domenami błędów użytkownicy powinni używać replikacji 4 razy.

Aby uzyskać więcej informacji na temat replikacji, zobacz Apache Kafka: replikacja i Apache Kafka: zwiększanie współczynnika replikacji.

Konfiguracje konsumentów

W poniższej sekcji przedstawiono niektóre ważne konfiguracje ogólne, aby zoptymalizować wydajność użytkowników platformy Kafka. Aby uzyskać szczegółowe wyjaśnienie wszystkich konfiguracji, zobacz dokumentację platformy Apache Kafka dotyczącą konfiguracji konsumentów.

Liczba użytkowników

Dobrym rozwiązaniem jest posiadanie liczby partycji równych liczbie odbiorców. Jeśli liczba odbiorców jest mniejsza niż liczba partycji, kilku użytkowników odczytuje z wielu partycji, zwiększając opóźnienie konsumentów.

Jeśli liczba użytkowników jest większa niż liczba partycji, to marnujesz zasoby konsumentów, ponieważ ci konsumenci są bezczynni.

Unikaj częstego ponownego równoważenia konsumentów

Ponowne równoważenie konsumentów jest wyzwalane przez zmianę własności partycji (tj. skalowanie użytkowników w poziomie lub skalowanie w dół), awarię brokera (ponieważ brokerzy są koordynatorem grup odbiorców), awaria użytkownika, dodanie nowego tematu lub dodanie nowych partycji. Podczas ponownego równoważenia użytkownicy nie mogą korzystać, co zwiększa opóźnienie.

Konsumenci są traktowani jako aktywni, jeśli może wysłać puls do brokera w programie session.timeout.ms. W przeciwnym razie konsument jest uważany za nieaktywnego lub nieudanego. To opóźnienie prowadzi do ponownego równoważenia konsumentów. Obniż odbiorcę session.timeout.ms, szybciej możemy wykryć te błędy.

Jeśli wartość session.timeout.ms jest zbyt niska, konsument może doświadczać powtarzających się niepotrzebnych ponownych równoważenia ze względu na scenariusze, takie jak w przypadku dłuższego przetwarzania partii komunikatów lub gdy wstrzymanie JVM GC trwa zbyt długo. Jeśli masz konsumenta, który poświęca zbyt dużo czasu na przetwarzanie komunikatów, możesz rozwiązać ten problem, zwiększając górną granicę czasu, przez jaki odbiorca może być bezczynny przed pobraniem większej liczby rekordów max.poll.interval.ms z lub przez zmniejszenie maksymalnego rozmiaru partii zwracanych za pomocą parametru max.poll.recordskonfiguracji .

Dzielenie na partie

Podobnie jak producenci, możemy dodać przetwarzanie wsadowe dla konsumentów. Ilość danych, które użytkownicy mogą uzyskać w każdym żądaniu pobierania, można skonfigurować, zmieniając konfigurację fetch.min.bytes. Ten parametr definiuje minimalne bajty oczekiwane od odpowiedzi pobierania odbiorcy. Zwiększenie tej wartości zmniejsza liczbę żądań pobierania wysyłanych do brokera, co zmniejsza dodatkowe obciążenie. Domyślnie ta wartość to 1. Podobnie istnieje inna konfiguracja fetch.max.wait.ms. Jeśli żądanie pobierania nie ma wystarczającej liczby komunikatów zgodnie z rozmiarem fetch.min.bytes, czeka na wygaśnięcie czasu oczekiwania na podstawie tej konfiguracji fetch.max.wait.ms.

Uwaga

W kilku scenariuszach konsumenci mogą wydawać się powolne, gdy nie może przetworzyć komunikatu. Jeśli nie zatwierdzasz przesunięcia po wyjątku, konsument będzie zablokowany w określonym przesunięciu w nieskończonej pętli i w rezultacie nie przejdzie do przodu, zwiększając opóźnienie po stronie konsumenta.

Dostrajanie systemu operacyjnego Linux z dużym obciążeniem

Mapy pamięci

vm.max_map_count definiuje maksymalną liczbę mmap, które może mieć proces. Domyślnie na maszynie wirtualnej z systemem Linux klastra Apache Kafka w usłudze HDInsight wartość to 65535.

Na platformie Apache Kafka każdy segment dziennika wymaga pary plików indeksu/indeksu czasu, a każdy z tych plików zużywa jedną mapę m. Innymi słowy, każdy segment dziennika używa dwóch mmap. W związku z tym, jeśli każda partycja hostuje pojedynczy segment dziennika, wymaga co najmniej dwóch mmap. Liczba segmentów dziennika na partycję różni się w zależności od rozmiaru segmentu, intensywności obciążenia, zasad przechowywania, okresu kroczącego i zazwyczaj jest więcej niż jeden. Mmap value = 2*((partition size)/(segment size))*(partitions)

Jeśli wymagana wartość mmap przekracza vm.max_map_countwartość , broker zgłosi wyjątek "Mapowanie nie powiodło się".

Aby uniknąć tego wyjątku, użyj poniższych poleceń, aby sprawdzić rozmiar mapy mmap na maszynie wirtualnej i zwiększyć rozmiar w razie potrzeby w każdym węźle roboczym.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Uwaga

Należy zachować ostrożność podczas ustawiania tej zbyt dużej ilości pamięci na maszynie wirtualnej. Ilość pamięci, która może być używana przez maszynę wirtualną JVM na mapach pamięci, jest określana przez ustawienie MaxDirectMemory. Wartość domyślna to 64 MB. Jest możliwe, że zostanie to osiągnięte. Tę wartość można zwiększyć, dodając -XX:MaxDirectMemorySize=amount of memory used do ustawień maszyny wirtualnej JVM za pomocą narzędzia Ambari. Bądź cognizant ilości pamięci używanej w węźle i jeśli jest wystarczająca ilość dostępnej pamięci RAM, aby to obsłużyć.

Następne kroki