教學課程:COPY INTO with Spark SQL
Databricks 建議您針對包含數千個檔案的數據源,使用 COPY INTO 命令進行增量和大量數據載入。 Databricks 建議您針對進階使用案例使用 自動載入器。
在本教學課程中,您會使用 COPY INTO
命令,將數據從雲端物件記憶體載入 Azure Databricks 工作區中的 table。
要求
- Azure 訂用帳戶、該訂用帳戶中的 Azure Databricks 工作區,以及該工作區中的叢集。 若要建立這些專案,請參閱 快速入門:使用 Azure 入口網站在 Azure Databricks 工作區上執行 Spark 作業。 如果您遵循本快速入門,則不需要遵循 執行Spark SQL作業 一節中的指示。
- 您的工作區中有一個多用途叢集 ,執行 Databricks Runtime 11.3 LTS 或更新版本的。 若要建立所有用途的叢集,請參閱
計算組態參考。 - 熟悉 Azure Databricks 工作區用戶介面。 請參閱 瀏覽工作區。
- 熟悉使用 Databricks 筆記本。
- 您可以寫入數據的位置;此示範使用 DBFS 根目錄作為範例,但 Databricks 建議使用 Unity Catalog設定的外部儲存位置。
步驟 1. 設定您的環境並建立數據產生器
本教學課程假設您已熟悉 Azure Databricks 和預設工作區設定。 如果您無法執行所提供的程式代碼,請連絡工作區管理員,以確定您可以存取計算資源,以及您可以寫入數據的位置。
請注意,所提供的程式代碼會使用 source
參數來指定您要設定為 COPY INTO
數據源的位置。 撰寫時,此程式代碼會指向 DBFS 根目錄上的位置。 如果您有外部物件儲存位置的寫入許可權,請將來源字串的 dbfs:/
部分取代為物件記憶體的路徑。 由於此程式代碼區塊也會執行遞歸刪除,以 reset 此示範,請確定您不會將此數據指向生產數據,而且您保留 /user/{username}/copy-into-demo
巢狀目錄,以避免覆寫或刪除現有的數據。
複製並執行下列代碼,以 reset 本教學課程中使用的儲存位置和資料庫:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
複製並執行以下程式碼來配置一些將用於隨機生成 generate 資料的 tables 和函數:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
步驟 2:將範例數據寫入雲端記憶體
在 Azure Databricks 上,寫入 Delta Lake 以外的數據格式並不罕見。 此處提供的程式代碼會將資料寫入 JSON,模擬一個可能將其他系統的結果匯入物件儲存的外部系統。
複製並執行下列程式代碼,以撰寫一批原始 JSON 數據:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
步驟 3:使用 COPY INTO 以等冪方式載入 JSON 數據
您必須先建立目標 Delta Lake table,才能使用 COPY INTO
。 在 Databricks Runtime 11.3 LTS 和更高版本中,您只需在 CREATE TABLE
語句中提供 table 名稱,不需要其他任何內容。 針對舊版的 Databricks Runtime,您必須在建立空白 table時提供 schema。
複製並執行下列程式代碼,以建立您的目標 Delta table 並從來源載入資料:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
因為此動作是等冪的,所以您可以多次執行,但數據只會載入一次。
步驟 4:預覽 table 的內容
您可以執行簡單的 SQL 查詢,以手動檢閱此 table的內容。
複製並執行下列程式代碼,以預覽您的 table:
-- Review updated table SELECT * FROM user_ping_target
步驟 5:載入更多數據和預覽結果
您可以多次重新執行步驟 2-4,將新的隨機原始 JSON 數據批次放入來源,然後使用 COPY INTO
將其等冪地加載到 Delta Lake,最後預覽結果。 嘗試不按順序或多次執行這些步驟,以模擬寫入多個批次的原始數據或多次執行 COPY INTO
,而不需要 having 有新數據到達。
步驟 6:整理教學課程
當您完成本教學課程時,如果您不想再保留相關聯的資源,則可以清除這些資源。
複製並執行下列程序代碼,以卸除資料庫、tables和 remove 所有數據:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
若要停止計算資源,請移至 [叢集] 索引卷標,然後 您的叢集終止。
其他資源
- COPY INTO 參考文章