修改及儲存資料框架
Apache Spark 提供資料框架物件作為處理資料的主要結構。 您可以使用資料框架來查詢和轉換資料,並將結果保存在資料湖中。 若要將資料載入資料框架,您可以使用 spark.read 函數,指定要讀取資料的檔案格式、路徑,以及選擇性地指定要讀取資料的結構描述。 例如,下列程式碼會將 orders 資料夾中所有 .csv 檔案的資料載入至名為 order_details 的資料框架,然後顯示前五筆記錄。
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
轉換資料結構
將來源資料載入資料框架之後,您可以使用資料框架物件的方法和 Spark 函數來加以轉換。 資料框架上的一般作業包括:
- 篩選資料列和資料行
- 重新命名資料行
- 建立新的資料行,通常衍生自現有的資料行
- 取代 Null 或其他值
在下列範例中,程式碼會使用 split
函數,將 CustomerName 資料行中的值分成名為 FirstName 和 LastName 的兩個新資料行。 然後其會使用 drop
方法,來刪除原始 CustomerName 資料行。
from pyspark.sql.functions import split, col
# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))
您可以使用 Spark SQL 程式庫的完整功能來轉換資料,方法是篩選資料列、衍生、移除、重新命名資料行,以及套用其他必要的資料修改。
儲存轉換的資料
DataFrame 在必要結構中之後,您可以將結果儲存為資料湖中支援的格式。
下列程式碼範例會將 dataFrame 儲存至資料湖中的 parquet 檔案,並取代同名的任何現有檔案。
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
注意
Parquet 通常是您用來進一步分析或擷取至分析存放區之資料檔案的慣用格式。 Parquet 是非常有效率的格式,大部分大規模資料分析系統都支援此格式。 事實上,有時候資料轉換需求可能只是將資料從另一種格式 (例如 CSV) 轉換成 Parquet!