Introdução ao uso de COPY INTO para carregar dados
O comando SQL COPY INTO
permite carregar dados de um local de arquivo para uma tabela do Delta. Esta é uma operação repetível e idempotente. Os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO
oferece as seguintes funcionalidades:
- Filtros de arquivo ou diretório facilmente configuráveis do armazenamento em nuvem, incluindo volumes S3, ADLS Gen2, ABFS, GCS e Catálogo do Unity.
- Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários
- Processamento de arquivos exatamente uma vez (idempotente) por padrão
- Inferência do esquema da tabela de destino, mapeamento, mesclagem e evolução
Observação
Para obter uma experiência de ingestão de arquivos mais escalonável e robusta, a Databricks recomenda que os usuários de SQL aproveitem as tabelas de streaming. Confira Carregar dados usando tabelas de streaming no Databricks SQL.
Aviso
COPY INTO
respeita a configuração do espaço de trabalho para vetores de exclusão. Se habilitado, os vetores de exclusão serão habilitados na tabela de destino quando COPY INTO
for executado em uma computação ou SQL warehouse que esteja executando o Databricks Runtime 14.0 ou superior. Uma vez habilitados, os vetores de exclusão bloqueiam as consultas em uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Consulte O que são vetores de exclusão? e Habilitar automaticamente vetores de exclusão.
Requisitos
Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos de nuvem antes que os usuários possam carregar dados usando COPY INTO
.
Exemplo: carregar dados em uma tabela do Delta Lake sem esquema
Observação
Este recurso está disponível no Databricks Runtime 11.3 LTS e versões superiores.
Você pode criar tabelas delta de espaço reservado vazias para que o esquema seja posteriormente inferido durante um comando COPY INTO
definindo mergeSchema
como true
em COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução de SQL acima é idempotente e pode ser agendada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.
Observação
A tabela Delta vazia não é utilizável fora de COPY INTO
. INSERT INTO
e MERGE INTO
não há suporte para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO
, a tabela se torna consultável.
Confira Criar tabelas de destino para o COPY INTO.
Exemplo: definir o esquema e carregar dados em uma tabela do Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e usar o comando SQL COPY INTO
para carregar dados de exemplo dos conjuntos de dados do Databricks nela. Execute o código Python, R, Scala ou SQL de exemplo em um notebook anexado a um cluster do Azure Databricks. Também é possível executar o código SQL em uma consulta associada a um SQL warehouse no SQL do Databricks.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o seguinte código, que exclui a tabela:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Limpar arquivos de metadados
Você pode executar o VACUUM para limpar arquivos de metadados não referenciados criados por COPY INTO
no Databricks Runtime 15.2 e superior.
Referência
- Databricks Runtime 7.x e posteriores: COPY INTO
Recursos adicionais
Carregar dados usando COPY INTO com volumes ou locais externos do Catálogo do Unity
Para padrões de uso comuns, incluindo exemplos de várias
COPY INTO
operações na mesma tabela Delta, confira Padrões comuns de carregamento de dados usando COPY INTO.