共用方式為


PySpark 基本概念

本文將逐步解說簡單的範例,以說明 PySpark 的使用方式。 它假設您瞭解基本的 Apache Spark 概念, 並在連線至計算的 Azure Databricks 筆記本 中執行命令。 您可以使用範例數據建立 DataFrame、執行基本轉換,包括此數據上的數據列和數據行作業、結合多個 DataFrame 並匯總此數據、可視化此數據,然後將它儲存至數據表或檔案。

上傳資料

本文中的一些範例會使用 Databricks 提供的範例數據來示範如何使用 DataFrame 來載入、轉換及儲存數據。 如果您想要使用尚未在 Databricks 中的數據,您可以先上傳它,並從中建立 DataFrame。 請參閱 使用檔案上傳和將檔案上傳Unity 目錄磁碟區來建立或修改數據表。

關於 Databricks 範例數據

Databricks 提供目錄中和目錄中的samples範例數據/databricks-datasets

  • 若要存取目錄中的 samples 範例數據,請使用 格式 samples.<schema-name>.<table-name>。 本文使用架構中的 samples.tpch 數據表,其中包含虛構企業的數據。 數據表 customer 包含客戶的相關信息,並 orders 包含那些客戶下訂單的相關信息。
  • 使用 dbutils.fs.ls 來探索 中的數據 /databricks-datasets。 使用 Spark SQL 或 DataFrames,使用檔案路徑查詢此位置中的數據。 若要深入瞭解 Databricks 提供的範例數據,請參閱 範例數據集

匯入數據類型

許多 PySpark 作業都需要您使用 SQL 函式或與原生 Spark 類型互動。 您可以直接匯入所需的函式和類型,也可以匯入整個模組。

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

由於某些匯入的函式可能會覆寫 Python 內建函式,因此有些用戶選擇使用別名匯入這些模組。 下列範例顯示 Apache Spark 程式代碼範例中使用的常見別名:

import pyspark.sql.types as T
import pyspark.sql.functions as F

如需完整的數據類型清單,請參閱 Spark 資料類型

如需 PySpark SQL 函式的完整清單,請參閱 Spark 函式

建立 DataFrame

有數種方式可以建立DataFrame。 您通常會針對數據源定義 DataFrame,例如數據表或檔案集合。 然後,如 Apache Spark 基本概念一節中所述,使用 動作,例如 display,來觸發要執行的轉換。 方法 display 會輸出DataFrame。

建立具有指定值的數據框架

若要建立具有指定值的 DataFrame,請使用 createDataFrame 方法,其中數據列會以 Tuple 列表表示:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

請注意,輸出中會自動推斷 的 df_children 數據類型。 您也可以藉由新增架構來指定類型。 架構是由 指定名稱、數據類型和布爾值旗標所StructType組成的 來定義StructFields,指出它們是否包含 Null 值。 您必須從 匯入 pyspark.sql.types資料類型。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

從 Unity 目錄中的數據表建立 DataFrame

若要從 Unity 目錄中的數據表建立 DataFrame,請使用 table 使用 格式 <catalog-name>.<schema-name>.<table-name>識別數據表的方法。 按下左側導覽列上的 [目錄],以使用目錄管流覽至您的數據表。 按兩下它,然後選取 [ 複製資料表路徑],將資料表路徑 插入筆記本中。

下列範例會載入 數據表 samples.tpch.customer,但您也可以提供您自己的數據表路徑。

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

從上傳的檔案建立DataFrame

若要從您上傳至 Unity 目錄磁碟區的檔案建立 DataFrame,請使用 read 屬性。 這個方法會傳 DataFrameReader回 ,然後您可以使用它來讀取適當的格式。 按兩下左側小提要欄上的目錄選項,並使用目錄瀏覽器來尋找您的檔案。 選取它,然後按兩下 [ 複製磁碟區檔案路徑]。

下列範例會從 *.csv 檔案讀取,但 DataFrameReader 支援以許多其他格式上傳檔案。 請參閱 DataFrameReader 方法

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

如需 Unity 目錄磁碟區的詳細資訊,請參閱 什麼是 Unity 目錄磁碟區?

從 JSON 回應建立 DataFrame

若要從 REST API 傳回的 JSON 回應承載建立 DataFrame,請使用 Python requests 套件來查詢和剖析回應。 您必須匯入套件才能使用它。 此範例使用來自 美國 食品和藥物管理局藥物應用資料庫的數據。

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

如需在 Databricks 上使用 JSON 和其他半結構化數據的相關信息,請參閱 模型半結構化數據

選取 JSON 欄位或物件

若要從轉換的 JSON 中選取特定欄位或物件,請使用 [] 表示法。 例如,若要選取 products 本身是產品數位的欄位:

