共用方式為


教學課程:執行端對端 Lakehouse 分析管線

本教學課程說明如何為 Azure Databricks Lakehouse 設定端對端分析管線。

重要

本教學課程使用互動式筆記本來完成 Python 或 Unity 目錄中的常見 ETL 工作。 如果您未使用 Unity 目錄,請參閱在 Azure Databricks 上執行您的第一個 ETL 工作負載

本教學課程中的工作

本文結束時,您會熟練掌握:

  1. 啟動已啟用 Unity 目錄的計算叢集
  2. 建立 Databricks 筆記本
  3. 從 Unity 目錄外部位置寫入和讀取資料
  4. 使用自動載入器設定 Unity 目錄的累加式資料擷取
  5. 執行筆記本儲存格來處理、查詢和預覽資料
  6. 將筆記本排程為 Databricks 作業
  7. 從 Databricks SQL 查詢 Unity 目錄資料表

Azure Databricks 提供一套生產就緒的工具,可讓資料專業人員快速開發及部署擷取、轉換和載入 (ETL) 管線。 Unity 目錄可讓資料管理人為整個組織的使用者設定及保護儲存體認證、外部位置和資料庫物件的安全。 Databricks SQL 可讓分析師針對生產 ETL 工作負載中使用的相同資料表執行 SQL 查詢,以大規模執行即時商業智慧。

您也可以使用差異即時資料表來建置 ETL 管線。 Databricks 建立了差異即時資料表,以減少建置、部署和維護生產 ETL 管線的複雜性。 請參閱教學課程:執行您的第一個差異即時資料表管線

需求

注意

如果您沒有叢集控制權限,只要您可以存取叢集,就仍然可以完成下列大多數步驟。

步驟 1:建立叢集

若要進行探勘資料分析和資料工程,請建立叢集以提供執行命令所需的計算資源。

  1. 在側邊欄中按下 計算圖示 [計算]
  2. 按下側邊欄中的 新增圖示 [新增],然後選取 [叢集]。 這會開啟 [新增叢集/計算] 頁面。
  3. 指定叢集的唯一名稱。
  4. 選取 [單一節點] 圓形按鈕。
  5. 從 [存取模式] 下拉式清單中選取 [單一使用者]
  6. 請確定您的電子郵件地址會顯示在 [單一使用者] 欄位中。
  7. 選取所需的 Databricks Runtime 版本 11.1 或更新版本,以使用 Unity 目錄。
  8. 按下 [建立計算] 以建立叢集。

若要深入瞭解 Databricks 叢集,請參閱計算

步驟 2:建立 Databricks 筆記本

若要在工作區中建立筆記本,請按一下提要欄位的 新增圖示 新增 ,然後按一下 筆記本。 空白筆記本會在工作區中開啟。

若要深入瞭解如何建立並管理筆記本,請參閱 管理筆記本

步驟 3:從 Unity 目錄所管理的外部位置寫入和讀取資料

Databricks 建議使用自動載入器進行累加式資料擷取。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。

使用 Unity 目錄來管理外部位置的安全存取。 具有 READ FILES 外部位置權限的使用者或服務主體可以使用自動載入器來內嵌資料。

一般而言,資料會因為從其他系統寫入而抵達外部位置。 在此示範中,您可以將 JSON 檔案寫出至外部位置,以模擬資料抵達。

將下列程式碼複製到筆記本儲存格。 將 catalog 的字串值取代為具有 CREATE CATALOGUSE CATALOG 權限的目錄名稱。 將 external_location 的字串值取代為 READ FILESWRITE FILESCREATE EXTERNAL TABLE 權限的外部位置路徑。

外部位置可以定義為整個儲存體容器,但通常會指向容器中巢狀目錄。

外部位置路徑的正確格式為 "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

執行此儲存格應該列印一行,讀取 12 個位元組、列印字串 “Hello world!”,並顯示目錄中提供的所有資料庫。 如果您無法執行此儲存格,請確認您位於已啟用 Unity 目錄的工作區中,並要求工作區系統管理員的適當權限,以完成本教學課程。

