開始使用 COPY INTO 載入數據
COPY INTO
SQL 命令可讓您將資料從檔案位置載入 Delta 資料表。 這是可重複使用且等冪的作業;已載入來源位置中的檔案會略過。
COPY INTO
提供下列功能:
- 可以輕鬆設定的文件或目錄篩選器,來自雲端儲存,包括 S3、ADLS Gen2、ABFS、GCS 和 Unity Catalog 磁碟區。
- 支援多種原始程式檔格式:CSV、JSON、XML、 Avro、 ORC、 Parquet、文字和二進位檔
- 依預設,完全一次 (等冪) 檔案處理
- 目標數據表架構推斷、對應、合併和演進
注意
如需更可調整且健全的檔案擷取體驗,Databricks 建議 SQL 使用者利用串流數據表。 請參閱 在 Databricks SQL中使用串流數據表載入數據。
警告
COPY INTO
會遵守刪除向量的工作區設定。 如果啟用,當 COPY INTO
在 SQL 倉儲上執行或執行 Databricks Runtime 14.0 或更新版本的計算時,就會在目標數據表上啟用刪除向量。 啟用之後,刪除向量會封鎖 Databricks Runtime 11.3 LTS 和以下數據表的查詢。 請參閱 什麼是刪除向量? 和 自動啟用刪除向量。
需求
帳戶管理員必須遵循設定 數據存取以擷取 數據中的步驟,以設定雲端物件記憶體中數據的存取權,使用者才能使用 COPY INTO
載入數據。
範例:將數據載入無架構的 Delta Lake 數據表
注意
這項功能適用於 Databricks Runtime 11.3 LTS 和更新版本。
您可以建立空白佔位元 Delta 數據表,以便稍後在 COPY INTO
命令期間推斷架構,方法是將 mergeSchema
設定為 COPY_OPTIONS
中的 true
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上述 SQL 語句是等冪性的,可以排程執行,將數據精確地匯入到 Delta 表格中。
注意
空的 Delta 資料表無法在 COPY INTO
之外使用。 不支援 INSERT INTO
和 MERGE INTO
將數據寫入無架構 Delta 表。 將數據插入具有 COPY INTO
的數據表之後,數據表就會變成可查詢。
請參閱 建立 COPY INTO的目標數據表。
範例:設定架構並將數據載入 Delta Lake 數據表
下列範例示範如何建立 Delta 數據表,然後使用 COPY INTO
SQL 命令,將 Databricks 數據集 的範例數據載入數據表。 您可以從連結至 Azure Databricks 叢集的筆記本執行範例 Python、R、Scala 或 SQL 程式代碼。 您也可以從 Databricks SQL 中與 SQL 倉儲相關聯的查詢執行 SQL 程式代碼。
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
若要清除,請執行下列程式代碼,以刪除資料表:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
清除元數據檔案
您可以執行 VACUUM,以清除 Databricks Runtime 15.2 和更新版本中 COPY INTO
所建立的未參考元數據檔案。
參考
- Databricks Runtime 7.x 及以上版本:COPY INTO
其他資源
如需常見的使用模式,包括針對相同 Delta 數據表的多個
作業範例,請參閱使用 常見數據載入模式。