Compartir a través de


Comenzar a usar COPY INTO para cargar datos.

El comando SQL COPY INTO permite cargar datos desde una ubicación de archivos en una tabla Delta. Se trata de una operación que se puede volver a intentar e idempotente; se omiten los archivos de la ubicación de origen que ya se han cargado.

COPY INTO ofrece las siguientes capacidades:

  • Filtros de archivos o directorios configurables fácilmente desde el almacenamiento en la nube, incluidos los volúmenes S3, ADLS Gen2, ABFS, GCS y Unity Catalog.
  • Compatibilidad con varios formatos de archivo de origen: CSV, JSON, XML, Avro, ORC, Parquet, texto y archivos binarios
  • Procesamiento de archivos exactamente una vez (idempotente) de manera predeterminada
  • Inferencia, asignación, combinación y evolución del esquema de tabla de destino

Nota:

Para obtener una experiencia de ingesta de archivos más escalable y sólida, Databricks recomienda que los usuarios de SQL aprovechen las tablas de streaming. Consulte Carga de datos mediante tablas de secuencia en Databricks SQL.

Advertencia

COPY INTO respeta la configuración del área de trabajo para los vectores de eliminación. Si está habilitado, los vectores de eliminación se habilitan en la tabla de destino cuando COPY INTO se ejecuta en un almacenamiento de SQL o en un proceso que ejecuta Databricks Runtime 14.0 o superior. Una vez habilitado, los vectores de eliminación bloquean las consultas en una tabla de Databricks Runtime 11.3 LTS y versiones posteriores. Consulte ¿Qué son los vectores de eliminación? y vectores de eliminación de habilitación automática.

Requisitos

Un administrador de cuenta debe seguir los pasos descritos en Configurar acceso a datos para la ingesta para configurar el acceso a los datos en el almacenamiento de objetos en la nube antes de que los usuarios puedan cargar datos mediante COPY INTO.

Ejemplo: carga de datos en una tabla de Delta Lake sin esquema

Nota:

Esta característica está disponible en Databricks Runtime 11.3 LTS y versiones posteriores.

Puede crear tablas Delta vacías de marcador de posición para que el esquema se infiera más adelante durante un comando COPY INTO estableciendo mergeSchema en true en COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

La instrucción SQL anterior es idempotente y se puede programar para que se ejecute e ingiera datos una vez exactamente en una tabla Delta.

Nota:

La tabla Delta vacía no se puede usar fuera de COPY INTO. INSERT INTO y MERGE INTO no se admiten para escribir datos en tablas Delta sin esquema. Una vez insertados los datos en la tabla con COPY INTO, la tabla se puede consultar.

Consulte Creación de tablas de destino para COPY INTO.

Ejemplo: establecimiento del esquema y carga de datos en una tabla de Delta Lake

En el ejemplo siguiente se muestra cómo crear una tabla Delta y luego usar el comando SQL COPY INTO para cargar datos de ejemplo de conjuntos de datos de Databricks en la tabla. Puede ejecutar el código de ejemplo de Python, R, Scala y SQL desde un cuaderno asociado a un clúster de Azure Databricks. También puede ejecutar el código SQL desde una consulta asociada a un almacén de SQL en Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpiar, ejecute el código siguiente, que elimina la tabla:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Limpieza de archivos de metadatos

Puede ejecutar VACUUM para limpiar archivos de metadatos sin referencia creados por COPY INTO en Databricks Runtime 15.2 y versiones posteriores.

Referencia

  • Databricks Runtime 7.x y versiones superiores: COPY INTO

Recursos adicionales