教學課程:執行您的第一個 DLT 資料處理流程
本教學課程會引導您完成設定第一個 DLT 管線、撰寫基本 ETL 程式代碼,以及執行管線更新的步驟。
本教學課程中的所有步驟都是針對已啟用 Unity 目錄的工作區所設計。 您也可以設定 DLT 管線與舊版 Hive 中繼存放區一同使用。 請參閱 搭配舊版 Hive 中繼存放區使用 DLT 管線。
注意
本教學課程提供使用 Databricks 筆記本開發和驗證新管線程式代碼的指示。 您也可以在 Python 或 SQL 檔案中使用原始碼來設定管線。
如果您已經有使用 DLT 語法撰寫的原始程式碼,您可以設定管線來執行程式碼。 請參閱 設定 DLT 管線。
您可以在 Databricks SQL 中使用完全宣告式 SQL 語法,將具體化檢視和串流數據表的重新整理排程註冊和設定為 Unity 目錄管理的物件。 請參閱 在 Databricks SQL 中使用具體化檢視,使用 Databricks SQL中的串流數據表載入數據。
範例:擷取和處理紐約嬰兒姓名數據
本文中的範例會使用公開提供的數據集,其中包含 紐約州嬰兒名稱的記錄。 此範例示範如何使用 DLT 管線來執行下列工作:
- 將未處理的 CSV 資料從磁碟讀取到數據表。
- 從導入表讀取記錄,並使用 DLT 預期 建立包含經清理的數據的新表格。
- 使用清理的記錄做為建立衍生數據集之 DLT 查詢的輸入。
此程式代碼示範獎章架構的簡化範例。 請參閱 什麼是獎牌湖屋建築?。
此範例的實作會針對 Python 和 SQL 提供。 請遵循步驟來建立新的管線和筆記本,然後複製並貼上所提供的程序代碼。
提供 筆記本 範例,內含完整程式代碼。
要求
- 若要啟動管線,您必須 叢集建立許可權 或存取定義 DLT 叢集的叢集原則。 在執行您的管線之前,DLT 執行階段會建立一個叢集,如果您沒有正確的權限,它將失敗。
- 所有用戶預設都可以使用無伺服器管線來觸發更新。 無伺服器架構必須在帳戶層級啟用,且可能無法在您的工作區域中使用。 請參閱 啟用無伺服器計算。
本教學課程中的範例會使用 Unity 目錄。 Databricks 建議建立新的架構來執行本教學課程,因為目標架構中會建立多個資料庫物件。
- 若要在目錄中建立新的架構,您必須具有
ALL PRIVILEGES
或USE CATALOG
和CREATE SCHEMA
許可權。 - 如果您無法建立新的架構,請針對現有的架構執行本教學課程。 您必須具有下列權限:
- 父目錄
USE CATALOG
。 -
ALL PRIVILEGES
或USE SCHEMA
、CREATE MATERIALIZED VIEW
和目標架構上CREATE TABLE
許可權。
- 父目錄
- 本教學課程會使用磁碟區來儲存範例數據。 Databricks 建議為本教學課程建立新的磁碟區。 如果您為本教學課程建立新的架構,您可以在該架構中建立新的磁碟區。
- 若要在現有的架構中建立新的磁碟區,您必須具有下列許可權:
- 父目錄的
USE CATALOG
。 - 在目標架構上應具有
ALL PRIVILEGES
許可權,或者同時具有USE SCHEMA
和CREATE VOLUME
許可權。
- 父目錄的
- 您可以選擇性地使用現有的磁碟區。 您必須具有下列權限:
- 分類編號
USE CATALOG
的父目錄。 - 父架構
USE SCHEMA
。 - 目標磁碟區上
ALL PRIVILEGES
或READ VOLUME
和WRITE VOLUME
。
- 分類編號
- 若要在現有的架構中建立新的磁碟區,您必須具有下列許可權:
若要設定這些許可權,請連絡 Databricks 系統管理員。 如需 Unity Catalog 權限的詳細資訊,請參閱 Unity Catalog 權限和安全性實體物件。
- 若要在目錄中建立新的架構,您必須具有
步驟 0:下載數據
此範例會從 Unity 目錄磁碟區載入數據。 下列程式代碼會下載 CSV 檔案,並將其儲存在指定的磁碟區中。 開啟新的筆記本,然後執行下列程式代碼,將此資料下載至指定的磁碟區:
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
將 <catalog-name>
、<schema-name>
和 <volume-name>
替換為 Unity Catalog 的目錄名稱、架構名稱和磁碟區名稱。 如果這些物件不存在,提供的程式代碼會嘗試建立指定的架構和磁碟區。 您必須擁有適當的許可權,才能在 Unity 目錄中建立和寫入物件。 請參閱 需求。
注意
請確定此筆記型電腦已成功執行,再繼續本教學課程。 請勿將此筆記本設定為管線的一部分。
步驟 1:建立管線
DLT 透過使用 DLT 語法,解析在筆記本或檔案中定義的相依性,來建立管線(稱為 原始程式碼)。 每個原始碼檔案只能包含一種語言,但您可以在管線中新增多個語言特定的筆記本或檔案。
重要
請勿在 [原始程式碼] 欄位中設定任何資產。 讓此欄位保持黑色會建立並設定用於原始程式碼撰寫的筆記本。
本教學課程中的指示使用無伺服器計算和 Unity 目錄。 針對這些指示中未指定的所有組態選項,使用預設設定。
注意
如果您的工作區中未啟用或支援無伺服器環境,您可以使用預設計算設定來完成本教學課程。 您必須在 [建立管線] UI 的 [目的地] 區段中,於 [儲存體選項] 下手動選取 [Unity Catalog]。
若要設定新的管線,請執行下列動作:
- 在側邊欄中,點擊 [DLT]。
- 點擊 建立管線。
- 在 管線名稱中,輸入唯一的管線名稱。
- 選取 無伺服器 複選框。
- 在 [目的地]中,若要設定發佈數據表的 Unity 目錄位置,請選取 目錄 和 架構。
- 在 [進階] 中,按一下 [新增組態],然後為您下載資料的目錄、架構和磁碟區管線定義參數,使用下列參數名稱:
my_catalog
my_schema
my_volume
- 點擊 建立。
新管線的管線 UI 隨即出現。 程式碼筆記本會針對管線自動建立並設定。
筆記本會建立在用戶目錄下的新資料夾中。 新目錄和檔案的名稱符合管線的名稱。 例如,/Users/your.username@databricks.com/my_pipeline/my_pipeline
。
存取此筆記本的連結位於 [管線 詳細數據] 面板的 [原始程式碼] 字段中。 按兩下連結以開啟筆記本,再繼續進行下一個步驟。
步驟 2:使用 Python 或 SQL 在筆記本中宣告具體化檢視和串流數據表
您可以使用 Datbricks 筆記本,以互動方式開發及驗證 DLT 管線的原始程式碼。 您必須將筆記本附加至管線,才能使用這項功能。 若要將新建立的筆記本附加至您剛才建立的管線:
- 單擊右上方的 [Connect],以開啟計算組態功能表。
- 將滑鼠停留在您在步驟 1 中建立的管線名稱上。
- 按一下 連線。
UI 會變更為在右上角包含 [驗證 ] 和 [開始 ] 按鈕。 若要深入瞭解管線程式代碼開發的筆記本支援,請參閱 在筆記本中開發及偵錯 DLT 管線。
重要
- DLT 管線在規劃期間會評估筆記本中的所有儲存格。 不同於針對通用計算而執行或排程為工作項目的筆記本程式,資料管道不保證程式碼單元會依照指定順序執行。
- 筆記本只能包含單一程式設計語言。 請勿在管線原始程式碼筆記本中混合 Python 和 SQL 程式代碼。
如需使用 Python 或 SQL 開發程式代碼的詳細資訊,請參閱 使用 Python 開發管線程式代碼,或 使用 SQL開發管線程式代碼。
範例管線程序代碼
若要實作本教學課程中的範例,請將下列程式代碼複製並貼在設計為原始程式碼的筆記本中的單元格中。
所提供的程式代碼會執行下列動作:
- 匯入必要的模組(僅限 Python)。
- 引用在管線設定期間定義的參數。
- 定義一個名為
baby_names_raw
的流式資料表,從一個磁碟區中匯入資料。 - 定義名為
baby_names_prepared
的具體化檢視,以驗證內嵌的數據。 - 定義一個名為
top_baby_names_2021
的具體化檢視,該檢視能高度精簡地呈現數據。
蟒
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
步驟 3:啟動管線更新
若要啟動管線更新,請按下筆記本 UI 右上角的 [開始] 按鈕。
範例筆記本
下列筆記本包含本文中提供的相同程序代碼範例。 這些筆記本的需求與本文中的步驟相同。 請參閱 需求。
若要匯入筆記本,請完成下列步驟:
- 開啟筆記本UI。
- 點擊 「+ 新增>Notebook」。
- 空的筆記本打開了。
- 點選【檔案>匯入...】。 [匯入] 對話框出現。
- 選取 URL 選項,適用於 從匯入。
- 貼上筆記本的 URL。
- 點選 匯入。
本教學課程會要求您先執行數據設定筆記本,再設定和執行 DLT 管線。 匯入下列筆記本,將筆記本附加至計算資源,填入 my_catalog
、my_schema
和 my_volume
的必要變數,然後按一下 執行所有。
管線的數據下載教學課程
下列筆記本提供 Python 或 SQL 中的範例。 當您匯入筆記本時,它會儲存到您的使用者主目錄。
匯入下列其中一個筆記本之後,請完成建立管線的步驟,但使用 原始程式碼 檔案選擇器來選取下載的筆記本。 使用設定為原始程式碼的筆記本建立管線之後,按兩下管線 UI 中的 [開始] 以觸發更新。