開始使用 COPY INTO 載入數據
COPY INTO
SQL 命令可讓您將資料從檔案位置載入 Delta 資料表。 這是一個可重試且冪等的操作,已經載入的來源位置中的檔案會被略過。
COPY INTO
提供下列功能:
- 從雲端記憶體輕鬆設定的檔案或目錄篩選,包括 S3、ADLS、ABFS、GCS 和 Unity 目錄磁碟區。
- 支援多種原始程式檔格式:CSV、JSON、XML、 Avro、 ORC、 Parquet、文字和二進位檔
- 依預設,具有一次性處理特性的(等冪)檔案處理
- 目標數據表架構推斷、對應、合併和演進
注意
如需更可調整且健全的檔案擷取體驗,Databricks 建議 SQL 使用者利用串流數據表。 請參閱 在 Databricks SQL中使用串流數據表載入數據。
警告
COPY INTO
會遵守刪除向量的工作區設定。 如果啟用,當 COPY INTO
在 SQL 倉儲上執行或執行 Databricks Runtime 14.0 或更新版本的計算時,就會在目標數據表上啟用刪除向量。 啟用之後,刪除向量功能會在 Databricks Runtime 11.3 LTS 與更低版本中封鎖表的查詢。 請參閱 什麼是刪除向量? 和 自動啟用刪除向量。
需求
帳戶管理員必須遵循 配置數據存取以進行擷取 中的步驟,以設定雲端物件儲存中數據的存取權限,讓使用者能夠使用 COPY INTO
載入數據。
範例:將數據載入無架構的 Delta Lake 數據表
注意
這項功能適用於 Databricks Runtime 11.3 LTS 和更新版本。
您可以建立空白的佔位 Delta 資料表,以便稍後在 COPY INTO
指令期間推斷結構,方法是將 mergeSchema
設定為 true
中的 COPY_OPTIONS
。
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上述 SQL 語句是等冪性的,可以排程執行,將數據精確地匯入到 Delta 表格中。
注意
空的 Delta 資料表無法在 COPY INTO
之外使用。 不支援 INSERT INTO
和 MERGE INTO
將數據寫入無架構 Delta 表。 將數據插入具有 COPY INTO
的數據表之後,數據表就會變成可查詢。
請參閱 建立 COPY INTO
的目標數據表。
範例:設定架構並將數據載入 Delta Lake 數據表
下列範例示範如何建立 Delta 數據表,然後使用 COPY INTO
SQL 命令,將 Databricks 數據集 的範例數據載入數據表。 您可以從連結至 Azure Databricks 叢集的筆記本執行範例 Python、R、Scala 或 SQL 程式代碼。 您也可以在 Databricks SQL 中,從與 SQL 倉儲相關聯的查詢中執行 SQL 代碼。
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
若要清除,請執行下列程式代碼,以刪除資料表:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
清除元數據檔案
您可以執行 VACUUM,以清除 Databricks Runtime 15.2 和更新版本中 COPY INTO
所建立的未參考元數據檔案。
參考資料
- Databricks Runtime 7.x 及以上版本:
COPY INTO
其他資源
- 如需常見的使用模式,包括針對相同 Delta 數據表的多個
作業範例,請參閱使用 常見數據載入模式。