Optimera prestanda för Apache Kafka HDInsight-kluster
Den här artikeln innehåller några förslag på hur du optimerar prestandan för dina Apache Kafka-arbetsbelastningar i HDInsight. Fokus ligger på att justera producent-, asynkron- och konsumentkonfiguration. Ibland måste du också justera operativsystemets inställningar för att justera prestandan med hög arbetsbelastning. Det finns olika sätt att mäta prestanda och de optimeringar som du tillämpar beror på dina affärsbehov.
Arkitekturöversikt
Kafka-ämnen används för att organisera poster. Producenter producerar poster och konsumenterna konsumerar dem. Producenter skickar poster till Kafka-koordinatorer, som sedan lagrar data. Varje arbetsnod i HDInsight-klustret är en asynkron Kafka-meddelandekö.
Ämnen delar upp poster över meddelandeköer. När du förbrukar poster, kan du använda en konsument per partition för att uppnå parallell bearbetning av data.
Replikering används för att duplicera partitioner mellan noder. Den här partitionen skyddar mot avbrott i noden (broker). En enda partition i gruppen med repliker utses till partitionsledare. Producenttrafik dirigeras till varje ledande nod med det tillstånd som hanteras av ZooKeeper.
Identifiera ditt scenario
Apache Kafka-prestanda har två huvudsakliga aspekter – dataflöde och svarstid. Dataflödet är den maximala hastighet med vilken data kan bearbetas. Högre dataflöde är bättre. Svarstid är den tid det tar för data att lagras eller hämtas. Kortare svarstider är bättre. Det kan vara svårt att hitta rätt balans mellan dataflöde, svarstid och kostnaden för programmets infrastruktur. Dina prestandakrav bör matcha med någon av följande tre vanliga situationer, baserat på om du behöver högt dataflöde, låg svarstid eller båda:
- Högt dataflöde, låg svarstid. Det här scenariot kräver både högt dataflöde och låg svarstid (~100 millisekunder). Ett exempel på den här typen av program är övervakning av tjänsttillgänglighet.
- Högt dataflöde, hög svarstid. Det här scenariot kräver högt dataflöde (~1,5 GBIT/s) men kan tolerera högre svarstid (< 250 ms). Ett exempel på den här typen av program är telemetridatainmatning för nästan realtidsprocesser som program för säkerhets- och intrångsidentifiering.
- Lågt dataflöde, låg svarstid. Det här scenariot kräver låg svarstid (< 10 ms) för realtidsbearbetning, men kan tolerera lägre dataflöde. Ett exempel på den här typen av program är stavnings- och grammatikkontroller online.
Producentkonfigurationer
Följande avsnitt belyser några av de viktigaste allmänna konfigurationsegenskaperna för att optimera prestanda för dina Kafka-producenter. En detaljerad förklaring av alla konfigurationsegenskaper finns i Apache Kafka-dokumentationen om producentkonfigurationer.
Batchstorlek
Apache Kafka-producenter sammanställer grupper av meddelanden (kallas batchar) som skickas som en enhet som ska lagras i en enda lagringspartition. Batchstorlek innebär det antal byte som måste finnas innan gruppen överförs. Att öka parametern batch.size
kan öka dataflödet eftersom det minskar bearbetningskostnaderna från nätverks- och I/O-begäranden. Vid lätt belastning kan ökad batchstorlek öka Kafka-svarstiden när producenten väntar på att en batch ska vara klar. Under hög belastning rekommenderar vi att du ökar batchstorleken för att förbättra dataflödet och svarstiden.
Nödvändiga bekräftelser för producenten
Den konfiguration som krävs acks
av producenten avgör hur många bekräftelser som krävs av partitionsledaren innan en skrivbegäran anses vara slutförd. Den här inställningen påverkar datatillförlitligheten och tar värden för 0
, 1
eller -1
. Värdet -1
för innebär att en bekräftelse måste tas emot från alla repliker innan skrivning har slutförts. Inställningen acks = -1
ger starkare garantier mot dataförlust, men det resulterar också i högre svarstid och lägre dataflöde. Om dina programkrav kräver högre dataflöde kan du prova att ange acks = 0
eller acks = 1
. Tänk på att om du inte erkänner alla repliker kan det minska datatillförlitligheten.
Komprimering
En Kafka-producent kan konfigureras för att komprimera meddelanden innan de skickas till koordinatorer. Inställningen compression.type
anger vilken komprimeringskodc som ska användas. Komprimeringskodex som stöds är "gzip", "snappy" och "lz4". Komprimering är fördelaktigt och bör övervägas om det finns en begränsning av diskkapaciteten.
Bland de två vanliga komprimeringskodcerna gzip
, och snappy
, gzip
har ett högre komprimeringsförhållande, vilket resulterar i lägre diskanvändning på bekostnad av högre CPU-belastning. Codec snappy
ger mindre komprimering med mindre processorkostnader. Du kan bestämma vilken codec som ska användas baserat på begränsningar för koordinatordiskar eller producent-CPU. gzip
kan komprimera data med en hastighet som är fem gånger högre än snappy
.
Datakomprimering ökar antalet poster som kan lagras på en disk. Det kan också öka processorkostnaderna i fall där det finns ett matchningsfel mellan de komprimeringsformat som används av producenten och asynkron meddelandekö. eftersom data måste komprimeras innan de skickas och sedan dekomprimeras innan bearbetningen.
Inställningar för asynkron meddelandek
Följande avsnitt belyser några av de viktigaste inställningarna för att optimera prestanda för dina Kafka-mäklare. En detaljerad förklaring av alla asynkrona inställningar finns i Apache Kafka-dokumentationen om asynkrona konfigurationer.
Antal diskar
Lagringsdiskar har begränsad IOPS (indata-/utdataåtgärder per sekund) och byte för läsning/skrivning per sekund. När du skapar nya partitioner lagrar Kafka varje ny partition på disken med minst befintliga partitioner för att balansera dem mellan de tillgängliga diskarna. Trots lagringsstrategin kan Kafka enkelt mätta det tillgängliga diskdataflödet när du bearbetar hundratals partitionsrepliker på varje disk. Kompromissen här är mellan dataflöde och kostnad. Om ditt program kräver större dataflöde skapar du ett kluster med fler hanterade diskar per asynkron meddelandekö. HDInsight stöder för närvarande inte att lägga till hanterade diskar i ett kluster som körs. Mer information om hur du konfigurerar antalet hanterade diskar finns i Konfigurera lagring och skalbarhet för Apache Kafka i HDInsight. Förstå kostnadskonsekvenserna av att öka lagringsutrymmet för noderna i klustret.
Antal ämnen och partitioner
Kafka-producenter skriver till ämnen. Kafka-konsumenter läser från ämnen. Ett ämne är associerat med en logg, som är en datastruktur på disken. Kafka lägger till poster från en eller flera tillverkare i slutet av en ämneslogg. En ämneslogg består av många partitioner som är spridda över flera filer. Dessa filer är i sin tur spridda över flera Kafka-klusternoder. Konsumenter läser från Kafka-ämnen på sin takt och kan välja sin position (förskjutning) i ämnesloggen.
Varje Kafka-partition är en loggfil i systemet och producenttrådar kan skriva till flera loggar samtidigt. Eftersom varje konsumenttråd läser meddelanden från en partition hanteras även användning från flera partitioner parallellt.
Om du ökar partitionstätheten (antalet partitioner per asynkron meddelandekö) läggs ett omkostnader för metadataåtgärder och per partitionsbegäran/svar mellan partitionsledaren och dess följare. Även om data inte flödar igenom hämtar partitionsrepliker fortfarande data från ledare, vilket resulterar i extra bearbetning för att skicka och ta emot begäranden via nätverket.
För Apache Kafka-kluster 2.1 och 2.4 och som tidigare nämnts i HDInsight rekommenderar vi att du har högst 2 000 partitioner per asynkron meddelandekö, inklusive repliker. Om du ökar antalet partitioner per asynkron meddelandekö minskar dataflödet och kan även orsaka otillgänglighet för ämnet. Mer information om stöd för Kafka-partitioner finns i det officiella Apache Kafka-blogginlägget om ökningen av antalet partitioner som stöds i version 1.1.0. Mer information om hur du ändrar ämnen finns i Apache Kafka: ändra ämnen.
Antal repliker
Högre replikeringsfaktor resulterar i ytterligare begäranden mellan partitionsledaren och följare. Därför förbrukar en högre replikeringsfaktor mer disk och PROCESSOR för att hantera ytterligare begäranden, vilket ökar skrivfördröjningen och minskar dataflödet.
Vi rekommenderar att du använder minst 3x replikering för Kafka i Azure HDInsight. De flesta Azure-regioner har tre feldomäner, men i regioner med endast två feldomäner bör användarna använda 4x-replikering.
Mer information om replikering finns i Apache Kafka: replikering och Apache Kafka: ökande replikeringsfaktor.
Konsumentkonfigurationer
I följande avsnitt beskrivs några viktiga allmänna konfigurationer för att optimera prestanda för dina Kafka-konsumenter. En detaljerad förklaring av alla konfigurationer finns i Apache Kafka-dokumentationen om konsumentkonfigurationer.
Antal konsumenter
Det är en bra idé att ha antalet partitioner lika med antalet konsumenter. Om antalet konsumenter är mindre än antalet partitioner läser några av konsumenterna från flera partitioner, vilket ökar konsumentfördröjningen.
Om antalet konsumenter är större än antalet partitioner slösar du bort dina konsumentresurser eftersom dessa konsumenter är inaktiva.
Undvik frekvent ombalansering av konsumenter
Ombalansering av konsumenter utlöses av ändring av partitionsägarskap (d.v.s. konsumenter skalar ut eller skalar ned), en asynkron koordinatorkrasch (eftersom mäklare är gruppkoordinatorer för konsumentgrupper), en konsumentkrasch, lägger till ett nytt ämne eller lägger till nya partitioner. Under ombalanseringen kan konsumenterna inte använda, vilket ökar svarstiden.
Konsumenter anses vara vid liv om de kan skicka ett pulsslag till en asynkron meddelandekö inom session.timeout.ms
. Annars anses konsumenten vara död eller misslyckad. Den här fördröjningen leder till en ombalansering av konsumenterna. Sänk konsumenten session.timeout.ms
, snabbare kan vi identifiera dessa fel.
Om det session.timeout.ms
är för lågt kan en konsument uppleva upprepade onödiga ombalanseringar på grund av scenarier som när en batch med meddelanden tar längre tid att bearbeta eller när en JVM GC-paus tar för lång tid. Om du har en konsument som ägnar för mycket tid åt att bearbeta meddelanden kan du åtgärda detta antingen genom att öka den övre gränsen för hur lång tid en konsument kan vara inaktiv innan fler poster hämtas med max.poll.interval.ms
eller genom att minska den maximala storleken på batchar som returneras med konfigurationsparametern max.poll.records
.
Batchbearbetning
Precis som producenter kan vi lägga till batchbearbetning för konsumenter. Mängden data som konsumenter kan få i varje hämtningsbegäran kan konfigureras genom att ändra konfigurationen fetch.min.bytes
. Den här parametern definierar de minsta byte som förväntas från ett hämtningssvar från en konsument. Om du ökar det här värdet minskar antalet hämtningsbegäranden som görs till asynkron meddelandekö, vilket minskar extra kostnader. Som standard är det här värdet 1. På samma sätt finns det en annan konfiguration fetch.max.wait.ms
. Om en hämtningsbegäran inte har tillräckligt med meddelanden enligt storleken fetch.min.bytes
på väntar den tills väntetiden förfaller baserat på den här konfigurationen fetch.max.wait.ms
.
Kommentar
I några scenarier kan det verka som om konsumenterna är långsamma när meddelandet inte bearbetas. Om du inte genomför förskjutningen efter ett undantag fastnar konsumenten vid en viss förskjutning i en oändlig loop och kommer inte att gå framåt, vilket ökar fördröjningen på konsumentsidan som ett resultat.
Linux OS-justering med tung arbetsbelastning
Minneskartor
vm.max_map_count
definierar maximalt antal mmap som en process kan ha. Som standard är värdet 65535 på den virtuella linux-datorn i HDInsight Apache Kafka-kluster.
I Apache Kafka kräver varje loggsegment ett par index-/tidindexfiler, och var och en av dessa filer använder en mmap. Med andra ord använder varje loggsegment två mmap. Om varje partition är värd för ett enskilt loggsegment kräver det därför minst två mmap. Antalet loggsegment per partition varierar beroende på segmentstorlek, belastningsintensitet, kvarhållningsprincip, rullande period och tenderar vanligtvis att vara mer än en. Mmap value = 2*((partition size)/(segment size))*(partitions)
Om det obligatoriska mmap-värdet överskrider genererar asynkron vm.max_map_count
meddelandekö undantaget "Map failed".
Undvik det här undantaget genom att använda kommandona nedan för att kontrollera storleken på mmap i den virtuella datorn och öka storleken om det behövs på varje arbetsnod.
# command to find number of index files:
find . -name '*index' | wc -l
# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l
# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>
# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p
Kommentar
Var försiktig med att ställa in detta för högt eftersom det tar upp minne på den virtuella datorn. Mängden minne som tillåts användas av JVM på minneskartor bestäms av inställningen MaxDirectMemory
. Standardvärdet är 64 MB. Det är möjligt att detta uppnås. Du kan öka det här värdet genom att lägga till -XX:MaxDirectMemorySize=amount of memory used
i JVM-inställningarna via Ambari. Var medveten om mängden minne som används på noden och om det finns tillräckligt med tillgängligt RAM-minne för att stödja detta.