Tutorial: COPY INTO con Spark SQL
Databricks recomienda usar el comando COPY INTO para la carga incremental y masiva de datos para orígenes de datos que contienen miles de archivos. Databricks recomienda usar Auto Loader para casos de uso avanzados.
En este tutorial, usará el comando COPY INTO
para cargar datos del almacenamiento de objetos de la nube en una tabla del área de trabajo de Azure Databricks.
Requisitos
- Una suscripción de Azure, un área de trabajo de Azure Databricks en esa suscripción y un clúster en esa área de trabajo. Para crearlos, consulte Inicio rápido: Ejecución de un trabajo de Spark en el área de trabajo de Azure Databricks con Azure Portal. Si sigue este inicio rápido, no es necesario seguir las instrucciones de la sección Ejecución de un trabajo de Spark SQL.
- Un clúster de uso general en el área de trabajo en la que se ejecuta Databricks Runtime 11.3 LTS o superior. Para crear un clúster polivalente, consulte Referencia de configuración de proceso.
- Estar familiarizado con la interfaz de usuario del área de trabajo de Azure Databricks. Consulte Exploración del área de trabajo.
- Estar familiarizado con los cuadernos de Databricks.
- Una ubicación en la que pueda escribir datos. Esta demostración usa la raíz de DBFS como ejemplo, pero Databricks recomienda una ubicación de almacenamiento externa configurada con Unity Catalog.
Paso 1. Configuración del entorno y creación de un generador de datos
En este tutorial se supone que está familiarizado con Azure Databricks y una configuración predeterminada del área de trabajo. Si no puede ejecutar el código proporcionado, póngase en contacto con el administrador del área de trabajo para asegurarse de que tiene acceso a los recursos de proceso y a una ubicación en la que pueda escribir datos.
Tenga en cuenta que el código proporcionado usa un parámetro source
para especificar la ubicación que configurará como origen de datos de COPY INTO
. Como se ha escrito, este código apunta a una ubicación en la raíz de DBFS. Si tiene permisos de escritura en una ubicación externa de almacenamiento de objetos, reemplace la parte dbfs:/
de la cadena de origen por la ruta de acceso al almacenamiento de objetos. Dado que este bloque de código también realiza una eliminación recursiva para restablecer esta demostración, asegúrese de que esto no señala a los datos de producción y que conserva el directorio anidado /user/{username}/copy-into-demo
para evitar sobrescribir o eliminar los datos existentes.
Cree un nuevo cuaderno SQL y adjúntelo a un clúster que ejecute Databricks Runtime 11.3 LTS o superior.
Copie y ejecute el código siguiente para restablecer la ubicación de almacenamiento y la base de datos usadas en este tutorial:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Copie y ejecute el código siguiente para configurar algunas tablas y funciones que se usarán para generar datos de forma aleatoria:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Paso 2: Carga de los datos de ejemplo en el almacenamiento en la nube
Escribir en formatos de datos distintos de Delta Lake es poco frecuente en Azure Databricks. El código proporcionado aquí se escribe en JSON, simulando un sistema externo que podría volcar los resultados de otro sistema en el almacenamiento de objetos.
Copie y ejecute el código siguiente para escribir un lote de datos JSON sin procesar:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Paso 3: Uso de COPY INTO para cargar datos JSON idempotentemente
Debe crear antes una tabla Delta Lake de destino para poder usar COPY INTO
. En Databricks Runtime 11.3 LTS y versiones superiores, no es necesario proporcionar nada más que un nombre de tabla en la instrucción CREATE TABLE
. Para las versiones anteriores de Databricks Runtime, debe proporcionar un esquema al crear una tabla vacía.
Copie y ejecute el código siguiente para crear la tabla Delta de destino y cargar los datos desde el origen:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Dado que esta acción es idempotente, puede ejecutarla varias veces, pero los datos solo se cargarán una vez.
Paso 4: Obtención de una vista previa del contenido de la tabla
Puede ejecutar una consulta SQL simple para revisar manualmente el contenido de esta tabla.
Copie y ejecute el código siguiente para obtener una vista previa de la tabla:
-- Review updated table SELECT * FROM user_ping_target
Paso 5: Carga de más datos y obtención de una vista previa de los resultados
Puede volver a ejecutar los pasos del 2 al 4 muchas veces para colocar nuevos lotes de datos JSON sin procesar aleatorios en el origen, cargarlos de forma idempotente en Delta Lake con COPY INTO
y obtener una vista previa de los resultados. Intente ejecutar estos pasos desordenados o varias veces para simular que se escriben varios lotes de datos sin procesar o ejecutar COPY INTO
varias veces sin que hayan llegado nuevos datos.
Paso 6: Limpieza del tutorial
Cuando haya terminado con este tutorial, puede limpiar los recursos asociados si ya no quiere conservarlos.
Copie y ejecute el código siguiente para quitar la base de datos, las tablas, y eliminar todos los datos:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Para detener el recurso de proceso, vaya a la pestaña Clústeres para Finalizar el clúster.
Recursos adicionales
- Artículo de referencia de COPY INTO