共用方式為


教學課程:COPY INTO with Spark SQL

Databricks 建議您針對包含數千個檔案的數據源,使用 COPY INTO 命令進行增量和大量數據載入。 Databricks 建議您針對進階使用案例使用 自動載入器

在本教學課程中,您會使用 COPY INTO 命令,將數據從雲端物件記憶體載入 Azure Databricks 工作區中的 table。

要求

  1. Azure 訂用帳戶、該訂用帳戶中的 Azure Databricks 工作區,以及該工作區中的叢集。 若要建立這些專案,請參閱 快速入門:使用 Azure 入口網站在 Azure Databricks 工作區上執行 Spark 作業。 如果您遵循本快速入門,則不需要遵循 執行Spark SQL作業 一節中的指示。
  2. 您的工作區中有一個多用途叢集 ,執行 Databricks Runtime 11.3 LTS 或更新版本的。 若要建立所有用途的叢集,請參閱計算組態參考。
  3. 熟悉 Azure Databricks 工作區用戶介面。 請參閱 瀏覽工作區
  4. 熟悉使用 Databricks 筆記本
  5. 您可以寫入數據的位置;此示範使用 DBFS 根目錄作為範例,但 Databricks 建議使用 Unity Catalog設定的外部儲存位置。

步驟 1. 設定您的環境並建立數據產生器

本教學課程假設您已熟悉 Azure Databricks 和預設工作區設定。 如果您無法執行所提供的程式代碼,請連絡工作區管理員,以確定您可以存取計算資源,以及您可以寫入數據的位置。

請注意,所提供的程式代碼會使用 source 參數來指定您要設定為 COPY INTO 數據源的位置。 撰寫時,此程式代碼會指向 DBFS 根目錄上的位置。 如果您有外部物件儲存位置的寫入許可權,請將來源字串的 dbfs:/ 部分取代為物件記憶體的路徑。 由於此程式代碼區塊也會執行遞歸刪除,以 reset 此示範,請確定您不會將此數據指向生產數據,而且您保留 /user/{username}/copy-into-demo 巢狀目錄,以避免覆寫或刪除現有的數據。

  1. 建立新的 SQL 筆記本將它附加至執行 Databricks Runtime 11.3 LTS 或更新版本之叢集

  2. 複製並執行下列代碼,以 reset 本教學課程中使用的儲存位置和資料庫:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 複製並執行以下程式碼來配置一些將用於隨機生成 generate 資料的 tables 和函數:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

步驟 2:將範例數據寫入雲端記憶體

在 Azure Databricks 上,寫入 Delta Lake 以外的數據格式並不罕見。 此處提供的程式代碼會將資料寫入 JSON,模擬一個可能將其他系統的結果匯入物件儲存的外部系統。

  1. 複製並執行下列程式代碼,以撰寫一批原始 JSON 數據:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

步驟 3:使用 COPY INTO 以等冪方式載入 JSON 數據

您必須先建立目標 Delta Lake table,才能使用 COPY INTO。 在 Databricks Runtime 11.3 LTS 和更高版本中,您只需在 CREATE TABLE 語句中提供 table 名稱,不需要其他任何內容。 針對舊版的 Databricks Runtime,您必須在建立空白 table時提供 schema。

  1. 複製並執行下列程式代碼,以建立您的目標 Delta table 並從來源載入資料:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

因為此動作是等冪的,所以您可以多次執行,但數據只會載入一次。

步驟 4:預覽 table 的內容

您可以執行簡單的 SQL 查詢,以手動檢閱此 table的內容。

  1. 複製並執行下列程式代碼,以預覽您的 table:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

步驟 5:載入更多數據和預覽結果

您可以多次重新執行步驟 2-4,將新的隨機原始 JSON 數據批次放入來源,然後使用 COPY INTO將其等冪地加載到 Delta Lake,最後預覽結果。 嘗試不按順序或多次執行這些步驟,以模擬寫入多個批次的原始數據或多次執行 COPY INTO,而不需要 having 有新數據到達。

步驟 6:整理教學課程

當您完成本教學課程時,如果您不想再保留相關聯的資源,則可以清除這些資源。

  1. 複製並執行下列程序代碼,以卸除資料庫、tables和 remove 所有數據:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. 若要停止計算資源,請移至 [叢集] 索引卷標,然後 您的叢集終止。

其他資源