Optimalisatie van gegevensverwerking voor Apache Spark
In dit artikel wordt beschreven hoe u de configuratie van uw Apache Spark-cluster kunt optimaliseren voor de beste prestaties in Azure HDInsight.
Overzicht
Als u trage taken op een Join of Shuffle hebt, is de oorzaak waarschijnlijk gegevensverschil. Scheeftrekken van gegevens is asymmetrie in uw taakgegevens. Een toewijzingstaak kan bijvoorbeeld 20 seconden duren. Maar het uitvoeren van een taak waarbij de gegevens worden samengevoegd of in willekeurige volgorde worden gerangschikt, duurt uren. Als u gegevensverschil wilt oplossen, moet u de hele sleutel zouten of een geïsoleerde salt gebruiken voor slechts een deel van de sleutels. Als u een geïsoleerde salt gebruikt, moet u verder filteren om uw sleutelsubset waaraan salt is toegevoegd, te isoleren in toewijzings-joins. Een andere optie is om eerst een bucketkolom en een samenvoeging vooraf te introduceren in buckets.
Een andere factor die langzame joins veroorzaakt, kan het jointype zijn. Spark maakt standaard gebruik van het jointype SortMerge
. Dit type join is het meest geschikt voor grote gegevenssets. Maar is verder rekenkundig duur, omdat eerst de linker- en rechterkant van gegevens moeten worden gesorteerd voordat ze worden samengevoegd.
Een Broadcast
-join is het meest geschikt voor kleinere gegevenssets, of waarbij één zijde van de join veel kleiner is dan de andere zijde. Met dit jointype wordt één zijde naar alle uitvoerders uitgezonden, waardoor meer geheugen voor uitzendingen in het algemeen is vereist.
U kunt het jointype in uw configuratie wijzigen door spark.sql.autoBroadcastJoinThreshold
in te stellen, of u kunt een join-hint instellen met behulp van de DataFrame API’s (dataframe.join(broadcast(df2))
).
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
Als u tabellen in buckets gebruikt, hebt u een derde jointype, de Merge
-join. Met een correct vooraf gepartitioneerde en vooraf gesorteerde gegevensset wordt de dure sorteerfase in een SortMerge
-join overgeslagen.
De volgorde van joins doet ertoe, met name bij meer complexe query’s. Begin met de meeste selectieve joins. Verplaats ook joins die het aantal rijen na aggregaties vergroten, indien mogelijk.
Als u parallelle uitvoering voor Cartesian-joins wilt beheren, kunt u geneste structuren en vensterbewerking toevoegen, en misschien een of meer stappen overslaan in uw Spark-taak.
Taakuitvoering optimaliseren
- Sla gegevens zo nodig in de cache op, bijvoorbeeld als u ze twee keer gebruikt.
- Zend variabelen uit naar alle uitvoerders. De variabelen worden slechts eenmaal geserialiseerd, wat resulteert in snellere zoekacties.
- Gebruik de threadpool in het stuurprogramma, wat resulteert in een snellere bewerking voor veel taken.
Controleer uw actieve taken regelmatig op prestatieproblemen. Als u meer inzicht nodig hebt in bepaalde problemen, kunt u een van de volgende hulpprogramma's voor prestatieprofilering overwegen:
- Intel PAL Tool bewaakt het CPU-, opslag- en netwerkbandbreedtegebruik.
- Oracle Java 8 Mission Control-profielen Spark en uitvoerdercode.
Het belangrijkste aspect voor Spark 2.x-queryprestaties is de Tungsten-engine, die afhankelijk is van codegeneratie in de volledige fase. In sommige gevallen is codegeneratie in de volledige fase mogelijk uitgeschakeld. Als u bijvoorbeeld een niet-veranderlijk type (string
) in de aggregatie-expressie gebruikt, wordt SortAggregate
weergegeven in plaats van HashAggregate
. Probeer bijvoorbeeld voor betere prestaties het volgende uit, en schakel vervolgens codegeneratie in:
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))