在 Spark 資料框架中使用資料
Spark 會原生使用稱為彈性分散式資料集 (RDD) 的資料結構;但是雖然可以撰寫直接與 RDD 搭配運作的程式碼,但最常用來搭配 Spark 中結構化資料使用的資料結構是資料框架 (隨附於 Spark SQL 程式庫中)。 Spark 中的資料框架類似於通用 Pandas Python 程式庫中的資料框架,但已經過最佳化,可在 Spark 的分散式處理環境中運作。
注意
除了 Dataframe API 外,Spark SQL 還提供 Java 和 Scala 支援的強型別 Dataset API。 本課程模組會聚焦在 Dataframe API。
將資料載入資料框架中
現在來探索假設範例,了解如何使用資料框架來處理資料。 假設您在 Lakehouse 的 Files/data 資料夾內名為 products.csv 的逗點分隔文字檔中有下列資料:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
推斷結構描述
在 Spark 筆記本中,可以使用下列 PySpark 程式碼將檔案資料載入資料框架,並顯示前 10 列:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
開頭的 %%pyspark
行稱為 magic,用於告知 Spark 此儲存格中使用的語言是 PySpark。 您可以在筆記本介面的工具列中選取預設語言,然後使用 magic 來覆寫特定儲存格的該選項。 例如,以下是產品資料範例的對等 Scala 程式碼:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
magic %%spark
用來指定 Scala。
這兩個程式碼範例產生輸出類似於:
ProductID | ProductName | 類別 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain Bikes | 3399.9900 |
... | ... | ... | ... |
指定明確的結構描述
在上一個範例中,CSV 檔的第一個資料列中包含資料行名稱,而 Spark 可以從包含的資料推斷每個資料行的資料類型。 您也可以為資料指定明確的結構描述,當資料檔案中未包含資料行名稱時很實用,如下列 CSV 範例所示:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
下列 PySpark 範例示範如何使用下列格式,指定要從 product-data.csv 檔案載入之資料框架的結構描述:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
結果會再次類似於:
ProductID | ProductName | 類別 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain Bikes | 3399.9900 |
... | ... | ... | ... |
提示
指定明確的結構描述也會改善效能!
篩選和分組資料框架
您可以使用資料框架類別方法來篩選、排序、分組,以及操作它所包含的資料。 例如,下列程式碼範例會使用 select 方法,從上一個範例中包含產品資料的 df 資料框架擷取 ProductID 和 ListPrice 資料行:
pricelist_df = df.select("ProductID", "ListPrice")
此程式碼範例的結果看起來會像這樣:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
大部分資料操作方法的共通點是,select 都會傳回新的資料框架物件。
提示
從資料框架中選取資料行子集是常見的作業,也可以使用下列較短的語法來達成這個目標:
pricelist_df = df["ProductID", "ListPrice"]
您可以將方法「鏈結」在一起,執行一連串的操作,產生轉換的資料框架。 例如,此範例程式碼會鏈結 select 和 where 方法來建立新的資料框架,其中包含類別為 Mountain Bikes 或 Road Bikes 的產品之 ProductName 和 ListPrice 資料行:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
此程式碼範例的結果看起來會像這樣:
ProductName | 類別 | ListPrice |
---|---|---|
Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
Road-750 Black, 52 | 公路車 | 539.9900 |
... | ... | ... |
若要分組和彙總資料,可以使用 groupBy 方法和彙總函式。 例如,下列 PySpark 程式碼會計算每個類別的產品數目:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
此程式碼範例的結果看起來會像這樣:
類別 | 計數 |
---|---|
Headsets | 3 |
Wheels | 14 |
Mountain Bikes | 32 |
... | ... |
儲存資料框架
您通常會想要使用 Spark 來轉換未經處理資料,並儲存結果以供進一步分析或下游處理。 下列程式碼範例會將 dataFrame 儲存至資料湖中的 parquet 檔案,並取代同名的任何現有檔案。
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
注意
Parquet 通常是您用來進一步分析或擷取至分析存放區之資料檔案的慣用格式。 Parquet 是非常有效率的格式,大部分大規模資料分析系統都支援此格式。 事實上,有時候資料轉換需求可能只是將資料從另一種格式 (例如 CSV) 轉換成 Parquet!
分割輸出檔案
資料分割是一種最佳化技術,可讓 Spark 將背景工作角色節點的效能最大化。 藉由排除不必要的磁碟 IO,即可在查詢中篩選資料時提升更多效能。
若要將資料框架儲存為分割的檔案集,請在寫入資料時使用 partitionBy 方法。 下列範例會儲存 bikes_df 資料框架 (其中包含山地自行車和公路自行車類別的產品資料),並依類別分割資料:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
分割資料框架時所產生的資料夾名稱包含資料行=值格式的分割資料行名稱和值,因此程式碼範例會建立名為 bike_data 的資料夾,其中包含下列子資料夾:
- 類別=公路自行車
- 類別=山地自行車
每個子資料夾都包含一或多個 Parquet 檔案,其中包含適當類別的產品資料。
注意
您可以依多個資料行分割資料,這會導致每個資料分割索引鍵的資料夾階層。 例如,您可能會依年份和月份分割銷售訂單資料,讓資料夾階層包含每年值的資料夾,而資料夾階層又會包含每個月值的子資料夾。
載入分割的資料
將分割的資料讀取到資料框架時,您可以指定用於分割欄位的明確值或萬用字元,以從階層內的任何資料夾載入資料。 下列範例會在 [公路自行車] 類別中載入產品的資料:
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
注意
產生的資料框架中會省略檔案路徑中指定的資料分割資料行。 範例查詢所產生的結果不會包含 [類別] 資料行,所有資料列的類別會是公路自行車。