教學課程:執行端對端 Lakehouse 分析管線
本教學課程說明如何為 Azure Databricks Lakehouse 設定端對端分析管線。
重要
本教學課程使用互動式筆記本來完成 Python 或 Unity 目錄中的常見 ETL 工作。 如果您未使用 Unity 目錄,請參閱在 Azure Databricks 上執行您的第一個 ETL 工作負載。
本教學課程中的工作
本文結束時,您會熟練掌握:
- 啟動已啟用 Unity 目錄的計算叢集。
- 建立 Databricks 筆記本。
- 從 Unity 目錄外部位置寫入和讀取資料。
- 使用自動載入器設定 Unity 目錄的累加式資料擷取。
- 執行筆記本儲存格來處理、查詢和預覽資料。
- 將筆記本排程為 Databricks 作業。
- 從 Databricks SQL 查詢 Unity 目錄資料表
Azure Databricks 提供一套生產就緒的工具,可讓資料專業人員快速開發及部署擷取、轉換和載入 (ETL) 管線。 Unity 目錄可讓資料管理人為整個組織的使用者設定及保護儲存體認證、外部位置和資料庫物件的安全。 Databricks SQL 可讓分析師針對生產 ETL 工作負載中使用的相同資料表執行 SQL 查詢,以大規模執行即時商業智慧。
您也可以使用差異即時資料表來建置 ETL 管線。 Databricks 建立了差異即時資料表,以減少建置、部署和維護生產 ETL 管線的複雜性。 請參閱教學課程:執行您的第一個差異即時資料表管線。
需求
注意
如果您沒有叢集控制權限,只要您可以存取叢集,就仍然可以完成下列大多數步驟。
步驟 1:建立叢集
若要進行探勘資料分析和資料工程,請建立叢集以提供執行命令所需的計算資源。
- 在側邊欄中按下 [計算]。
- 按下側邊欄中的 [新增],然後選取 [叢集]。 這會開啟 [新增叢集/計算] 頁面。
- 指定叢集的唯一名稱。
- 選取 [單一節點] 圓形按鈕。
- 從 [存取模式] 下拉式清單中選取 [單一使用者]。
- 請確定您的電子郵件地址會顯示在 [單一使用者] 欄位中。
- 選取所需的 Databricks Runtime 版本 11.1 或更新版本,以使用 Unity 目錄。
- 按下 [建立計算] 以建立叢集。
若要深入瞭解 Databricks 叢集,請參閱計算。
步驟 2:建立 Databricks 筆記本
若要在工作區中建立筆記本,請按一下提要欄位的 新增 ,然後按一下 筆記本。 空白筆記本會在工作區中開啟。
若要深入瞭解如何建立並管理筆記本,請參閱 管理筆記本。
步驟 3:從 Unity 目錄所管理的外部位置寫入和讀取資料
Databricks 建議使用自動載入器進行累加式資料擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。
使用 Unity 目錄來管理外部位置的安全存取。 具有 READ FILES
外部位置權限的使用者或服務主體可以使用自動載入器來內嵌資料。
一般而言,資料會因為從其他系統寫入而抵達外部位置。 在此示範中,您可以將 JSON 檔案寫出至外部位置,以模擬資料抵達。
將下列程式碼複製到筆記本儲存格。 將 catalog
的字串值取代為具有 CREATE CATALOG
和 USE CATALOG
權限的目錄名稱。 將 external_location
的字串值取代為 READ FILES
、WRITE FILES
和 CREATE EXTERNAL TABLE
權限的外部位置路徑。
外部位置可以定義為整個儲存體容器,但通常會指向容器中巢狀目錄。
外部位置路徑的正確格式為 "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
。
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
執行此儲存格應該列印一行,讀取 12 個位元組、列印字串 “Hello world!”,並顯示目錄中提供的所有資料庫。 如果您無法執行此儲存格,請確認您位於已啟用 Unity 目錄的工作區中,並要求工作區系統管理員的適當權限,以完成本教學課程。
下列 Python 程式碼會使用您的電子郵件位址,在提供的目錄中建立唯一資料庫,以及在提供的外部位置中建立唯一的儲存位置。 執行此儲存格將會移除與本教學課程相關聯的所有資料,讓您能夠以等冪方式執行此範例。 類別已定義並具現化,您將用來模擬從連線系統到來源外部位置的資料批次。
將此程式碼複製到筆記本中的新儲存格,並加以執行以設定您的環境。
注意
此程式碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式碼時,限制的網路或儲存體權限會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難排解。
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
您現在可以將下列程式碼複製到儲存格中並加以執行,來登陸一批資料。 您可以手動執行此儲存格,最多 60 次以觸發新的資料抵達。
RawData.land_batch()
步驟 4:設定自動載入器以將資料內嵌至 Unity 目錄
Databricks 建議使用 Delta Lake 儲存資料。 Delta Lake 是一個開放原始碼儲存體層,可提供 ACID 交易並啟用資料湖存放庫。 Delta Lake 是在 Databricks 中建立的資料表預設格式。
若要將自動載入器設定為將資料內嵌至 Unity 目錄資料表,請將下列程式碼複製並貼到筆記本中的空白儲存格:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
若要深入了解自動載入器,請參閱什麼是自動載入器?。
若要深入瞭解使用 Unity 目錄進行結構化串流,請參閱搭配結構化串流使用 Unity 目錄。
步驟 5:處理資料並與之互動
筆記本會逐個儲存格執行邏輯。 使用這些步驟以執行儲存格中的邏輯:
若要執行您在上一個步驟中完成的儲存格,請選取儲存格,然後按 SHIFT+ENTER。
若要查詢您剛建立的資料表,請將下列程式碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。
df = spark.read.table(table)
若要預覽 DataFrame 中的資料,請將下列程式碼複製並貼到空白儲存格中,然後按 SHIFT+ENTER 以執行儲存格。
display(df)
若要深入瞭解可視化資料的互動式選項,請參閱 Databricks 筆記本中的視覺效果。
步驟 6:排程作業
您可以透過將 Databricks 筆記本新增為 Databricks 作業中的工作,來執行 Databricks 筆記本作為生產指令碼。 在此步驟中,您將建立可手動觸發的新作業。
若要將筆記本排程為工作:
- 按一下標題列右側的 [排程]。
- 針對 [作業名稱] 輸入唯一名稱。
- 按一下 [手動]。
- 在 [叢集] 下拉式清單中,選取您在步驟 1 中建立的叢集。
- 按一下 [建立]。
- 在出現的視窗中,按一下 [立即執行]。
- 若要查看作業執行結果,請按一下 [上次執行] 時間戳旁的圖示。
如需工作的詳細資訊,請參閱什麼是 Databricks 工作?。
步驟 7:從 Databricks SQL 查詢資料表
具有目前目錄的 USE CATALOG
權限、目前結構描述的 USE SCHEMA
權限和資料表的 SELECT
權限之任何人,都可以從慣用的 Databricks API 查詢資料表的內容。
您需要存取執行中的 SQL 倉儲,才能在 Databricks SQL 中執行查詢。
您稍早在本教學課程中建立的資料表名稱為 target_table
。 您可以使用您在第一個儲存格中提供的目錄,以及具有模式 e2e_lakehouse_<your-username>
的資料庫來查詢它。 您可以使用目錄總管來尋找您所建立的資料物件。
其他整合
深入瞭解使用 Azure Databricks 進行資料工程的整合和工具: