共用方式為


教學:執行完整 Lakehouse 數據分析流程

本教學課程說明如何為 Azure Databricks Lakehouse 設定端對端分析管線。

重要

本教學課程利用互動式筆記本,在啟用了 Unity Catalog 的叢集中完成常見的 Python ETL 工作。 如果您未使用 Unity 目錄,請參閱在 Azure Databricks 上執行您的第一個 ETL 工作負載

本教學課程中的工作

本文結束時,您會感到自在:

  1. 啟動已啟用 Unity 目錄的計算叢集
  2. 建立 Databricks 筆記本
  3. 從 Unity 目錄外部位置寫入和讀取資料
  4. 使用 Auto Loader 設定 Unity Catalog 表格的增量資料導入
  5. 執行筆記本儲存格來處理、查詢和預覽資料
  6. 將筆記本排程為 Databricks 作業
  7. 從 Databricks SQL 查詢 Unity 目錄資料表

Azure Databricks 提供一套生產就緒的工具,可讓資料專業人員快速開發及部署擷取、轉換和載入 (ETL) 管線。 Unity 目錄可讓資料管理人為整個組織的使用者設定及保護儲存體認證、外部位置和資料庫物件的安全。 Databricks SQL 允許分析師對用於生產 ETL 工作負載的相同資料表執行 SQL 查詢,從而大規模提供即時商業智慧。

您也可以使用 DLT 來建置 ETL 管線。 Databricks 已建立 DLT,以減少建置、部署和維護生產 ETL 管線的複雜性。 請參閱 教學課程:運行您的第一個 DLT 資料管線

需求

注意事項

如果您沒有叢集控制權限,只要您可以存取叢集,就仍然可以完成下列大多數步驟。

步驟 1:建立叢集

若要進行探勘資料分析和資料工程,請建立叢集以提供執行命令所需的計算資源。

  1. 在側邊欄中按下 計算圖示 [計算]
  2. 按下側邊欄中的 新增圖示 [新增],然後選取 [叢集]。 這會開啟 [新增叢集/計算] 頁面。
  3. 指定叢集的唯一名稱。
  4. 在 [效能] 區段中,選取 [單一節點] 單選按鈕。
  5. 在 [進階] 下,將存取模式設定切換為 [手動],然後選取 [專用 ]。
  6. 單一使用者或群組中,選取您的用戶名稱。
  7. 選取所需的 Databricks Runtime 版本 11.1 或更新版本,以使用 Unity 目錄。
  8. 按下 [建立計算] 以建立叢集。

若要深入瞭解 Databricks 叢集,請參閱計算

步驟 2:建立 Databricks 筆記本

若要在工作區中建立筆記本,請按一下提要欄位的 新增圖示新增 ,然後按一下 筆記本。 空白筆記本會在工作區中開啟。

若要深入瞭解如何建立並管理筆記本,請參閱 管理筆記本

步驟 3:從 Unity 目錄所管理的外部位置寫入和讀取資料

Databricks 建議使用自動載入器進行累加式資料擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。

使用 Unity 目錄來管理外部位置的安全存取。 具有 READ FILES 外部位置權限的使用者或服務主體可以使用自動載入器來內嵌資料。

一般而言,資料會因為從其他系統寫入而抵達外部位置。 在此示範中,您可以將 JSON 檔案寫出至外部位置,以模擬資料抵達。

將下列程式碼複製到筆記本儲存格。 將 catalog 的字串值取代為具有 CREATE CATALOGUSE CATALOG 權限的目錄名稱。 將 external_location 的字串值替換為具有 READ FILESWRITE FILESCREATE EXTERNAL TABLE 權限的外部位置的路徑。

外部位置可以定義為整個儲存體容器,但通常會指向容器中巢狀目錄。

外部位置路徑的正確格式為 "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

執行此儲存格應該列印一行,讀取 12 個位元組、列印字串 “Hello world!”,並顯示目錄中提供的所有資料庫。 如果您無法執行此儲存格,請確認您位於已啟用 Unity 目錄的工作區中,並要求工作區系統管理員的適當權限,以完成本教學課程。

下列 Python 程式碼會使用您的電子郵件位址,在提供的目錄中建立唯一資料庫,以及在提供的外部位置中建立唯一的儲存位置。 執行此儲存格將會移除與本教學課程相關聯的所有資料,讓您能夠以等冪方式執行此範例。 定義並實例化了一個類別,您將用來模擬從連線系統到您的外部來源位置的資料批次。

將此程式碼複製到筆記本中的新儲存格,並加以執行以設定您的環境。

注意

此程式碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式碼時,限制的網路或儲存體權限會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難排解。


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

您現在可以將下列程式碼複製到儲存格中,然後執行,以載入一批資料。 您可以手動執行此儲存格,最多 60 次以觸發新的資料抵達。

RawData.land_batch()

步驟 4:設定自動載入器以將資料內嵌至 Unity 目錄

Databricks 建議使用 Delta Lake 儲存資料。 Delta Lake 是一個開放原始碼儲存體層,可提供 ACID 交易並啟用資料湖存放庫。 Delta Lake 是在 Databricks 中建立的資料表預設格式。

若要將自動載入器設定為將資料內嵌至 Unity 目錄資料表,請將下列程式碼複製並貼到筆記本中的空白儲存格:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

若要深入了解自動載入器,請參閱什麼是自動載入器?

若要深入瞭解使用 Unity 目錄進行結構化串流,請參閱搭配結構化串流使用 Unity 目錄

步驟 5:處理資料並與之互動

筆記本會逐個儲存格執行邏輯。 使用這些步驟以執行儲存格中的邏輯:

  1. 若要執行您在上一個步驟中完成的儲存格,請選取儲存格,然後按 SHIFT+ENTER

  2. 若要查詢您剛建立的資料表,請將下列程式碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。

    df = spark.read.table(table)
    
  3. 若要預覽 DataFrame 中的資料,請將下列程式碼複製並貼到空白儲存格中,然後按 SHIFT+ENTER 以執行儲存格。

    display(df)
    

若要深入瞭解可視化資料的互動式選項,請參閱 Databricks 筆記本中的視覺效果

步驟 6:排程作業

您可以將 Databricks 筆記本新增為 Databricks 作業中的一個任務,從而將其作為生產指令碼運行。 在此步驟中,您將建立可手動觸發的新作業。

若要將筆記本排程為工作:

  1. 按一下標題列右側的 [排程]
  2. 針對 [作業名稱] 輸入唯一名稱。
  3. 按一下 [手動]
  4. 在 [叢集] 下拉式清單中,選取您在步驟 1 中建立的叢集。
  5. 按一下 [建立]。
  6. 在出現的視窗中,按一下 [立即執行]
  7. 若要查看作業執行結果,請按一下 [上次執行時間戳]旁邊的 外部連結 圖示。

如需作業的詳細資訊,請參閱 什麼是作業?

步驟 7:從 Databricks SQL 查詢資料表

具有目前目錄的 USE CATALOG 權限、目前結構描述的 USE SCHEMA 權限和資料表的 SELECT 權限之任何人,都可以從慣用的 Databricks API 查詢資料表的內容。

您需要存取執行中的 SQL 倉儲,才能在 Databricks SQL 中執行查詢。

您稍早在本教學課程中建立的資料表名稱為 target_table。 您可以使用您在第一個儲存格中提供的目錄,以及具有模式 e2e_lakehouse_<your-username> 的資料庫來查詢它。 您可以使用目錄總管來尋找您所建立的資料物件。

其他集成

深入瞭解使用 Azure Databricks 進行資料工程的整合和工具: