Delen via


Prestaties van Apache Kafka HDInsight-clusters optimaliseren

Dit artikel bevat enkele suggesties voor het optimaliseren van de prestaties van uw Apache Kafka-workloads in HDInsight. De focus ligt op het aanpassen van de configuratie van producenten, broker en consumenten. Soms moet u ook de besturingssysteeminstellingen aanpassen om de prestaties af te stemmen met een zware werkbelasting. Er zijn verschillende manieren om prestaties te meten en de optimalisaties die u toepast, zijn afhankelijk van de behoeften van uw bedrijf.

Architectuuroverzicht

Kafka-onderwerpen worden gebruikt om records te ordenen. Producenten produceren records en consumenten verbruiken ze. Producenten verzenden records naar Kafka-brokers, die vervolgens de gegevens opslaan. Elk werkrolknooppunt in uw HDInsight-cluster is een Kafka-broker.

Onderwerpen verdelen records over brokers. Wanneer records worden verbruikt, kunt u maximaal één consumer per partitie gebruiken voor parallelle verwerking van de gegevens.

Replicatie wordt gebruikt om partities tussen knooppunten te dupliceren. Deze partitie beveiligt tegen storingen van knooppunten (broker). Eén partitie tussen de groep replica's wordt aangewezen als partitieleider. Producentverkeer wordt doorgestuurd naar de leider van elk knooppunt, met behulp van de status die wordt beheerd door ZooKeeper.

Uw scenario identificeren

Prestaties van Apache Kafka hebben twee belangrijke aspecten: doorvoer en latentie. Doorvoer is de maximale snelheid waarmee gegevens kunnen worden verwerkt. Hogere doorvoer is beter. Latentie is de tijd die nodig is om gegevens op te slaan of op te halen. Lagere latentie is beter. Het vinden van de juiste balans tussen doorvoer, latentie en de kosten van de infrastructuur van de toepassing kan lastig zijn. Uw prestatievereisten moeten overeenkomen met een van de volgende drie veelvoorkomende situaties, op basis van of u hoge doorvoer, lage latentie of beide nodig hebt:

  • Hoge doorvoer, lage latentie. Voor dit scenario zijn zowel hoge doorvoer als lage latentie (~100 milliseconden) vereist. Een voorbeeld van dit type toepassing is bewaking van de beschikbaarheid van services.
  • Hoge doorvoer, hoge latentie. Voor dit scenario is een hoge doorvoer (~1,5 GBps) vereist, maar kan een hogere latentie (< 250 ms) worden getolereerd. Een voorbeeld van dit type toepassing is telemetriegegevensopname voor bijna realtime processen zoals beveiligings- en inbraakdetectietoepassingen.
  • Lage doorvoer, lage latentie. Voor dit scenario is een lage latentie (< 10 ms) vereist voor realtime verwerking, maar kan lagere doorvoer worden getolereerd. Een voorbeeld van dit type toepassing is online spelling- en grammaticacontroles.

Configuraties van producenten

In de volgende secties worden enkele van de belangrijkste algemene configuratie-eigenschappen gemarkeerd om de prestaties van uw Kafka-producenten te optimaliseren. Zie de Documentatie van Apache Kafka over producerconfiguraties voor een gedetailleerde uitleg van alle configuratie-eigenschappen.

Batchgrootte

Apache Kafka-producenten verzamelen groepen berichten (batches genoemd) die worden verzonden als een eenheid die in één opslagpartitie moet worden opgeslagen. Batchgrootte betekent het aantal bytes dat aanwezig moet zijn voordat die groep wordt verzonden. Als u de batch.size parameter verhoogt, kan de doorvoer worden verhoogd, omdat hiermee de verwerkingsoverhead van netwerk- en IO-aanvragen wordt verminderd. Bij lichte belasting kan een grotere batchgrootte de latentie van Kafka verhogen wanneer de producent wacht tot een batch gereed is. Onder zware belasting is het raadzaam om de batchgrootte te vergroten om de doorvoer en latentie te verbeteren.

Vereiste bevestigingen voor producent

De vereiste acks configuratie van de producent bepaalt het aantal bevestigingen dat door de partitieleider is vereist voordat een schrijfaanvraag als voltooid wordt beschouwd. Deze instelling is van invloed op de betrouwbaarheid van gegevens en neemt waarden van 0, 1of -1. De waarde van -1 betekent dat een bevestiging moet worden ontvangen van alle replica's voordat de schrijfbewerking is voltooid. Instelling acks = -1 biedt sterkere garanties tegen gegevensverlies, maar leidt ook tot hogere latentie en lagere doorvoer. Als uw toepassingsvereisten een hogere doorvoer eisen, kunt u instellen acks = 0 of acks = 1. Houd er rekening mee dat het niet bevestigen van alle replica's de betrouwbaarheid van gegevens kan verminderen.

Compressie

Een Kafka-producent kan worden geconfigureerd om berichten te comprimeren voordat ze naar brokers worden verzonden. Met compression.type de instelling geeft u de compressiecodec op die moet worden gebruikt. Ondersteunde compressiecodecs zijn 'gzip', 'snappy' en 'lz4'. Compressie is nuttig en moet worden overwogen als er een beperking is voor schijfcapaciteit.

