教學課程:執行您的第一個差異即時資料表管線(機器翻譯)
本教學課程會引導您完成設定第一個 Delta Live Tables 管線、撰寫基本 ETL 程式代碼,以及執行管線更新的步驟。
本教學課程中的所有步驟都是針對已啟用 Unity 目錄的工作區所設計。 您也可以設定 Delta Live Tables 管線來處理舊版 Hive 中繼存放區。 請參閱 搭配舊版 Hive 中繼存放區使用 Delta Live Tables 管線。
注意
本教學課程提供使用 Databricks 筆記本開發和驗證新管線程式代碼的指示。 您也可以在 Python 或 SQL 檔案中使用原始碼來設定管線。
如果您已經有使用 Delta Live Tables 語法撰寫的原始程式碼,您可以設定管線來執行程式碼。 請參閱 設定 Delta Live Tables 管線。
您可以在 Databricks SQL 中使用完全宣告式 SQL 語法,將具體化檢視和串流數據表的重新整理排程註冊和設定為 Unity 目錄管理的物件。 請參閱 在 Databricks SQL 中使用具體化檢視和使用 Databricks SQL 中的串流數據表載入數據。
範例:擷取和處理紐約嬰兒姓名資料
本文中的範例使用公開可用的資料集,其中包含紐約州嬰兒姓名的記錄。 此範例示範如何使用 Delta Live Tables 管線來執行下列工作:
- 將原始 CSV 資料從磁碟區讀取到數據表。
- 從擷取數據表讀取記錄,並使用 Delta Live Tables 預期 來建立包含清理數據的新數據表。
- 使用已清理的記錄作為建立衍生資料集之 Delta Live Tables 查詢的輸入。
此程式碼會示範獎牌結構的簡化範例。 請參閱什麼是獎牌 Lakehouse 結構?(機器翻譯)。
此範例的實作會針對 Python 和 SQL 提供。 請遵循步驟來建立新的管線和筆記本,然後複製並貼上所提供的程序代碼。
也會提供具有完整程式代碼的範例筆記本。
需求
若要啟動管道,則您必須擁有叢集建立權限或定義 Delta Live Tables 叢集之叢集原則的存取權限。 差異即時資料表執行階段會在執行管道之前建立叢集,且如果您沒有正確的權限,則會建立失敗。
所有用戶預設都可以使用無伺服器管線來觸發更新。 無伺服器必須在帳戶層級啟用,而且可能無法在工作區區域中使用。 請參閱啟用無伺服器計算。
本教學課程中的範例使用 Unity 目錄。 Databricks 建議建立新的架構來執行本教學課程,因為目標架構中會建立多個資料庫物件。
- 若要在目錄中建立新的架構,您必須擁有
ALL PRIVILEGES
或USE CATALOG
和CREATE SCHEMA
許可權。 - 如果您無法建立新的架構,請針對現有的架構執行本教學課程。 您必須具有下列權限:
USE CATALOG
父目錄的 。ALL PRIVILEGES
或USE SCHEMA
、CREATE MATERIALIZED VIEW
和CREATE TABLE
目標架構的許可權。
- 本教學課程會使用磁碟區來儲存範例數據。 Databricks 建議為本教學課程建立新的磁碟區。 如果您為本教學課程建立新的架構,您可以在該架構中建立新的磁碟區。
- 若要在現有的架構中建立新的磁碟區,您必須具有下列許可權:
USE CATALOG
父目錄的 。ALL PRIVILEGES
目標架構的 或USE SCHEMA
和CREATE VOLUME
許可權。
- 您可以選擇性地使用現有的磁碟區。 您必須具有下列權限:
USE CATALOG
父目錄的 。USE SCHEMA
父架構的 。ALL PRIVILEGES
WRITE VOLUME
或目標磁碟區上的 或READ VOLUME
。
- 若要在現有的架構中建立新的磁碟區,您必須具有下列許可權:
若要設定這些許可權,請連絡 Databricks 系統管理員。 如需 Unity 目錄許可權的詳細資訊,請參閱 Unity 目錄許可權和安全性實體物件。
- 若要在目錄中建立新的架構,您必須擁有
步驟 0:下載數據
此範例會從 Unity 目錄磁碟區載入數據。 下列程式代碼會下載 CSV 檔案,並將其儲存在指定的磁碟區中。 開啟新的筆記本,然後執行下列程式代碼,將此資料下載至指定的磁碟區:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
使用 Unity 目錄磁碟區的目錄、結構描述和磁碟區名稱取代 <catalog-name>
、<schema-name>
和 <volume-name>
。 如果這些物件不存在,提供的程式代碼會嘗試建立指定的架構和磁碟區。 您必須擁有適當的許可權,才能在 Unity 目錄中建立和寫入物件。 請參閱 需求。
注意
請確定此筆記本已成功執行,再繼續進行本教學課程。 請勿將此筆記本設定為管線的一部分。
步驟 1:建立管線
Delta Live Tables 會藉由使用 Delta Live Tables 語法解析筆記本或檔案中定義的相依性來建立管線(稱為 原始程式碼)。 每個原始碼檔案只能包含一種語言,但您可以在管線中新增多個語言特定的筆記本或檔案。
重要
請勿在 [原始程式碼 ] 欄位中設定任何資產。 讓此欄位保持黑色會建立並設定用於原始程式碼撰寫的筆記本。
本教學課程中的指示使用無伺服器計算和 Unity 目錄。 針對這些指示中未提及的所有組態選項,使用預設設定。
注意
如果您的工作區中未啟用或支援無伺服器,您可以使用預設計算設定來完成本教學課程。 您必須在 [建立管線 UI] 的 [目的地] 區段中,手動選取 [記憶體選項] 下的 [Unity 目錄]。
若要設定新的管線,請執行下列動作:
- 按兩下提要欄中的 [ 差異實時數據表 ]。
- 按兩下 [ 建立管線]。
- 提供唯 一的管線名稱。
- 核取 [無伺服器] 旁的方塊。
- 選取目錄以發佈數據。
- 選取目錄中的架構。
- 指定要建立架構的新架構名稱。
- 使用 [進階] 底下的 [新增組態] 按鈕來定義三個管線參數,以新增三個組態。 使用下列參數名稱指定您下載資料的目錄、架構和磁碟區:
my_catalog
my_schema
my_volume
- 按一下 [建立]。
新建立管線的管線 UI 隨即出現。 原始程式碼筆記本會自動為管線建立和設定。
筆記本會建立在用戶目錄中的新目錄中。 新目錄和檔案的名稱符合管線的名稱。 例如: /Users/your.username@databricks.com/my_pipeline/my_pipeline
。
存取此筆記本的連結位於 [管線詳細數據] 面板的 [原始程式碼] 字段底下。 按兩下連結以開啟筆記本,再繼續進行下一個步驟。
步驟 2:使用 Python 或 SQL 在筆記本中宣告具體化檢視和串流數據表
您可以使用 Datbricks 筆記本,以互動方式開發及驗證 Delta Live Tables 管線的原始程式碼。 您必須將筆記本附加至管線,才能使用這項功能。 若要將新建立的筆記本附加至您剛才建立的管線:
- 按兩下右上角的 [ 連線 ] 以開啟計算組態功能表。
- 將滑鼠停留在您在步驟 1 中建立的管線名稱上。
- 按一下 [連線]。
UI 會變更為在右上方包含 [驗證 ] 和 [開始 ] 按鈕。 若要深入瞭解管線程式代碼開發的筆記本支援,請參閱 在筆記本中開發和偵錯 Delta Live Tables 管線。
重要
- Delta Live Tables 管線會在規劃期間評估筆記本中的所有數據格。 不同於針對所有用途計算或排程為作業執行的筆記本,管線不保證數據格是以指定的順序執行。
- 筆記本只能包含單一程式設計語言。 請勿在管線原始程式碼筆記本中混合 Python 和 SQL 程式代碼。
如需使用 Python 或 SQL 開發程式代碼的詳細資訊,請參閱 使用 Python 開發管線程式代碼或使用 SQL 開發管線程式代碼。
範例管線程序代碼
若要實作本教學課程中的範例,請將下列程式代碼複製並貼到您管線設定為原始程式碼的筆記本中的數據格中。
所提供的程式代碼會執行下列動作:
- 匯入必要的模組(僅限 Python)。
- 參考管線設定期間定義的參數。
- 定義名為的串流數據表,以
baby_names_raw
從磁碟區內嵌。 - 定義名為
baby_names_prepared
的具體化檢視,以驗證內嵌的數據。 - 定義具名
top_baby_names_2021
的具體化檢視,該檢視具有高度精簡的數據檢視。
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
步驟 3:啟動管線更新
若要啟動管線更新,請按下 筆記本 UI 右上角的 [開始 ] 按鈕。
筆記本範例
下列筆記本包含本文中提供的相同程序代碼範例。 這些筆記本的需求與本文中的步驟相同。 請參閱 需求。
若要匯入筆記本,請完成下列步驟:
- 開啟筆記本UI。
- 按兩下 [+ 新增>筆記本]。
- 空的筆記本隨即開啟。
- 按兩下 [檔案>匯入...]。 匯入 對話方塊隨即出現。
- 選取 [匯入的來源] 的 [URL] 選項。
- 貼上筆記本的 URL。
- 按一下 匯入。
本教學課程會要求您先執行數據設定筆記本,再設定和執行 Delta Live Tables 管線。 匯入下列筆記本、將筆記本附加至計算資源、填入、 my_schema
和 的必要變數my_catalog
,然後按兩下 [全部my_volume
執行]。
管線的數據下載教學課程
下列筆記本提供 Python 或 SQL 中的範例。 當您匯入筆記本時,它會儲存到您的使用者主目錄。
匯入下列其中一個筆記本之後,請完成建立管線的步驟,但使用 原始碼 檔案選擇器來選取下載的筆記本。 使用設定為原始程式碼的筆記本建立管線之後,按兩下 管線 UI 中的 [啟動 ] 以觸發更新。