下列 Python 程式碼會使用您的電子郵件位址,在提供的目錄中建立唯一資料庫,以及在提供的外部位置中建立唯一的儲存位置。 執行此儲存格將會移除與本教學課程相關聯的所有資料,讓您能夠以等冪方式執行此範例。 類別已定義並具現化,您將用來模擬從連線系統到來源外部位置的資料批次。

將此程式碼複製到筆記本中的新儲存格,並加以執行以設定您的環境。

注意

此程式碼中定義的變數應該可讓您安全地執行它,而不會有與現有工作區資產或其他使用者衝突的風險。 執行此程式碼時,限制的網路或儲存體權限會引發錯誤;請連絡您的工作區系統管理員,以針對這些限制進行疑難排解。


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

您現在可以將下列程式碼複製到儲存格中並加以執行,來登陸一批資料。 您可以手動執行此儲存格,最多 60 次以觸發新的資料抵達。

RawData.land_batch()

步驟 4:設定自動載入器以將資料內嵌至 Unity 目錄

Databricks 建議使用 Delta Lake 儲存資料。 Delta Lake 是一個開放原始碼儲存體層,可提供 ACID 交易並啟用資料湖存放庫。 Delta Lake 是在 Databricks 中建立的資料表預設格式。

若要將自動載入器設定為將資料內嵌至 Unity 目錄資料表,請將下列程式碼複製並貼到筆記本中的空白儲存格:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

若要深入了解自動載入器,請參閱什麼是自動載入器?

若要深入瞭解使用 Unity 目錄進行結構化串流,請參閱搭配結構化串流使用 Unity 目錄

步驟 5:處理資料並與之互動

筆記本會逐個儲存格執行邏輯。 使用這些步驟以執行儲存格中的邏輯:

  1. 若要執行您在上一個步驟中完成的儲存格,請選取儲存格,然後按 SHIFT+ENTER

  2. 若要查詢您剛建立的資料表,請將下列程式碼複製並貼入空白儲存格,然後按 SHIFT+ENTER 以執行儲存格。

    df = spark.read.table(table)
    
  3. 若要預覽 DataFrame 中的資料,請將下列程式碼複製並貼到空白儲存格中,然後按 SHIFT+ENTER 以執行儲存格。

    display(df)
    

若要深入瞭解可視化資料的互動式選項,請參閱 Databricks 筆記本中的視覺效果

步驟 6:排程作業

您可以透過將 Databricks 筆記本新增為 Databricks 作業中的工作,來執行 Databricks 筆記本作為生產指令碼。 在此步驟中,您將建立可手動觸發的新作業。

若要將筆記本排程為工作:

  1. 按一下標題列右側的 [排程]
  2. 針對 [作業名稱] 輸入唯一名稱。
  3. 按一下 [手動]
  4. 在 [叢集] 下拉式清單中,選取您在步驟 1 中建立的叢集。
  5. 按一下 [建立]。
  6. 在出現的視窗中,按一下 [立即執行]
  7. 若要查看作業執行結果,請按一下 [上次執行] 時間戳旁的外部連結圖示。

如需工作的詳細資訊,請參閱什麼是 Databricks 工作?

步驟 7:從 Databricks SQL 查詢資料表

具有目前目錄的 USE CATALOG 權限、目前結構描述的 USE SCHEMA 權限和資料表的 SELECT 權限之任何人,都可以從慣用的 Databricks API 查詢資料表的內容。

您需要存取執行中的 SQL 倉儲,才能在 Databricks SQL 中執行查詢。

您稍早在本教學課程中建立的資料表名稱為 target_table。 您可以使用您在第一個儲存格中提供的目錄,以及具有模式 e2e_lakehouse_<your-username> 的資料庫來查詢它。 您可以使用目錄總管來尋找您所建立的資料物件。

其他整合

深入瞭解使用 Azure Databricks 進行資料工程的整合和工具: