共用方式為


在 Databricks 中建置端對端資料管線

本文說明如何建立和部署端對端資料處理管線,包括如何內嵌未經處理的資料、轉換資料,以及對已處理的資料執行分析。

注意

雖然本文示範如何使用 Databricks 筆記本和 Azure Databricks 工作來建立完整的資料管線,以協調工作流程,但 Databricks 建議使用 Delta Live Tables,這是一個宣告式介面,可用來建置可靠、可維護且可測試的資料處理管線。

什麼是資料管線?

資料管線會實作從來源系統移動資料、根據需求轉換資料,以及將資料儲存在目標系統中所需的步驟。 資料管線包含將原始資料轉換成使用者可取用之備妥資料所需的所有程序。 例如,資料管線可能會準備資料,讓資料分析師和資料科學家可以透過分析和報告從資料擷取價值。

擷取、轉換和載入 (ETL) 工作流程是資料管線的常見範例。 在 ETL 處理中,資料會從來源系統擷取並寫入暫存區域、根據需求轉換(確保資料品質、重複資料刪除記錄等等),然後寫入至資料倉儲或資料湖等目標系統。

資料管線步驟

為了協助您開始在 Azure Databricks 上建置資料管線,本文中包含的範例會逐步引導您建立資料處理工作流程:

  • 使用 Azure Databricks 功能來探索原始資料集。
  • 建立 Databricks 筆記本以內嵌原始來源資料,並將原始資料寫入目標資料表。
  • 建立 Databricks 筆記本來轉換原始來源資料,並將轉換的資料寫入目標資料表。
  • 建立 Databricks 筆記本來查詢轉換資料。
  • 使用 Azure Databricks 工作將資料管線自動化。

需求

  • 您已登入 Azure Databricks,並在資料科學與工程工作區中。
  • 您有權建立叢集存取叢集
  • (選用)若要將資料表發佈至 Unity 目錄,您必須在 Unity 目錄中建立目錄結構描述

範例:百萬首歌資料集

此範例中使用的資料集是百萬首歌資料集的子集,這是當代音樂曲目的特徵和中繼資料集合。 此資料集可在 Azure Databricks 工作區中包含的範例資料集中使用。

步驟 1:建立叢集

若要在此範例中執行資料處理和分析,請建立叢集以提供執行命令所需的計算資源。

注意

由於此範例會使用儲存在 DBFS 中的範例資料集,並建議將資料表儲存到 Unity 目錄,因此您會建立以單一使用者存取模式設定的叢集。 單一使用者存取模式提供 DBFS 的完整存取權,同時啟用 Unity 目錄的存取權。 請參閱適用於 DBFS 和 Unity 目錄的最佳做法

  1. 按一下側邊欄中的 [計算]
  2. 在 [計算] 頁面上,按一下 [建立叢集]。
  3. 在叢集名稱頁面,輸入叢集的唯一名稱。
  4. 在 [存取模式] 中,選取 [單一使用者]
  5. 在 [單一使用者或服務主體存取] 中,選取您的使用者名稱。
  6. 保留其餘值的預設狀態,並按一下 [建立叢集]

若要深入瞭解 Databricks 叢集,請參閱計算

步驟 2:探索來來源資料

若要瞭解如何使用 Azure Databricks 介面來探索原始來源資料,請參閱探索資料管線的來源資料。 如果您想要直接移至內嵌和準備資料,請繼續進行步驟 3:內嵌原始資料

步驟 3:內嵌原始資料

在此步驟中,您會將原始資料載入資料表,使其可供進一步處理。 若要管理 Databricks 平台上的資料資產,例如資料表,Databricks 建議 Unity 目錄。 不過,如果您沒有建立必要目錄和結構描述以將資料表發行至 Unity 目錄的權限,您仍然可以將資料表發佈至 Hive 中繼存放區來完成下列步驟。

若要內嵌資料,Databricks 建議使用自動載入器。 自動載入器會在新檔案抵達雲端物件儲存體時自動偵測並處理。

您可以設定自動載入器來自動偵測已載入資料的結構描述,讓您不需要明確宣告資料結構描述並隨著新資料行導入而演進資料表結構描述,即可初始化資料表。 這樣就不需要在一段時間內手動追蹤和套用結構描述變更。 Databricks 在使用自動載入器時建議結構描述推斷。 不過,如資料探索步驟所示,歌曲資料不包含標頭資訊。 因為標頭未與資料一起儲存,因此您必須明確地定義結構描述,如下一個範例所示。

  1. 在側邊欄中,按下 新增圖示 [新增],然後從功能表中選取 [筆記本]。 [建立筆記本] 對話方塊隨即出現。

  2. 輸入筆記本的名稱,例如 Ingest songs data。 預設情況:

    • Python 是選取的語言。
    • 筆記本會附加至您所使用的最後一個叢集。 在此情況下,您在步驟 1:建立叢集中建立的叢集。
  3. 在筆記本的第一個儲存格中輸入下列內容:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    如果您使用 Unity 目錄,請將 <table-name> 取代為目錄、結構描述和資料表名稱,以包含內嵌的記錄(例如,data_pipelines.songs_data.raw_song_data)。 否則,請將 <table-name> 取代為包含內嵌記錄的資料表名稱,例如,raw_song_data

    <checkpoint-path> 取代為 DBFS 中目錄的路徑 (例如,/tmp/pipeline_get_started/_checkpoint/song_data) 以維護檢查點檔案。

  4. 按下 執行功能表,然後選取 [執行儲存格]。 此範例會使用 README 中的資訊定義資料結構描述,內嵌 file_path 中所含所有檔案的歌曲資料,並將資料寫入 table_name 指定的資料表。

步驟 4:準備原始資料

若要準備原始資料以供分析,下列步驟會篩選掉不必要的資料行,並新增包含新記錄建立時間戳記的新欄位,以轉換原始歌曲資料。

  1. 在側邊欄中,按下 新增圖示 [新增],然後從功能表中選取 [筆記本]。 [建立筆記本] 對話方塊隨即出現。

  2. 輸入筆記本的名稱。 例如: Prepare songs data 。 將預設語言變更為 SQL

  3. 在筆記本的第一個儲存格中輸入下列內容:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    如果您使用 Unity 目錄,請將 <table-name> 取代為目錄、結構描述和資料表名稱,以包含篩選和轉換的記錄(例如,data_pipelines.songs_data.prepared_song_data)。 否則,請將 <table-name> 取代 為資料表的名稱,以包含已篩選和轉換的記錄(例如,prepared_song_data)。

    將取代 <raw-songs-table-name> 為包含上一個步驟中內嵌之原始歌曲記錄的資料表名稱。

  4. 按下 執行功能表,然後選取 [執行儲存格]

步驟 5:查詢轉換的資料

在此步驟中,您會新增查詢來分析歌曲資料,以擴充處理管線。 這些查詢會使用在上一個步驟中建立的備妥記錄。

  1. 在側邊欄中,按下 新增圖示 [新增],然後從功能表中選取 [筆記本]。 [建立筆記本] 對話方塊隨即出現。

  2. 輸入筆記本的名稱。 例如: Analyze songs data 。 將預設語言變更為 SQL

  3. 在筆記本的第一個儲存格中輸入下列內容:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    將取代 <prepared-songs-table-name> 為包含備妥資料的資料表名稱。 例如: data_pipelines.songs_data.prepared_song_data

  4. 按下儲存格動作功能表中的 向下插入點,選取 [新增下方儲存格],在新的存儲格中輸入下列內容:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    使用您在先前步驟中建立的 JSON 檔案名稱來取代 <prepared-songs-table-name>。 例如: data_pipelines.songs_data.prepared_song_data

  5. 若要執行查詢並檢視輸出,請按下 [全部執行]

步驟 6:建立 Azure Databricks 工作以執行管線

您可以建立工作流程,以使用 Azure Databricks 工作自動執行資料擷取、處理和分析步驟。

  1. 在資料科學與工程工作區中,執行下列其中一項:
    • 按下側邊欄中的 工作流程圖示 [工作流程],然後按下 [建立作業] 按鈕
    • 在側邊欄中,按下 新增圖示 [新增],然後選取 [作業]
  2. 在 [工作] 索引標籤上的 [工作] 對話方塊中,將 [新增工作的名稱...] 取代為您的工作名稱。 例如,「歌曲工作流程」。
  3. 在 [任務名稱] 欄位中,輸入第一個任務的名稱,例如 Ingest_songs_data
  4. 在 [類型] 中,選取 [筆記本] 工作類型。
  5. 在 [來源] 中,選取 [工作區]
  6. 使用檔案瀏覽器尋找資料內嵌筆記本、按下筆記本名稱,然後按下 [確認]
  7. 在 [叢集] 中,選取 [Shared_job_cluster] 或您在 Create a cluster 步驟中建立的叢集。
  8. 按一下 [建立]。
  9. 按下您剛才建立之工作下方的 [新增工作] 按鈕,然後選取 [筆記本]
  10. 在 [工作名稱] 欄位中,輸入工作名稱,例如 Prepare_songs_data
  11. 在 [類型] 中,選取 [筆記本] 工作類型。
  12. 在 [來源] 中,選取 [工作區]
  13. 使用檔案瀏覽器尋找資料準備筆記本、按下筆記本名稱,然後按下 [確認]
  14. 在 [叢集] 中,選取 [Shared_job_cluster] 或您在 Create a cluster 步驟中建立的叢集。
  15. 按一下 [建立]。
  16. 按下您剛才建立之工作下方的 [新增工作] 按鈕,然後選取 [筆記本]
  17. 在 [工作名稱] 欄位中,輸入工作名稱,例如 Analyze_songs_data
  18. 在 [類型] 中,選取 [筆記本] 工作類型。
  19. 在 [來源] 中,選取 [工作區]
  20. 使用檔案瀏覽器尋找資料分析筆記本、按下筆記本名稱,然後按下 [確認]
  21. 在 [叢集] 中,選取 [Shared_job_cluster] 或您在 Create a cluster 步驟中建立的叢集。
  22. 按一下 [建立]。
  23. 按下[立即執行] 按鈕以執行工作流程。 若要檢視 [執行的詳細資料],請按下作業執行檢視中執行之 [開始時間] 資料行中的連結。 按下每個工作以檢視工作執行的詳細資料。
  24. 若要在工作流程完成時檢視結果,請按下最終資料分析工作。 [輸出] 頁面隨即出現,並顯示查詢結果。

步驟 7:排程資料管線工作

注意

為了示範如何使用 Azure Databricks 工作來協調排程的工作流程,本快速入門範例會將擷取、準備和分析步驟分成不同的筆記本,然後每個筆記本會用來在工作中建立工作。 如果所有處理都包含在單一筆記本中,您可以直接從 Azure Databricks Notebook UI 排程筆記本。 請參閱建立及管理排程作業

常見的需求是依排程執行資料管線。 若要定義執行管線之工作的排程:

  1. 按一下側邊欄中的 工作流程圖示 [工作流程]
  2. 在 [名稱] 資料行,按下工作名稱。 側邊面板會顯示 [工作詳細資料]。
  3. 按下 [工作詳細資料] 面板中的 [新增觸發程式],然後選取 [觸發程式類型] 中的 [排程]
  4. 指定期間、開始時間和時區。 您可以選用地選取 [顯示 cron 語法] 核取方塊,在 [Quartz Cron 語法] 中編輯排程。
  5. 按一下 [檔案] 。

深入了解