Werken met joins in Azure Databricks
Databricks biedt ondersteuning voor ansi-standaard-joinsyntaxis. In dit artikel worden de verschillen tussen joins met batch- en stroomverwerking beschreven en worden enkele aanbevelingen gegeven voor het optimaliseren van joinprestaties.
Notitie
Databricks ondersteunt ook standaardsyntaxis voor de setoperatoren UNION
, INTERSECT
en EXCEPT
. Zie Operatoren instellen.
Verschillen tussen streaming en batch-joins
Joins in Azure Databricks zijn stateful of stateless.
Alle batch-joins zijn stateless joins. Resultaten worden onmiddellijk verwerkt en worden gegevens weergegeven op het moment dat de query wordt uitgevoerd. Telkens wanneer de query wordt uitgevoerd, worden nieuwe resultaten berekend op basis van de opgegeven brongegevens. Zie Batch-joins.
Joins tussen twee streaminggegevensbronnen zijn stateful. In stateful joins houdt Azure Databricks informatie over de gegevensbronnen en de resultaten bij en worden de resultaten iteratief bijgewerkt. Stateful joins kunnen krachtige oplossingen bieden voor online gegevensverwerking, maar het kan lastig zijn om effectief te implementeren. Ze hebben complexe operationele semantiek, afhankelijk van de uitvoermodus, het triggerinterval en het watermerk. Zie Stream-stream-joins.
Stream-statische joins zijn staatloos, maar bieden een goede optie voor het samenvoegen van een incrementele gegevensbron (zoals een feitentabel) met een statische gegevensbron (zoals een langzaam veranderende dimensionale tabel). In plaats van alle records van beide zijden samen te voegen telkens wanneer een query wordt uitgevoerd, worden alleen nieuw ontvangen records van de streamingbron gekoppeld aan de huidige versie van de statische tabel. Zie Stream-static joins.
Batch-joins
Azure Databricks biedt ondersteuning voor standaardsyntaxis voor SQL-joins, waaronder inner, outer, semi, anti- en cross joins. Zie JOIN.
Notitie
Databricks raadt aan een gerealiseerde weergave te gebruiken om incrementele berekening van de resultaten van een inner join te optimaliseren. Zie Gerealiseerde weergaven gebruiken in Databricks SQL.
Stream-stream-joins
Het samenvoegen van twee streaminggegevensbronnen kan aanzienlijke uitdagingen opleveren bij het beheren van statusinformatie en het redeneren van resultatenberekening en -uitvoer. Voordat u een stream-stream join implementeert, raadt Databricks aan om een sterk inzicht te krijgen in de operationele semantiek voor stateful streaming, inclusief hoe watermerken van invloed zijn op statusbeheer. Zie de volgende artikelen:
- Wat is stateful streaming?
- Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren
- Stream-stream-joins
Databricks raadt aan watermerken op te geven voor beide zijden van alle stroom-stoom-joins. De volgende jointypen worden ondersteund:
- Inner joins
- Left outer joins
- Right outer joins
- Volledige outer joins
- Linker semi-joins
Zie de documentatie over Apache Spark Structured Streaming over stream-steam joins.
Stream-static joins
Notitie
Bij het beschreven gedrag voor stream-statische joins wordt ervan uitgegaan dat de statische gegevens worden opgeslagen met Behulp van Delta Lake.
Een stroom-statische join voegt de meest recente geldige versie van een Delta-tabel (de statische gegevens) toe aan een gegevensstroom met behulp van een stateless join.
Wanneer Azure Databricks een microbatch met gegevens verwerkt in een stroomstatische join, wordt de meest recente geldige versie van gegevens uit de statische Delta-tabel samengevoegd met de records die aanwezig zijn in de huidige microbatch. Omdat de join staatloos is, hoeft u geen watermerken te configureren en resultaten met lage latentie te verwerken. De gegevens in de statische Delta-tabel die in de join worden gebruikt, moeten langzaam worden gewijzigd.
In het volgende voorbeeld ziet u dit patroon:
streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")
query = (streamingDF
.join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
.writeStream
.option("checkpointLocation", checkpoint_path)
.table("orders_with_customer_info")
)
Joinprestaties optimaliseren
Berekenen met Photon ingeschakeld selecteert altijd het beste jointype. Zie Wat is Photon?
Het gebruik van een recente Databricks Runtime-versie waarvoor Photon is ingeschakeld, biedt over het algemeen goede joinprestaties, maar u moet ook rekening houden met de volgende aanbevelingen:
Cross joins zijn erg duur. Verwijder cross joins uit workloads en query's waarvoor een lage latentie of regelmatige hercomputatie is vereist.
Lid worden van bestelling is belangrijk. Wanneer u meerdere joins uitvoert, moet u altijd eerst de kleinste tabellen samenvoegen en vervolgens het resultaat samenvoegen met grotere tabellen.
De optimizer kan moeite hebben met query's met veel joins en aggregaties. Door tussenliggende resultaten op te slaan, kunnen queryplanning en rekenresultaten worden versneld.
Behoud nieuwe statistieken om de prestaties te verbeteren. Voorspellende optimalisatie met
ANALYZE
(openbare preview) kan statistieken automatisch bijwerken en onderhouden. U kunt de queryANALYZE TABLE table_name COMPUTE STATISTICS
ook uitvoeren om statistieken in de queryplanner bij te werken.
Belangrijk
Voorspellende optimalisatie met ANALYZE
bevindt zich in openbare preview. Het bevat intelligente verzameling statistieken tijdens schrijfbewerkingen. Gebruik dit formulier om u aan te melden voor de openbare preview.
Notitie
In Databricks Runtime 14.3 LTS en hoger kunt u de kolommen wijzigen waarop Delta Lake statistieken verzamelt voor het overslaan van gegevens en vervolgens bestaande statistieken opnieuw compileren in het Delta-logboek. Zie Kolommen voor Delta-statistieken opgeven.
Hints toevoegen aan Azure Databricks
Apache Spark biedt ondersteuning voor het opgeven van joinhints voor bereikdeelnames en scheeftrekken. Hints voor scheefheidsdeelnames zijn niet nodig omdat Azure Databricks deze joins automatisch optimaliseert. Zie hints
Hints voor bereikdeelnames kunnen nuttig zijn als joinprestaties slecht zijn en u ongelijkheidsdeelnames uitvoert. Voorbeelden hiervan zijn deelname aan tijdstempelbereiken of een bereik van clustering-id's. Zie Optimalisatie van bereikdeelname.