在 Azure Databricks 上執行您的第一個 ETL 工作負載
瞭解如何使用 Azure Databricks 的生產就緒工具,開發及部署資料協調流程的第一個擷取、轉換和載入 (ETL) 管線。
本文結束時,您會熟練掌握:
- 啟動 Databricks 全用途計算叢集。
- 建立 Databricks 筆記本。
- 使用自動載入器設定 Delta Lake 的累加式資料擷取。
- 執行筆記本儲存格來處理、查詢和預覽資料。
- 將筆記本排程為 Databricks 作業。
本教學課程使用互動式筆記本來完成 Python 或 Scala 中的常見 ETL 工作。
您也可以使用差異即時資料表來建置 ETL 管線。 Databricks 建立了差異即時資料表,以減少建置、部署和維護生產 ETL 管線的複雜性。 請參閱教學課程:執行您的第一個差異即時資料表管線。
您也可以使用 Databricks Terraform 提供者來建立本文的資源。 請參閱使用 Terraform 建立叢集、筆記本和作業。
需求
- 您已登入 Azure Databricks 工作區。
- 您擁有建立叢集的權限。
注意
如果您沒有叢集控制權限,只要您可以存取叢集,就仍然可以完成下列大多數步驟。
步驟 1:建立叢集
若要進行探勘資料分析和資料工程,請建立叢集以提供執行命令所需的計算資源。
- 在側邊欄中按下 [計算]。
- 在 [計算] 頁面上,按一下 [建立叢集]。 這會開啟 [新增叢集] 頁面。
- 指定叢集的唯一名稱,保留其餘值的預設狀態,然後按一下 [建立叢集]。
若要深入瞭解 Databricks 叢集,請參閱計算。
步驟 2:建立 Databricks 筆記本
若要在工作區中建立筆記本,請按一下提要欄位的 新增 ,然後按一下 筆記本。 空白筆記本會在工作區中開啟。
若要深入瞭解如何建立並管理筆記本,請參閱 管理筆記本。
步驟 3:設定自動載入器將資料內嵌至 Delta Lake
Databricks 建議使用自動載入器進行累加式資料擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。
Databricks 建議使用 Delta Lake 儲存資料。 Delta Lake 是一個開放原始碼儲存體層,可提供 ACID 交易並啟用資料湖存放庫。 Delta Lake 是在 Databricks 中建立的資料表預設格式。
若要將自動載入器設定為將資料內嵌至 Delta Lake 資料表,請將下列程式碼複製並貼到筆記本中的空白儲存格:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
注意
此程式碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式碼時,限制的網路或儲存體權限會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難排解。
若要深入了解自動載入器,請參閱什麼是自動載入器?。
步驟 4:處理資料並與之互動
筆記本會逐個儲存格執行邏輯。 若要執行儲存格中的邏輯:
若要執行您在上一個步驟中完成的儲存格,請選取儲存格,然後按 SHIFT+ENTER。
若要查詢您剛建立的資料表,請將下列程式碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
若要預覽 DataFrame 中的資料,請將下列程式碼複製並貼到空白儲存格中,然後按 SHIFT+ENTER 以執行儲存格。
Python
display(df)
Scala
display(df)
若要深入瞭解可視化資料的互動式選項,請參閱 Databricks 筆記本中的視覺效果。
步驟 5:排程作業
您可以透過將 Databricks 筆記本新增為 Databricks 作業中的工作,來執行 Databricks 筆記本作為生產指令碼。 在此步驟中,您將建立可手動觸發的新作業。
若要將筆記本排程為工作:
- 按一下標題列右側的 [排程]。
- 針對 [作業名稱] 輸入唯一名稱。
- 按一下 [手動]。
- 在 [叢集] 下拉式清單中,選取您在步驟 1 中建立的叢集。
- 按一下 [建立]。
- 在出現的視窗中,按一下 [立即執行]。
- 若要查看作業執行結果,請按一下 [上次執行] 時間戳旁的圖示。
如需工作的詳細資訊,請參閱什麼是 Databricks 工作?。
其他整合
深入瞭解使用 Azure Databricks 進行資料工程的整合和工具: