Arbeta med data i en Spark-dataram

Slutförd

Inbyggt använder Spark en datastruktur som kallas en elastisk distribuerad datamängd (RDD), men även om du kan skriva kod som fungerar direkt med RDD:er är den vanligaste datastrukturen för att arbeta med strukturerade data i Spark dataramen, som tillhandahålls som en del av Spark SQL-biblioteket. Dataramar i Spark liknar dem i det allestädes närvarande Pandas Python-biblioteket, men optimerade för att fungera i Sparks distribuerade bearbetningsmiljö.

Kommentar

Utöver Dataframe-API:et tillhandahåller Spark SQL ett starkt skrivet API för datauppsättningar som stöds i Java och Scala. Vi fokuserar på Dataframe-API:et i den här modulen.

Läsa in data i en dataram

Nu ska vi utforska ett hypotetiskt exempel för att se hur du kan använda en dataram för att arbeta med data. Anta att du har följande data i en kommaavgränsad textfil med namnet products.csv i mappen Filer/data i lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Härled ett schema

I en Spark-notebook-fil kan du använda följande PySpark-kod för att läsa in fildata i en dataram och visa de första 10 raderna:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Raden %%pyspark i början kallas för en magi och talar om för Spark att språket som används i den här cellen är PySpark. Du kan välja det språk som du vill använda som standard i verktygsfältet i notebook-gränssnittet och sedan använda en magi för att åsidosätta det valet för en specifik cell. Här är till exempel motsvarande Scala-kod för produktdataexemplet:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

%%spark Magin används för att ange Scala.

Båda dessa kodexempel skulle generera utdata så här:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Ange ett explicit schema

I föregående exempel innehöll den första raden i CSV-filen kolumnnamnen och Spark kunde härleda datatypen för varje kolumn från de data som den innehåller. Du kan också ange ett explicit schema för data, vilket är användbart när kolumnnamnen inte ingår i datafilen, som det här CSV-exemplet:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Följande PySpark-exempel visar hur du anger ett schema för dataramen som ska läsas in från en fil med namnet product-data.csv i det här formatet:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Resultatet skulle återigen likna:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Dricks

Om du anger ett explicit schema förbättras även prestandan!

Filtrera och gruppera dataramar

Du kan använda metoderna i klassen Dataframe för att filtrera, sortera, gruppera och på annat sätt ändra de data som den innehåller. I följande kodexempel används till exempel metoden select för att hämta kolumnerna ProductID och ListPrice från df-dataramen som innehåller produktdata i föregående exempel:

pricelist_df = df.select("ProductID", "ListPrice")

Resultatet från det här kodexemplet skulle se ut ungefär så här:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Gemensamt med de flesta metoder för datamanipulering är att välja returnerar ett nytt dataramobjekt.

Dricks

Att välja en delmängd av kolumner från en dataram är en vanlig åtgärd, som också kan uppnås med hjälp av följande kortare syntax:

pricelist_df = df["ProductID", "ListPrice"]

Du kan "länka" metoder tillsammans för att utföra en serie manipulationer som resulterar i en transformerad dataram. Den här exempelkoden kedjar till exempel urval och var metoder för att skapa en ny dataram som innehåller kolumnerna ProductName och ListPrice för produkter med en kategori av Mountain Bikes eller Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Resultatet från det här kodexemplet skulle se ut ungefär så här:

ProductName Kategori ListPrice
Mountain-100 Silver, 38 Mountainbikes 3399.9900
Road-750 Svart, 52 Vägcyklar 539.9900
... ... ...

Om du vill gruppera och aggregera data kan du använda groupBy-metoden och aggregerade funktioner. Följande PySpark-kod räknar till exempel antalet produkter för varje kategori:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Resultatet från det här kodexemplet skulle se ut ungefär så här:

Kategori antal
Headset 3
Hjul 14
Mountainbikes 32
... ...

Spara en dataram

Du vill ofta använda Spark för att transformera rådata och spara resultaten för ytterligare analys eller nedströmsbearbetning. I följande kodexempel sparas dataFrame i en parquet-fil i datasjön och ersätter alla befintliga filer med samma namn.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Kommentar

Parquet-formatet föredras vanligtvis för datafiler som du använder för ytterligare analys eller inmatning i ett analysarkiv. Parquet är ett mycket effektivt format som stöds av de flesta storskaliga dataanalyssystem. I själva verket kan ditt krav på datatransformering ibland helt enkelt vara att konvertera data från ett annat format (till exempel CSV) till Parquet!

Partitionera utdatafilen

Partitionering är en optimeringsteknik som gör det möjligt för Spark att maximera prestanda mellan arbetsnoderna. Fler prestandavinster kan uppnås vid filtrering av data i frågor genom att eliminera onödig disk-I/O.

Om du vill spara en dataram som en partitionerad uppsättning filer använder du metoden partitionBy när du skriver data. I följande exempel sparas bikes_df dataram (som innehåller produktdata för kategorierna mountainbikes och landsvägscyklar) och partitioner data efter kategori:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Mappnamnen som genereras när en dataram partitioneras innehåller kolumnnamnet och värdet för partitionering i ett column=value-format, så kodexemplet skapar en mapp med namnet bike_data som innehåller följande undermappar:

  • Category=Mountain Bikes
  • Category=Road Bikes

Varje undermapp innehåller en eller flera parquet-filer med produktdata för lämplig kategori.

Kommentar

Du kan partitionera data med flera kolumner, vilket resulterar i en hierarki med mappar för varje partitioneringsnyckel. Du kan till exempel partitionera försäljningsorderdata efter år och månad, så att mapphierarkin innehåller en mapp för varje årsvärde, som i sin tur innehåller en undermapp för varje månadsvärde.

Läsa in partitionerade data

När du läser partitionerade data i en dataram kan du läsa in data från valfri mapp i hierarkin genom att ange explicita värden eller jokertecken för de partitionerade fälten. I följande exempel läses data in för produkter i kategorin Road Bikes :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Kommentar

De partitioneringskolumner som anges i filsökvägen utelämnas i den resulterande dataramen. Resultaten som skapas av exempelfrågan skulle inte innehålla en kategorikolumn – kategorin för alla rader skulle vara Road Bikes.