使用 Spark 來處理資料檔案
使用 Spark 的優點之一,是您可以使用各種程式設計語言撰寫和執行程式碼,方便運用您既有的程式設計技能,並針對指定的工作使用最適當的語言。 全新 Azure Databricks Spark 筆記本的預設語言是 PySpark (Python 的 Spark 最佳化版本),由於對資料操作和視覺效果提供強式支援,因此資料科學家和分析師常使用此版本。 此外,您也可以使用 Scala (JAVA 衍生語言,能以互動方式使用) 和 SQL (Spark SQL 程式庫中常用的 SQL 語言變體,用於處理關聯式資料結構) 這類語言。 軟體工程師也可以建立使用 JAVA 等架構在 Spark 上執行的編譯解決方案。
使用資料框架探索資料
Spark 會原生使用稱為彈性分散式資料集 (RDD) 的資料結構;但是雖然可以撰寫直接與 RDD 搭配運作的程式碼,但最常用來搭配 Spark 中結構化資料使用的資料結構是資料框架 (隨附於 Spark SQL 程式庫中)。 Spark 中的資料框架類似於通用 Pandas Python 程式庫中的資料框架,但已經過最佳化,可在 Spark 的分散式處理環境中運作。
注意
除了 Dataframe API 外,Spark SQL 還提供 Java 和 Scala 支援的強型別 Dataset API。 本課程模組會聚焦在 Dataframe API。
將資料載入資料框架中
現在來探索假設範例,了解如何使用資料框架來處理資料。 假設您在 Databricks 檔案系統 (DBFS) 儲存體 data 資料夾內名為 products.csv 的逗點分隔文字檔中有下列資料:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
在 Spark 筆記本中,可以使用下列 PySpark 程式碼將資料載入資料框架,並顯示前 10 列:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
開頭的 %pyspark
行稱為 magic,用於告知 Spark 此儲存格中使用的語言是 PySpark。 以下是產品資料範例的對等 Scala 程式碼:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
magic %spark
用來指定 Scala。
提示
您也可以選取要用於筆記本介面中每個儲存格的語言。
先前顯示的這兩個範例會產生如下的輸出:
ProductID | ProductName | 類別 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain Bikes | 3399.9900 |
... | ... | ... | ... |
指定資料框架結構描述
在上一個範例中,CSV 檔的第一個資料列中包含資料行名稱,而 Spark 可以從包含的資料推斷每個資料行的資料類型。 您也可以為資料指定明確的結構描述,當資料檔案中未包含資料行名稱時很實用,如下列 CSV 範例所示:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
下列 PySpark 範例示範如何使用下列格式,指定要從 product-data.csv 檔案載入之資料框架的結構描述:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
結果會再次類似於:
ProductID | ProductName | 類別 | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain Bikes | 3399.9900 |
... | ... | ... | ... |
篩選和分組資料框架
您可以使用資料框架類別方法來篩選、排序、分組,以及操作它所包含的資料。 例如,下列程式碼範例會使用 select 方法,從上一個範例中包含產品資料的 df 資料框架擷取 ProductName 和 ListPrice 資料行:
pricelist_df = df.select("ProductID", "ListPrice")
此程式碼範例的結果看起來會像這樣:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
大部分資料操作方法的共通點是,select 都會傳回新的資料框架物件。
提示
從資料框架中選取資料行子集是常見的作業,也可以使用下列較短的語法來達成這個目標:
pricelist_df = df["ProductID", "ListPrice"]
您可以將方法「鏈結」在一起,執行一連串的操作,產生轉換的資料框架。 例如,此範例程式碼會鏈結 select 和 where 方法來建立新的資料框架,其中包含類別為 Mountain Bikes 或 Road Bikes 的產品之 ProductName 和 ListPrice 資料行:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
此程式碼範例的結果看起來會像這樣:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
若要分組和彙總資料,可以使用 groupBy 方法和彙總函式。 例如,下列 PySpark 程式碼會計算每個類別的產品數目:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
此程式碼範例的結果看起來會像這樣:
類別 | 計數 |
---|---|
Headsets | 3 |
Wheels | 14 |
Mountain Bikes | 32 |
... | ... |
在 Spark 中使用 SQL 運算式
Dataframe API 是名為 Spark SQL 的 Spark 程式庫的一部分,可讓資料分析師使用 SQL 運算式來查詢及操作資料。
在 Spark 目錄中建立資料庫物件
Spark 目錄是關聯式資料物件 (例如檢視和資料表) 的中繼存放區。 Spark 執行階段可以使用目錄,順暢整合以任何 Spark 支援的語言撰寫的程式碼,以及對某些資料分析師或開發人員而言可能更自然的 SQL 運算式。
若要讓資料框架中的資料可在 Spark 目錄中查詢,其中一個最簡單方式就是建立暫存檢視,如下列程式碼範例所示:
df.createOrReplaceTempView("products")
檢視是暫時性的,這表示它會自動在目前工作階段結束時刪除。 您也可以建立保存於目錄中的資料表,用於定義可使用 Spark SQL 查詢的資料庫。
注意
我們不會在本課程模組中深入探討 Spark 目錄資料表,但值得花一些時間強調幾個重點:
- 您可以使用
spark.catalog.createTable
方法來建立空的資料表。 資料表是中繼資料結構,會將其基礎資料儲存在與目錄相關聯的儲存位置。 刪除資料表也會刪除其基礎資料。 - 您可以使用資料框架的
saveAsTable
方法,將資料框架儲存為資料表。 - 您可以使用 方法來建立 外部 資料表
spark.catalog.createExternalTable
。 外部資料表會定義目錄中的中繼資料,但會從外部儲存位置取得其基礎資料;通常是資料湖中的資料夾。 刪除外部資料表並不會刪除基礎資料。
使用 Spark SQL API 查詢資料
在以任何語言撰寫的程式碼中,都能使用 Spark SQL API 來查詢目錄中的資料。 例如,下列 PySpark 程式碼會使用 SQL 查詢,以資料框架的形式從 products 檢視傳回資料。
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
此範例程式碼的結果可能看起來類似下表:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
使用 SQL 程式碼
上述範例示範如何使用 Spark SQL API 在 Spark 程式碼中內嵌 SQL 運算式。 在筆記本中,您也可以使用 %sql
magic 來執行 SQL 程式碼,以查詢目錄中的物件,如下所示:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
SQL 程式代碼範例會傳回結果集,該結果集會自動顯示在筆記本中做為數據表,如下所示:
類別 | ProductCount |
---|---|
Bib-Shorts | 3 |
Bike Racks | 1 |
Bike Stands | 1 |
... | ... |