开始使用 COPY INTO 加载数据
通过 COPY INTO
SQL 命令,你可以将文件位置中的数据加载到 Delta 表中。 这是可重试的幂等操作;会跳过已经加载的源位置中的文件。
COPY INTO
提供以下功能:
- 云存储中可轻松配置的文件或目录筛选器,包括 S3、ADLS Gen2、ABFS、GCS 和 Unity Catalog 卷。
- 支持多种源文件格式:CSV、JSON、XML、Avro、ORC、Parquet、文本和二进制文件
- 默认只进行一次(幂等)文件处理
- 目标表架构推理、映射、合并和演变
注意
为了获得缩放性更高且更强大的文件引入体验,Databricks 建议 SQL 用户利用流式处理表。 请参阅 在 Databricks SQL 中使用流式处理表加载数据。
警告
COPY INTO
遵循删除向量的工作区设置。 如果启用,则当 COPY INTO
在 SQL 仓库或运行 Databricks Runtime 14.0 或更高版本的计算上运行时,将在目标表上启用删除向量。 启用后,删除向量会阻止对 Databricks Runtime 11.3 LTS 及更低版本中的表的查询。 请参阅什么是删除向量?以及自动启用删除向量。
要求
帐户管理员必须按照配置数据访问以进行引入中的步骤来配置对云对象存储中的数据的访问权限,然后用户才能使用 COPY INTO
加载数据。
示例:将数据加载到无架构 Delta Lake 表中
注意
此功能在 Databricks Runtime 11.3 LTS 及更高版本中可用。
可通过在 COPY_OPTIONS
中将 mergeSchema
设置为 true
来创建空占位符 Delta 表,以便稍后在 COPY INTO
命令期间推断架构:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上面的 SQL 语句是幂等的,可以计划运行以将数据一次性完全引入 Delta 表中。
注意
空 Delta 表在超出 COPY INTO
的范围不可用。 INSERT INTO
和 MERGE INTO
不支持将数据写入无架构 Delta 表。 使用 COPY INTO
将数据插入到表中后,该表将变为可查询。
示例:设置架构并将数据加载到 Delta Lake 表中
以下示例演示如何创建 Delta 表并使用 COPY INTO
SQL 命令将示例数据从 Databricks 数据集加载到该表中。 可以从附加到 Azure Databricks 群集的笔记本中运行示例 Python、R、Scala 或 SQL 代码。 还可以从与 Databricks SQL 中的 SQL 仓库关联的查询中运行 SQL 代码。
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
若要清理,请运行以下代码来删除该表:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
清除元数据文件
可以运行 VACUUM 来清理 Databricks Runtime 15.2 及更高版本中由 COPY INTO
创建的未引用的元数据文件。
参考
- Databricks Runtime 7.x 及更高版本:COPY INTO
其他资源
有关常见使用模式(包括针对同一 Delta 表执行多个
COPY INTO
操作的示例),请参阅 使用 COPY INTO 的常见数据加载模式。