教學課程:使用 Apache Spark DataFrames 載入並轉換資料
本教學課程介紹如何使用 Azure Databricks 的 Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API 及 SparkR SparkDataFrame API 載入並轉換資料。
在本教學課程結束時,您將了解什麼是 DataFrame 並熟悉以下工作:
Python
- 定義變數並將公用數據複製到 Unity Catalog 磁碟區
- 使用 Python 建立 DataFrame
- 將資料從 CSV 檔案載入至 DataFrame
- 檢視 DataFrame 並與其互動
- 儲存 DataFrame
- 在 PySpark 執行 SQL 查詢
另請參閱Apache Spark PySpark API 參照。
Scala
- 定義變數並將公用數據複製到 Unity Catalog 磁碟區
- 使用 Scala 建立 DataFrame
- 將資料從 CSV 檔案載入至 DataFrame
- 檢視 DataFrame 並與其互動
- 儲存 DataFrame
- 在 Apache Spark 執行 SQL 查詢
另請參閱Apache Spark Scala API 參照.
R
- 定義變數並將公用數據複製到 Unity Catalog 磁碟區
- 建立 SparkR SparkDataFrames
- 將資料從 CSV 檔案載入至 DataFrame
- 檢視 DataFrame 並與其互動
- 儲存 DataFrame
- 在 SparkR 執行 SQL 查詢
另請參閱Apache SparkR API 參照.
什麼是 DataFrame?
DataFrame 是一種二維有標籤的資料結構,columns 可能包含不同的類型。 您可以將 DataFrame 想像成電子表格、SQL table或數列物件的字典。 Apache Spark DataFrame 提供豐富的函式 set(selectcolumns、篩選、join、匯總),可讓您有效率地解決常見的數據分析問題。
Apache Spark DataFrame 是以彈性分散式資料集 (RDD) 為基礎建置的抽象概念。 Spark DataFrame 和 Spark SQL 使用統一的規劃和優化引擎,可讓您在 Azure Databricks (Python、SQL、Scala 和 R) 上,get 所有支援語言的幾乎完全相同的效能。
需求
要完成以下教學課程,您必須滿足以下要求:
若要使用本教學課程中的範例,您的工作區必須已啟用 Unity Catalog。
本教學課程中的範例會使用 Unity Catalog磁碟區 來儲存範例數據。 若要使用這些範例,請建立磁碟區,並使用該磁碟區的 catalog、schema和磁碟區名稱來 set 範例所使用的磁碟區路徑。
您必須在 Unity Catalog中具有下列權限:
-
READ VOLUME
和WRITE VOLUME
、或ALL PRIVILEGES
本教學課程所使用的磁碟區。 - 在本教學中使用的 schema 使用的是
USE SCHEMA
或ALL PRIVILEGES
。 - 本教學課程使用的 catalog 是
USE CATALOG
或ALL PRIVILEGES
。
若要 set 這些許可權,請參閱 Databricks 系統管理員或 Unity Catalog 許可權和安全性及可保護的物件。
-
提示
有關本文的完整筆記本,請參閱DataFrame 教學課程筆記本。
步驟 1:定義變數並載入 CSV 檔案
這個步驟是為了定義供本教程使用的變數,然後載入一個包含嬰兒名稱數據的 CSV 檔案,從 health.data.ny.gov 載入到您的 Unity Catalog 磁碟區中。
按一下圖示以開啟新筆記本。 若要了解如何巡覽 Azure Databricks 筆記本,請參閱Databricks 筆記本介面和控制項。
請將下列程式碼複製並貼到全新空白筆記本資料格。 請將
<catalog-name>
、<schema-name>
和<volume-name>
替換為 catalog、schema和 Unity Catalog 磁碟區的名稱。 以您選擇的 table 名稱取代<table_name>
。 稍後在本教學課程中,您會將嬰兒名稱數據載入此 table。Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
按
Shift+Enter
以執行資料格並建立新的空白資料格。請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式碼會使用 Databricks dbutuils 命令,將
rows.csv
檔案從 health.data.ny.gov 複製到 Unity Catalog 磁碟分區。Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
步驟 2:建立 DataFrame
此步驟建立一個以測試資料命名df1
的 DataFrame,然後顯示其內容。
請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式代碼會使用測試數據建立 DataFrame,然後顯示 DataFrame 的內容和 schema。
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
步驟 3:將資料從 CSV 檔案載入至 DataFrame
此步驟會從您先前載入 Unity Catalog 磁碟區的 CSV 檔案建立名為 df_csv
的 DataFrame。 請參閱spark.read.csv。
請將下列程式碼複製並貼到全新空白筆記本資料格。 此程式碼將嬰兒名稱資料
df_csv
從 CSV 檔案載入至 DataFrame,然後展示 DataFrame 的內容。Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
您可以從許多支援的檔案格式載入資料。
步驟 4:檢視 DataFrame 並與其互動
使用以下方法檢視您的寶寶姓名 DataFrames 並與其互動。
列印 DataFrame schema
瞭解如何顯示 Apache Spark DataFrame 的 schema。 Apache Spark 會使用 schema 一詞來參考 DataFrame 中 columns 的名稱和數據類型。
注意
Azure Databricks 也會使用 schema 一詞來描述註冊至 catalog的 tables 集合。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用
.printSchema()
方法來顯示 DataFrames 的 schema,以檢視兩個 DataFrame 的架構, 以準備將兩個 DataFrame 聯集。Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
在 DataFrame 中重新命名 column
瞭解如何在 DataFrame 中重新命名 column。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會重新命名
df1_csv
DataFrame 中的 column,以符合df1
DataFrame 中的個別 column。 此程式碼使用 Apache SparkwithColumnRenamed()
方法。Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
合併 DataFrames
了解如何建立新的 DataFrame,將一個 DataFrame 的行列新增至另一個 DataFrame。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
union()
方法將第一個 DataFrame 的內容df
與df_csv
包含從 CSV 檔案載入的嬰兒姓名資料的 DataFrame 合併。Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
篩選 DataFrame 的行列
使用 Apache Spark .filter()
或 .where()
方法來篩選數據列,以探索數據 set 中最受歡迎的嬰兒名稱。 使用篩選來 select 行子集,以在 DataFrame 中返回或修改。 效能或語法沒有差異,如以下範例所示。
使用 .filter() 方法
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
.filter()
方法來顯示 DataFrame 計數超過 50 的那些行列。Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
使用where() 方法
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
.where()
方法來顯示 DataFrame 計數超過 50 的那些行列。Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
從一個 DataFrame 中提取 Select和columns,並依頻率排序
瞭解哪些嬰兒名稱頻率與 select()
方法,以指定要從 DataFrame 傳回的 columns。 使用 Apache Spark orderby
和desc
函式來排序結果。
Apache Spark 的 pyspark.sql 模組提供對 SQL 函式的支援。 我們在本教學課程使用的這些函式包括 Apache Spark orderBy()
、desc()
、以及expr()
函式。 您可以根據需要將這些函式匯入工作階段來啟用這些功能。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼匯入
desc()
函式,然後使用 Apache Sparkselect()
方法與 Apache SparkorderBy()
以及desc()
函式,按遞減順序顯示最常見的名稱及其計數。Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
建立子集合 DataFrame
了解如何從現有 DataFrame 建立子集合 DataFrame。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼使用 Apache Spark
filter
方法建立新的 DataFrame,按年份、計數及性別限制資料。 它會使用 Apache Sparkselect()
方法來 limitcolumns。 它還使用 Apache SparkorderBy()
與desc()
函式按計數對新的 DataFrame 進行排序。Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
步驟 5:儲存 DataFrame
了解如何儲存 DataFrame。 您可以將 DataFrame 儲存至 table,或將數據框架寫入檔案或多個檔案。
將數據框儲存至 table
根據預設,Azure Databricks 會針對所有 tables 使用 Delta Lake 格式。 若要儲存 DataFrame,您必須對 catalog 和 schema具有 CREATE
table 許可權。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用您在本教學課程開頭定義的變數,將 DataFrame 的內容儲存至 table。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
大多數 Apache Spark 應用程式都以分散式方式處理大型資料集。 Apache Spark 會寫出檔案目錄,而非單一檔案。 Delta Lake 可拆分 Parquet 資料夾與檔案。 許多資料系統可以讀取這些檔案目錄。 Azure Databricks 建議針對大多數應用程式使用檔案路徑 tables。
將 DataFrame 儲存至 JSON 檔案
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼將 DataFrame 儲存至 JSON 檔案目錄。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
從 JSON 檔案讀取 DataFrame
了解如何使用 Apache Spark spark.read.format()
方法將 JSON 資料從目錄讀取至 DataFrame。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式碼顯示您在先前範例儲存的 JSON 檔案。
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
其他工作:在 PySpark、Scala 及 R 中執行 SQL 查詢
Apache Spark DataFrames 提供以下選項將 SQL 與 PySpark、Scala 及 R 結合。您可以在為本教學課程建立的同一筆記本中執行以下程式碼。
將 column 指定為 SQL 查詢
了解如何使用 Apache Spark selectExpr()
方法。 這是select()
方法的變體,可接受 SQL 運算式並傳回更新的 DataFrame。 該方法讓您可使用 SQL 運算式,例如upper
。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 Apache Spark
selectExpr()
方法和 SQLupper
表示式,將字串 column 轉換成大寫 (並將 column重新命名 )。Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
使用 expr()
來使用 column 的 SQL 語法
瞭解如何匯入並使用 Apache Spark expr()
函式,以在指定 column 的任何位置使用 SQL 語法。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會匯入
expr()
函式,然後使用Apache Sparkexpr()
函式和 SQLlower
表示式,將字串 column 轉換成小寫(並將 column重新命名)。Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
使用 spark.sql() 函數執行任意 SQL 查詢
瞭解如何使用 Apache Spark spark.sql()
函數來執行任意 SQL 查詢。
請將下列程式碼複製並貼到您空白筆記本資料格。 此程式代碼會使用 Apache Spark
spark.sql()
函式,使用 SQL 語法查詢 SQL table。Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
按
Shift+Enter
執行此資料格,然後移至下一個資料格。
DataFrame 教學課程筆記本
下列筆記本包含本教學課程中的範例查詢。