display(df_drugs.select(df_drugs["products"]))

您也可以將方法呼叫鏈結在一起,以周遊多個字段。 例如,若要輸出藥物應用程式中第一個產品的品牌名稱:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

從檔案建立 DataFrame

為了示範從檔案建立 DataFrame,此範例會在 /databricks-datasets 目錄中載入 CSV 數據。

若要流覽至範例數據集,您可以使用 Databricks Utilties 檔系統命令。 下列範例會使用 dbutils 來列出 中 /databricks-datasets可用的數據集:

display(dbutils.fs.ls('/databricks-datasets'))

或者,您可以使用 %fs 來存取 Databricks CLI 檔案系統命令,如下列範例所示:

%fs ls '/databricks-datasets'

若要從檔案或檔案目錄建立 DataFrame,請在 方法中 load 指定路徑:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

使用 DataFrame 轉換數據

DataFrame 可讓您輕鬆地使用內建方法來排序、篩選和匯總數據來轉換數據。 許多轉換未指定為 DataFrame 上的方法,而是在封裝中 spark.sql.functions 提供。 請參閱 Databricks Spark SQL 函式

資料行作業

Spark 提供許多基本資料行作業:

提示

若要輸出 DataFrame 中的所有資料列,請使用 columns,例如 df_customer.columns

選取資料行

您可以使用 和 select選取特定資料列col。 函 col 式位於子模組中 pyspark.sql.functions

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

您也可以參考使用 expr 將資料列定義為字串的運算式:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

您也可以使用 selectExpr,其接受 SQL 運算式:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

若要使用字串常值選取資料行,請執行下列動作:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

若要從特定 DataFrame 明確選取資料行,您可以使用 [] 運算子或 . 運算符。 (運算子 . 無法用來選取以整數開頭的數據行,或是包含空格或特殊字元的數據行。當您聯結某些數據行具有相同名稱的 DataFrame 時,這特別有用。

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

建立資料行

若要建立新的數據行,請使用 withColumn 方法。 下列範例會根據客戶帳戶餘額 c_acctbal 是否超過 1000,建立包含布爾值的新數據行:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

重新命名資料列

若要重新命名數據行,請使用 withColumnRenamed 方法,此方法會接受現有的和新資料行名稱:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

當您想要將資料行重新命名為匯總的一部分時,此方法 alias 特別有用:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

轉換數據行類型

在某些情況下,您可能想要變更 DataFrame 中一或多個數據行的數據類型。 若要這樣做,請使用 cast 方法在數據行數據類型之間轉換。 下列範例示範如何使用 方法來參考數據行, col 將數據行從整數轉換成字串類型:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

移除欄

若要移除資料行,您可以在選取期間省略數據行,或使用 select(*) exceptdrop 方法:

df_customer_flag_renamed.drop("balance_flag_renamed")

您也可以卸除多個資料列:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

資料列作業

Spark 提供許多基本資料列作業:

篩選資料列

若要篩選數據列,請使用 filter DataFrame 上的 或 where 方法只傳回特定數據列。 若要識別要篩選的數據行,請使用 col 評估為數據行的方法或表達式。

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

若要篩選多個條件,請使用邏輯運算元。 例如, &| 可讓您分別使用 ANDOR 條件。 下列範例會篩選 等於 且 大於 的數據列c_nationkey20c_acctbal1000

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

拿掉重複的數據列

若要取消重複的數據列,請使用 distinct,它只會傳回唯一的數據列。

df_unique = df_customer.distinct()

處理 Null 值

若要處理 Null 值,請使用 na.drop 方法卸除包含 Null 值的數據列。 這個方法可讓您指定是否要卸除包含 any Null 值或 all Null 值的數據列。

若要卸除任何 Null 值,請使用下列其中一個範例。

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

如果您只想要篩選出包含所有 Null 值的資料列,請使用下列專案:

df_customer_no_nulls = df_customer.na.drop("all")

您可以藉由指定這個來套用此項目來套用資料行子集,如下所示:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

若要填入遺漏的值,請使用 fill 方法。 您可以選擇將此套用至所有資料行或資料行子集。 在下列範例中,具有其帳戶餘額 Null 值的帳戶餘額 c_acctbal 會填入 0

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

若要以其他值取代字串,請使用 replace 方法。 在下列範例中,任何空的位址字串會取代為 :UNKNOWN

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

附加數據列

若要附加數據列,您必須使用 union 方法來建立新的 DataFrame。 在下列範例中,先前建立並df_that_one_customer合併的 DataFrame df_filtered_customer 會傳回具有三個客戶的 DataFrame:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

注意

您也可以將數據框架寫入資料表,然後附加新的數據列,藉以合併數據框架。 針對生產工作負載,以累加方式將數據源處理至目標數據表,可大幅降低數據大小成長時延遲和計算成本。 請參閱 將數據內嵌至 Databricks Lakehouse

排序資料列

重要

排序規模可能很昂貴,而且如果您儲存已排序的數據並使用Spark重載數據,則不保證順序。 請確定您刻意使用排序。

若要依一或多個數據行排序數據列,請使用 sortorderBy 方法。 根據預設,這些方法會以遞增順序排序:

df_customer.orderBy(col("c_acctbal"))

若要依遞減順序篩選,請使用 desc

df_customer.sort(col("c_custkey").desc())

下列範例示範如何排序兩個數據行:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

若要限制在排序 DataFrame 之後要傳回的數據列數目,請使用 limit 方法。 下列範例只會顯示最上層 10 的結果:

display(df_sorted.limit(10))

聯結 DataFrame

若要聯結兩個或多個 DataFrame,請使用 join 方法。 您可以指定要如何將 DataFrames 聯結在 how (聯結類型) 和 on (以聯結為基底的數據行) 參數中聯結。 常見的聯結類型包括:

  • inner:這是聯結類型預設值,它會傳回DataFrame,其只會保留數據列,其中與DataFrames中的參數相符 on
  • left:這會保留第一個指定之 DataFrame 的所有數據列,並且只保留與第一個數據框架相符之第二個指定之 DataFrame 的數據列。
  • outer:無論相符項目為何,外部聯結都會保留來自這兩個 DataFrame 的所有數據列。

如需聯結的詳細資訊,請參閱 使用 Azure Databricks 上的聯結。 如需 PySpark 中支援的聯結清單,請參閱 DataFrame 聯結

下列範例會傳回單一 DataFrame,其中 DataFrame 的每個數據列 orders 都會與 DataFrame 中的 customers 對應數據列聯結。 內部聯結會使用,因為預期每個訂單都對應至一個客戶。

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

若要在多個條件上聯結,請使用 和 &|布爾運算子分別指定 ANDOR。 下列範例會新增其他條件,只篩選大於o_totalprice的數據列500,000

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

匯總數據

若要匯總 DataFrame 中的數據,類似於 GROUP BY SQL 中的 ,請使用 groupBy 方法來指定要分組的數據行,以及 agg 指定要指定匯總的方法。 從 匯入一般匯總,包括 avgsummaxminpyspark.sql.functions 下列範例顯示依市場區段的平均客戶餘額:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

某些匯總是動作,這表示它們會觸發計算。 在此情況下,您不需要使用其他動作來輸出結果。

若要計算 DataFrame 中的數據列,請使用 count 方法:

df_customer.count()

鏈結呼叫

轉換 DataFrame 的方法會傳回 DataFrame,而且在呼叫動作之前,Spark 不會處理轉換。 這種 延遲評估 表示您可以鏈結多個方法,以方便和可讀性。 下列範例示範如何鏈結篩選、匯總和排序:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

將 DataFrame 視覺化

若要在筆記本中可視化 DataFrame,請按兩下 + DataFrame 左上方表格旁的符號,然後選取 [視覺效果 ] 以根據您的 DataFrame 新增一或多個圖表。 如需視覺效果的詳細資訊,請參閱 Databricks 筆記本中的視覺效果。

display(df_order)

若要執行其他視覺效果,Databricks 建議使用適用於Spark的 pandas API。 .pandas_api()可讓您轉換成 Spark DataFrame 的對應 pandas API。 如需詳細資訊,請參閱 Spark上的 Pandas API。

儲存您的資料

轉換數據之後,您就可以使用 DataFrameWriter 方法加以儲存。 您可以在 DataFrameWriter 中找到這些方法的完整清單。 下列各節說明如何將 DataFrame 儲存為數據表和數據檔集合。

將 DataFrame 儲存為數據表

若要將 DataFrame 儲存為 Unity 目錄中的數據表,請使用 write.saveAsTable 方法,並以 格式 <catalog-name>.<schema-name>.<table-name>指定路徑。

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

將您的 DataFrame 寫入 CSV

若要撰寫 DataFrame 來 *.csv 格式化,請使用 write.csv 方法,並指定格式和選項。 根據預設,如果數據存在於指定的路徑,寫入作業就會失敗。 您可以指定要採取不同動作的下列其中一種模式:

  • overwrite 使用 DataFrame 內容覆寫目標路徑中的所有現有數據。
  • append 將 DataFrame 的內容附加至目標路徑中的數據。
  • ignore 如果目標路徑中有數據,則以無訊息方式失敗寫入。

下列範例示範使用 DataFrame 內容以 CSV 檔案覆寫數據:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

下一步

若要在 Databricks 上運用更多 Spark 功能,請參閱: