Werken met gegevens in een Spark-dataframe
Spark maakt standaard gebruik van een gegevensstructuur die een tolerante gedistribueerde gegevensset (RDD) wordt genoemd, maar hoewel u code kunt schrijven die rechtstreeks met RDD's werkt, is de meest gebruikte gegevensstructuur voor het werken met gestructureerde gegevens in Spark het dataframe, dat wordt geleverd als onderdeel van de Spark SQL-bibliotheek . Dataframes in Spark zijn vergelijkbaar met die in de alomtegenwoordige Pandas Python-bibliotheek, maar geoptimaliseerd voor gebruik in de gedistribueerde verwerkingsomgeving van Spark.
Notitie
Naast de Dataframe-API biedt Spark SQL een sterk getypeerde gegevensset-API die wordt ondersteund in Java en Scala. In deze module richten we ons op de Dataframe-API.
Gegevens laden in een dataframe
Laten we een hypothetisch voorbeeld bekijken om te zien hoe u een dataframe kunt gebruiken om met gegevens te werken. Stel dat u de volgende gegevens hebt in een door komma's gescheiden tekstbestand met de naam products.csv in de map Bestanden/gegevens in uw lakehouse:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Een schema uitstellen
In een Spark-notebook kunt u de volgende PySpark-code gebruiken om de bestandsgegevens in een dataframe te laden en de eerste tien rijen weer te geven:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
De %%pyspark
lijn aan het begin wordt een magie genoemd en vertelt Spark dat de taal die in deze cel wordt gebruikt PySpark is. U kunt de taal selecteren die u als standaardtaal wilt gebruiken in de werkbalk van de Notebook-interface en vervolgens een magic gebruiken om die keuze voor een specifieke cel te overschrijven. Hier ziet u bijvoorbeeld de equivalente Scala-code voor het voorbeeld van de productgegevens:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
De magie %%spark
wordt gebruikt om Scala op te geven.
Beide codevoorbeelden produceren uitvoer als volgt:
ProductID | ProductName | Categorie | ListPrice |
---|---|---|---|
771 | Berg-100 zilver, 38 | Mountainbikes | 3399.9900 |
772 | Berg-100 zilver, 42 | Mountainbikes | 3399.9900 |
773 | Berg-100 zilver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Een expliciet schema opgeven
In het vorige voorbeeld bevatte de eerste rij van het CSV-bestand de kolomnamen en kon Spark het gegevenstype van elke kolom afleiden uit de gegevens die het bestand bevat. U kunt ook een expliciet schema opgeven voor de gegevens, wat handig is wanneer de kolomnamen niet zijn opgenomen in het gegevensbestand, zoals in dit CSV-voorbeeld:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
In het volgende PySpark-voorbeeld ziet u hoe u een schema opgeeft voor het dataframe dat moet worden geladen vanuit een bestand met de naam product-data.csv in deze indeling:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
De resultaten zouden weer vergelijkbaar zijn met:
ProductID | ProductName | Categorie | ListPrice |
---|---|---|---|
771 | Berg-100 zilver, 38 | Mountainbikes | 3399.9900 |
772 | Berg-100 zilver, 42 | Mountainbikes | 3399.9900 |
773 | Berg-100 zilver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Tip
Als u een expliciet schema opgeeft, worden ook de prestaties verbeterd.
Gegevensframes filteren en groeperen
U kunt de methoden van de Dataframe-klasse gebruiken om de gegevens te filteren, sorteren, groeperen en anderszins te bewerken. In het volgende codevoorbeeld wordt bijvoorbeeld de select-methode gebruikt om de kolommen ProductID en ListPrice op te halen uit het df-gegevensframe met productgegevens in het vorige voorbeeld:
pricelist_df = df.select("ProductID", "ListPrice")
De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Bij de meeste methoden voor gegevensbewerking wordt een nieuw dataframeobject geretourneerd.
Tip
Het selecteren van een subset kolommen uit een dataframe is een algemene bewerking, die ook kan worden bereikt met behulp van de volgende kortere syntaxis:
pricelist_df = df["ProductID", "ListPrice"]
U kunt methoden 'koppelen' om een reeks bewerkingen uit te voeren die resulteert in een getransformeerd dataframe. Met deze voorbeeldcode wordt bijvoorbeeld de selectie gekoppeld en waar methoden voor het maken van een nieuw dataframe met de kolommen ProductName en ListPrice voor producten met een categorie Mountain Bikes of Road Bikes:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:
ProductName | Categorie | ListPrice |
---|---|---|
Berg-100 zilver, 38 | Mountainbikes | 3399.9900 |
Road-750 zwart, 52 | Racefietsen | 539.9900 |
... | ... | ... |
Als u gegevens wilt groeperen en aggregeren, kunt u de groupBy-methode en statistische functies gebruiken. Met de volgende PySpark-code wordt bijvoorbeeld het aantal producten voor elke categorie geteld:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
De resultaten uit dit codevoorbeeld zien er ongeveer als volgt uit:
Categorie | aantal |
---|---|
Headsets | 3 |
Wielen | 14 |
Mountainbikes | 32 |
... | ... |
Een dataframe opslaan
Vaak wilt u Spark gebruiken om onbewerkte gegevens te transformeren en de resultaten op te slaan voor verdere analyse of downstreamverwerking. In het volgende codevoorbeeld wordt het dataFrame opgeslagen in een Parquet-bestand in de Data Lake, waarbij een bestaand bestand met dezelfde naam wordt vervangen.
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
Notitie
De Parquet-indeling heeft doorgaans de voorkeur voor gegevensbestanden die u gaat gebruiken voor verdere analyse of opname in een analytische opslag. Parquet is een zeer efficiënte indeling die wordt ondersteund door de meeste grootschalige systemen voor gegevensanalyse. Soms is het zelfs mogelijk dat uw gegevenstransformatie vereist is om gegevens te converteren van een andere indeling (zoals CSV) naar Parquet.
Het uitvoerbestand partitioneren
Partitioneren is een optimalisatietechniek waarmee Spark de prestaties op de werkknooppunten kan maximaliseren. Er kunnen meer prestatieverbeteringen worden bereikt bij het filteren van gegevens in query's door onnodige schijf-IO te elimineren.
Als u een dataframe wilt opslaan als een gepartitioneerde set bestanden, gebruikt u de partitionBy-methode bij het schrijven van de gegevens. In het volgende voorbeeld wordt het bikes_df dataframe (dat de productgegevens voor de categorieën mountainbikes en road bikes bevat) opgeslagen en worden de gegevens gepartitioneerd op categorie:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
De mapnamen die worden gegenereerd bij het partitioneren van een dataframe, bevatten de naam en waarde van de partitioneringskolom in een kolom=waarde-indeling, zodat in het codevoorbeeld een map wordt gemaakt met de naam bike_data die de volgende submappen bevat:
- Categorie=Mountainbikes
- Category=Road Bikes
Elke submap bevat een of meer Parquet-bestanden met de productgegevens voor de juiste categorie.
Notitie
U kunt de gegevens partitioneren op meerdere kolommen, wat resulteert in een hiërarchie van mappen voor elke partitiesleutel. U kunt bijvoorbeeld verkoopordergegevens partitioneren op jaar en maand, zodat de maphiërarchie een map bevat voor elke jaarwaarde, die op zijn beurt een submap voor elke maandwaarde bevat.
Gepartitioneerde gegevens laden
Wanneer u gepartitioneerde gegevens leest in een dataframe, kunt u gegevens uit elke map in de hiërarchie laden door expliciete waarden of jokertekens op te geven voor de gepartitioneerde velden. In het volgende voorbeeld worden gegevens geladen voor producten in de categorie Road Bikes :
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
Notitie
De partitioneringskolommen die zijn opgegeven in het bestandspad worden weggelaten in het resulterende dataframe. De resultaten die door de voorbeeldquery worden geproduceerd, bevatten geen kolom Categorie. De categorie voor alle rijen zou Road Bikes zijn.