

Get started using COPY INTO to load data

The COPY INTO SQL command lets you load data from a file location into a Delta table. This is a retriable and idempotent operation; files in the source location that have already been loaded are skipped.

COPY INTO provides the following capabilities:

  • Easily configurable file or directory filters from cloud storage, including S3, ADLS Gen2, ABFS, GCS, and Unity Catalog volumes (see the sketch after this list).
  • Support for multiple source file formats: CSV, JSON, XML, Avro, ORC, Parquet, text, and binary files
  • Exactly-once (idempotent) file processing by default
  • Target table schema inference, mapping, merging, and evolution
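
For example, a minimal sketch of a COPY INTO statement that combines a directory filter with format and copy options might look like the following; the table name, volume path, and glob pattern are hypothetical:

COPY INTO my_sales_table
FROM '/Volumes/my_catalog/my_schema/my_volume/sales/'
FILEFORMAT = CSV
PATTERN = '2024/*.csv'  -- only pick up files matching this glob
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');  -- allow the target table schema to evolve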

Note

For a more scalable and robust file ingestion experience, Databricks recommends that SQL users take advantage of streaming tables instead. See Load data using streaming tables in Databricks SQL.
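
As a rough illustration of that alternative, a streaming table in Databricks SQL can be defined directly over a file location with read_files; the table name and source path below are hypothetical:

CREATE OR REFRESH STREAMING TABLE my_streaming_table
AS SELECT *
FROM STREAM read_files(
  '/Volumes/my_catalog/my_schema/my_volume/landing/',  -- hypothetical source location
  format => 'json'
);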

Warning

COPY INTO respects the workspace setting for deletion vectors. If it is enabled, deletion vectors are enabled on the target table when COPY INTO runs on a SQL warehouse or on compute running Databricks Runtime 14.0 or above. Once enabled, deletion vectors block queries against the table on Databricks Runtime 11.3 LTS and below. See What are deletion vectors? and Auto-enable deletion vectors.
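
If a target table must remain readable from those older runtimes, one option is to disable deletion vectors on that table through its Delta table property. A minimal sketch, with a hypothetical table name:

ALTER TABLE my_table
SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);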

Requirements

An account admin must follow the steps in Configure data access for ingestion to configure access to data in cloud object storage before users can load data using COPY INTO.

Example: Load data into a schemaless Delta Lake table

Note

This feature is available in Databricks Runtime 11.3 LTS and above.

You can create an empty placeholder Delta table so that the schema is inferred later, during the COPY INTO command, by setting mergeSchema to true in COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

The preceding SQL statement is idempotent and can be scheduled to run so that data is ingested into the Delta table exactly once.
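
Because loaded files are tracked, re-running the same statement skips files that were already ingested. If you need to reload everything, COPY_OPTIONS accepts a force option; a hedged sketch, reusing the placeholder path and format from above:

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
COPY_OPTIONS ('force' = 'true', 'mergeSchema' = 'true');  -- reload files even if they were ingested before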

Note

The empty Delta table is not usable outside of COPY INTO. INSERT INTO and MERGE INTO are not supported for writing data into schemaless Delta tables. After data has been inserted into the table with COPY INTO, the table becomes queryable.

See Create target tables for COPY INTO.

Example: Set schema and load data into a Delta Lake table

The following example shows how to create a Delta table and then use the COPY INTO SQL command to load sample data from Databricks datasets into the table. You can run the example Python, R, Scala, or SQL code from a notebook attached to an Azure Databricks cluster. You can also run the SQL code from a query associated with a SQL warehouse in Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

To clean up, run the following code, which deletes the table:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Clean up metadata files

You can run VACUUM to clean up unreferenced metadata files created by COPY INTO in Databricks Runtime 15.2 and above.
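
For example, a sketch with a hypothetical table name (VACUUM also removes unneeded data files that are past the retention threshold):

VACUUM my_table;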

Reference

  • Databricks Runtime 7.x and above: COPY INTO