Onder de twee veelgebruikte compressiecodecs gzip en snappy, gzip heeft een hogere compressieverhouding, wat resulteert in een lager schijfgebruik ten koste van een hogere CPU-belasting. De snappy codec biedt minder compressie met minder CPU-overhead. U kunt bepalen welke codec moet worden gebruikt op basis van de cpu-beperkingen van brokerschijven of producenten. gzip kan gegevens met een snelheid van vijf keer hoger dan snappy.

Gegevenscompressie verhoogt het aantal records dat op een schijf kan worden opgeslagen. Het kan ook de CPU-overhead verhogen in gevallen waarin de compressie-indelingen die door de producent en de broker worden gebruikt, niet overeenkomen. omdat de gegevens moeten worden gecomprimeerd voordat ze worden verzonden en vervolgens gedecomprimeerd voordat ze worden verwerkt.

Brokerinstellingen

In de volgende secties worden enkele van de belangrijkste instellingen gemarkeerd om de prestaties van uw Kafka-brokers te optimaliseren. Zie de Apache Kafka-documentatie over brokerconfiguraties voor een gedetailleerde uitleg van alle brokerinstellingen.

Aantal schijven

Opslagschijven hebben beperkte IOPS (invoer-/uitvoerbewerkingen per seconde) en lees-/schrijfbytes per seconde. Bij het maken van nieuwe partities slaat Kafka elke nieuwe partitie op de schijf op met het minste bestaande partities om deze te verdelen over de beschikbare schijven. Ondanks de opslagstrategie kan Kafka bij het verwerken van honderden partitiereplica's op elke schijf eenvoudig de beschikbare schijfdoorvoer verzadigen. De afweging hier is tussen doorvoer en kosten. Als voor uw toepassing meer doorvoer is vereist, maakt u een cluster met meer beheerde schijven per broker. HDInsight biedt momenteel geen ondersteuning voor het toevoegen van beheerde schijven aan een actief cluster. Zie Opslag en schaalbaarheid configureren voor Apache Kafka in HDInsight voor meer informatie over het configureren van het aantal beheerde schijven. Inzicht in de kosten van het vergroten van de opslagruimte voor de knooppunten in uw cluster.

Aantal onderwerpen en partities

Kafka-producenten schrijven naar onderwerpen. Kafka-consumenten lezen uit onderwerpen. Een onderwerp is gekoppeld aan een logboek, een gegevensstructuur op schijf. Kafka voegt records van een producent(en) toe aan het einde van een onderwerplogboek. Een onderwerplogboek bestaat uit veel partities die zijn verspreid over meerdere bestanden. Deze bestanden zijn op zijn beurt verspreid over meerdere Kafka-clusterknooppunten. Consumenten lezen uit Kafka-onderwerpen in hun frequentie en kunnen hun positie (offset) kiezen in het onderwerplogboek.

Elke Kafka-partitie is een logboekbestand op het systeem en producerthreads kunnen tegelijkertijd naar meerdere logboeken schrijven. Aangezien elke consumententhread berichten van één partitie leest, wordt het verbruik van meerdere partities ook parallel verwerkt.

Het verhogen van de partitiedichtheid (het aantal partities per broker) voegt een overhead toe met betrekking tot metagegevensbewerkingen en per partitieaanvraag/-antwoord tussen de partitieleider en de volgers. Zelfs als er geen gegevens binnenkomen, halen partitiereplica's nog steeds gegevens op van leiders, wat resulteert in extra verwerking voor het verzenden en ontvangen van aanvragen via het netwerk.

Voor Apache Kafka-clusters 2.1 en 2.4 en zoals eerder vermeld in HDInsight, raden we u aan maximaal 2000 partities per broker te hebben, inclusief replica's. Het verhogen van het aantal partities per broker vermindert de doorvoer en kan ook ertoe leiden dat het onderwerp niet beschikbaar is. Zie de officiële Apache Kafka-blogpost over de toename van het aantal ondersteunde partities in versie 1.1.0 voor meer informatie over ondersteuning voor Kafka-partities. Zie Apache Kafka voor meer informatie over het wijzigen van onderwerpen: onderwerpen wijzigen.

Aantal replica's

Hogere replicatiefactor resulteert in extra aanvragen tussen de partitieleider en volgers. Daarom verbruikt een hogere replicatiefactor meer schijf en CPU om extra aanvragen af te handelen, waardoor de schrijflatentie toeneemt en de doorvoer afneemt.

U wordt aangeraden ten minste 3x-replicatie te gebruiken voor Kafka in Azure HDInsight. De meeste Azure-regio's hebben drie foutdomeinen, maar in regio's met slechts twee foutdomeinen moeten gebruikers 4x-replicatie gebruiken.

Zie Apache Kafka: replicatie en Apache Kafka: toenemende replicatiefactor voor meer informatie over replicatie.

Consumentenconfiguraties

In de volgende sectie worden enkele belangrijke algemene configuraties gemarkeerd om de prestaties van uw Kafka-consumenten te optimaliseren. Zie de Documentatie van Apache Kafka over consumentenconfiguraties voor een gedetailleerde uitleg van alle configuraties.

Aantal consumenten

Het is een goede gewoonte om het aantal partities gelijk te hebben aan het aantal consumenten. Als het aantal gebruikers kleiner is dan het aantal partities, nemen enkele consumenten uit meerdere partities de latentie van de consument toe.

Als het aantal consumenten groter is dan het aantal partities, verwedt u uw consumentenresources omdat deze consumenten niet actief zijn.

Regelmatig opnieuw verdelen van consumenten voorkomen

Het opnieuw verdelen van de consument wordt geactiveerd door een wijziging van het partitieeigendom (consumenten worden uitgeschaald of omlaag geschaald), een brokercrash (aangezien brokers groepscoördinator zijn voor consumentengroepen), een consumentencrash, een nieuw onderwerp toevoegen of nieuwe partities toevoegen. Tijdens het opnieuw verdelen kunnen consumenten geen verbruiken, waardoor de latentie wordt verhoogd.

Consumenten worden als levend beschouwd als het een heartbeat naar een broker binnen session.timeout.mskan sturen. Anders wordt de consument beschouwd als dood of mislukt. Deze vertraging leidt tot een herverdeling van de consument. Verlaag de consument session.timeout.ms, sneller kunnen we deze fouten detecteren.

Als het session.timeout.ms te laag is, kan een consument te maken hebben met herhaalde onnodige herbalancering, vanwege scenario's zoals wanneer een batch berichten langer duurt om te verwerken of wanneer een JVM GC-pauze te lang duurt. Als u een consument hebt die te veel tijd besteedt aan het verwerken van berichten, kunt u dit oplossen door de bovengrens te verhogen voor de hoeveelheid tijd die een consument inactief kan zijn voordat meer records worden opgehaald of max.poll.interval.ms door de maximale grootte van batches te verminderen die met de configuratieparameter max.poll.recordsworden geretourneerd.

Batching

Net als producenten kunnen we batchverwerking voor consumenten toevoegen. De hoeveelheid gegevensgebruikers kan in elke ophaalaanvraag worden geconfigureerd door de configuratie fetch.min.byteste wijzigen. Met deze parameter definieert u het minimum aantal bytes dat wordt verwacht op basis van een ophaalantwoord van een consument. Door deze waarde te verhogen, wordt het aantal ophaalaanvragen voor de broker verminderd, waardoor extra overhead wordt verminderd. Deze waarde is standaard 1. Op dezelfde manier is er nog een configuratie fetch.max.wait.ms. Als een ophaalaanvraag niet voldoende berichten heeft op basis van de grootte, fetch.min.byteswacht deze totdat de wachttijd is verlopen op basis van deze configuratie fetch.max.wait.ms.

Notitie

In enkele scenario's lijken consumenten mogelijk traag te zijn wanneer het bericht niet kan worden verwerkt. Als u de offset niet na een uitzondering doorvoert, blijft de consument vastzitten op een bepaalde offset in een oneindige lus en zal deze niet vooruitgaan, waardoor de vertraging aan de consumentenzijde toeneemt.

Linux-besturingssysteem afstemmen met zware werkbelasting

Geheugentoewijzingen

vm.max_map_count definieert het maximum aantal mmaps dat een proces kan hebben. Standaard is de waarde 65535 op de Linux-VM van het Apache Kafka-cluster in HDInsight.

In Apache Kafka vereist elk logboeksegment een paar index-/timeindexbestanden en elk van deze bestanden verbruikt één mmap. Met andere woorden, elk logboeksegment maakt gebruik van twee mmaps. Als elke partitie als host fungeert voor één logboeksegment, is er dus minimaal twee mmaps vereist. Het aantal logboeksegmenten per partitie is afhankelijk van de segmentgrootte, de belastingsintensiteit, het bewaarbeleid, de rolling-periode en doorgaans meer dan één. Mmap value = 2*((partition size)/(segment size))*(partitions)

Als de vereiste mmap-waarde groter is dan de vm.max_map_countuitzondering 'Kaart is mislukt', zou de broker de uitzondering 'Toewijzing mislukt' genereren.

Als u deze uitzondering wilt voorkomen, gebruikt u de onderstaande opdrachten om de grootte van mmap in vm te controleren en de grootte zo nodig op elk werkknooppunt te vergroten.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

Notitie

Wees voorzichtig met het instellen van deze te hoge waarde omdat het geheugen op de virtuele machine in beslag neemt. De hoeveelheid geheugen die door de JVM op geheugenkaarten mag worden gebruikt, wordt bepaald door de instelling MaxDirectMemory. De standaardwaarde is 64 MB. Het is mogelijk dat dit is bereikt. U kunt deze waarde verhogen door deze toe te voegen -XX:MaxDirectMemorySize=amount of memory used aan de JVM-instellingen via Ambari. Wees cognizant van de hoeveelheid geheugen die op het knooppunt wordt gebruikt en als er voldoende RAM beschikbaar is om dit te ondersteunen.

Volgende stappen