教學課程:使用 Apache Spark DataFrames 載入並轉換資料
本教學課程介紹如何使用 Azure Databricks 的 Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API 及 SparkR SparkDataFrame API 載入並轉換資料。
在本教學課程結束時,您將了解什麼是 DataFrame 並熟悉以下工作:
Python
- 定義變數並將公用數據複製到 Unity Catalog 磁碟區
- 使用 Python 建立 DataFrame
- 將資料從 CSV 檔案載入至 DataFrame
- 檢視 DataFrame 並與其互動
- 儲存 DataFrame
- 在 PySpark 執行 SQL 查詢
另請參閱Apache Spark PySpark API 參照。
Scala
- 定義變數並將公用數據複製到 Unity Catalog 磁碟區
- 使用 Scala 建立 DataFrame
- 將資料從 CSV 檔案載入至 DataFrame
- 檢視 DataFrame 並與其互動
- 儲存 DataFrame
- 在 Apache Spark 執行 SQL 查詢
另請參閱Apache Spark Scala API 參照.
R
- 定義變數並將公用數據複製到 Unity Catalog 磁碟區
- 建立 SparkR SparkDataFrames
- 將資料從 CSV 檔案載入至 DataFrame
- 檢視 DataFrame 並與其互動
- 儲存 DataFrame
- 在 SparkR 執行 SQL 查詢
另請參閱Apache SparkR API 參照.
什麼是 DataFrame?
DataFrame 是一個可能包含不同類型之 columns 的二維標籤數據結構。 您可以將 DataFrame 想像成電子表格、SQL table或數列物件的字典。 Apache Spark DataFrame 提供豐富的函式 set(selectcolumns、篩選、join、匯總),可讓您有效率地解決常見的數據分析問題。
Apache Spark DataFrame 是以彈性分散式資料集 (RDD) 為基礎建置的抽象概念。 Spark DataFrame 和 Spark SQL 使用統一的規劃和優化引擎,可讓您在 Azure Databricks (Python、SQL、Scala 和 R) 上,get 所有支援語言的幾乎完全相同的效能。
需求
要完成以下教學課程,您必須滿足以下要求:
若要使用本教學課程中的範例,您的工作區必須已啟用 Unity Catalog。
本教學課程中的範例會使用 Unity Catalog磁碟區 來儲存範例數據。 若要使用這些範例,請建立磁碟區,並使用該磁碟區的 catalog、schema和磁碟區名稱來 set 範例所使用的磁碟區路徑。
您必須在 Unity Catalog中具有下列權限:
-
READ VOLUME
和WRITE VOLUME
、或ALL PRIVILEGES
本教學課程所使用的磁碟區。 - 本教學課程中使用的 schema 為
USE SCHEMA
或ALL PRIVILEGES
。 - 本教學課程使用的 catalog 是
USE CATALOG
或ALL PRIVILEGES
。
若要 set 這些許可權,請聯絡 Databricks 系統管理員或查看 Unity Catalog 權限和安全性可控物件。
-
提示
有關本文的完整筆記本,請參閱DataFrame 教學課程筆記本。
步驟 1:定義變數並載入 CSV 檔案
此步驟會定義變數以供本教學課程使用,然後載入包含嬰兒名稱數據的 CSV 檔案,該數據從 health.data.ny.gov 加載到您的 Unity Catalog 儲存空間中。
按一下圖示以開啟新筆記本。 若要瞭解如何流覽 Azure Databricks 筆記本,請參閱 自定義筆記本外觀。
請將下列程式碼複製並貼到全新空白筆記本資料格。 以 Unity Catalog 磁碟區的 catalog、schema和磁碟區名稱取代
<catalog-name>
、<schema-name>
和<volume-name>
。 以您選擇的 table 名稱取代<table_name>
。 稍後在本教學課程中,您會將嬰兒名稱數據載入此 table。Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
按
Shift+Enter
以執行資料格並建立新的空白資料格。請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式碼會使用 Databricks dbutuils 命令,將
rows.csv
檔案從 health.data.ny.gov 拷貝到 Unity Catalog 磁碟區。Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
步驟 2:建立 DataFrame
此步驟建立一個以測試資料命名df1
的 DataFrame,然後顯示其內容。
請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式代碼會使用測試數據建立 DataFrame,然後顯示 DataFrame 的內容和 schema。
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
步驟 3:將資料從 CSV 檔案載入至 DataFrame
此步驟會從您先前載入 Unity Catalog 磁碟區的 CSV 檔案建立名為 df_csv
的 DataFrame。 請參閱spark.read.csv。
請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式碼將嬰兒名稱資料
df_csv
從 CSV 檔案載入至 DataFrame,然後展示 DataFrame 的內容。Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
您可以從許多支援的檔案格式載入資料。
步驟 4:檢視 DataFrame 並與其互動
使用以下方法檢視您的寶寶姓名 DataFrames 並與其互動。
列印 DataFrame schema
瞭解如何顯示 Apache Spark DataFrame 的 schema。 Apache Spark 會使用 schema 一詞來參考 DataFrame 中 columns 的名稱和數據類型。
注意
Azure Databricks 也會使用 schema 一詞來描述註冊至 catalog的 tables 集合。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用
.printSchema()
方法來顯示 DataFrames 的 schema,以檢視兩個 DataFrame 的架構, 以準備將兩個 DataFrame 聯集。Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
在 DataFrame 中重新命名 column
瞭解如何在 DataFrame 中重新命名 column。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會重新命名
df1_csv
DataFrame 中的 column,以符合df1
DataFrame 中的個別 column。 此程式碼使用 Apache SparkwithColumnRenamed()
方法。Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
合併 DataFrames
了解如何建立新的 DataFrame,將一個 DataFrame 的行列新增至另一個 DataFrame。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
union()
方法將第一個 DataFrame 的內容df
與df_csv
包含從 CSV 檔案載入的嬰兒姓名資料的 DataFrame 合併。Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
篩選 DataFrame 的行列
使用 Apache Spark .filter()
或 .where()
方法來篩選數據列,以探索數據 set 中最受歡迎的嬰兒名稱。 使用篩選來 select 行子集,以在資料框中傳回或修改。 效能或語法沒有差異,如以下範例所示。
使用 .filter() 方法
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
.filter()
方法來顯示 DataFrame 計數超過 50 的那些行列。Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
使用where() 方法
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
.where()
方法來顯示 DataFrame 計數超過 50 的那些行列。Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
從 DataFrame 中提取 Select和columns,然後依頻率排序
瞭解如何使用select()
方法來確定要從DataFrame中返回的嬰兒名字頻率columns。 使用 Apache Spark orderby
和desc
函式來排序結果。
Apache Spark 的 pyspark.sql 模組提供對 SQL 函式的支援。 我們在本教學課程使用的這些函式包括 Apache Spark orderBy()
、desc()
、以及expr()
函式。 您可以根據需要將這些函式匯入工作階段來啟用這些功能。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼匯入
desc()
函式,然後使用 Apache Sparkselect()
方法與 Apache SparkorderBy()
以及desc()
函式,按遞減順序顯示最常見的名稱及其計數。Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
建立子集合 DataFrame
了解如何從現有 DataFrame 建立子集合 DataFrame。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
filter
方法建立新的 DataFrame,按年份、計數及性別限制資料。 它使用 Apache Sparkselect()
方法來對 columns進行 limit 。 它還使用 Apache SparkorderBy()
與desc()
函式按計數對新的 DataFrame 進行排序。Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
步驟 5:儲存 DataFrame
了解如何儲存 DataFrame。 您可以將 DataFrame 儲存至 table,或將 DataFrame 寫入一個或多個檔案。
將 DataFrame 儲存至 "table"
根據預設,Azure Databricks 會針對所有 tables 使用 Delta Lake 格式。 若要儲存 DataFrame,您必須具有在 catalog 和 schema上的 CREATE
table 許可權。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用您在本教學課程開頭定義的變數,將 DataFrame 的內容儲存至 table。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
大多數 Apache Spark 應用程式都以分散式方式處理大型資料集。 Apache Spark 會寫出檔案目錄,而非單一檔案。 Delta Lake 可拆分 Parquet 資料夾與檔案。 許多資料系統可以讀取這些檔案目錄。 Azure Databricks 建議針對大多數應用程式使用檔案路徑 tables。
將 DataFrame 儲存至 JSON 檔案
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼將 DataFrame 儲存至 JSON 檔案目錄。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
從 JSON 檔案讀取 DataFrame
了解如何使用 Apache Spark spark.read.format()
方法將 JSON 資料從目錄讀取至 DataFrame。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼顯示您在先前範例儲存的 JSON 檔案。
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
其他工作:在 PySpark、Scala 及 R 中執行 SQL 查詢
Apache Spark DataFrames 提供以下選項將 SQL 與 PySpark、Scala 及 R 結合。您可以在為本教學課程建立的同一筆記本中執行以下程式碼。
將 column 指定為 SQL 查詢
了解如何使用 Apache Spark selectExpr()
方法。 這是select()
方法的變體,可接受 SQL 運算式並傳回更新的 DataFrame。 該方法讓您可使用 SQL 運算式,例如upper
。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 Apache Spark
selectExpr()
方法和 SQLupper
表示式,將字串 column 轉換成大寫 (並將 column重新命名 )。Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
使用 expr()
來使用 column 的 SQL 語法
瞭解如何匯入並使用 Apache Spark expr()
函式,以在指定 column 的任何位置使用 SQL 語法。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會匯入
expr()
函式,然後使用Apache Sparkexpr()
函式和 SQLlower
表示式,將字串 column 轉換成小寫(並將 column重新命名)。Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
使用 spark.sql() 函數執行任意 SQL 查詢
瞭解如何使用 Apache Spark spark.sql()
函數來執行任意 SQL 查詢。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 Apache Spark
spark.sql()
函式,使用 SQL 語法查詢 SQL table。Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
DataFrame 教學課程筆記本
下列筆記本包含本教學課程中的範例查詢。