Compartir a través de


Tutorial: COPY INTO con Spark SQL

Databricks recomienda usar el comando COPY INTO para la carga incremental y masiva de datos para orígenes de datos que contienen miles de archivos. Databricks recomienda usar Auto Loader para casos de uso avanzados.

En este tutorial, usará el comando COPY INTO para cargar datos del almacenamiento de objetos de la nube en una tabla del área de trabajo de Azure Databricks.

Requisitos

  1. Una suscripción de Azure, un área de trabajo de Azure Databricks en esa suscripción y un clúster en esa área de trabajo. Para crearlos, consulte Inicio rápido: Ejecución de un trabajo de Spark en el área de trabajo de Azure Databricks con Azure Portal. Si sigue este inicio rápido, no es necesario seguir las instrucciones de la sección Ejecución de un trabajo de Spark SQL.
  2. Un clúster de uso general en el área de trabajo en la que se ejecuta Databricks Runtime 11.3 LTS o superior. Para crear un clúster polivalente, consulte Referencia de configuración de proceso.
  3. Estar familiarizado con la interfaz de usuario del área de trabajo de Azure Databricks. Consulte Exploración del área de trabajo.
  4. Estar familiarizado con los cuadernos de Databricks.
  5. Una ubicación en la que pueda escribir datos. Esta demostración usa la raíz de DBFS como ejemplo, pero Databricks recomienda una ubicación de almacenamiento externa configurada con Unity Catalog.

Paso 1. Configuración del entorno y creación de un generador de datos

En este tutorial se supone que está familiarizado con Azure Databricks y una configuración predeterminada del área de trabajo. Si no puede ejecutar el código proporcionado, póngase en contacto con el administrador del área de trabajo para asegurarse de que tiene acceso a los recursos de proceso y a una ubicación en la que pueda escribir datos.

Tenga en cuenta que el código proporcionado usa un parámetro source para especificar la ubicación que configurará como origen de datos de COPY INTO. Como se ha escrito, este código apunta a una ubicación en la raíz de DBFS. Si tiene permisos de escritura en una ubicación externa de almacenamiento de objetos, reemplace la parte dbfs:/ de la cadena de origen por la ruta de acceso al almacenamiento de objetos. Dado que este bloque de código también realiza una eliminación recursiva para restablecer esta demostración, asegúrese de que esto no señala a los datos de producción y que conserva el directorio anidado /user/{username}/copy-into-demo para evitar sobrescribir o eliminar los datos existentes.

  1. Cree un nuevo cuaderno SQL y adjúntelo a un clúster que ejecute Databricks Runtime 11.3 LTS o superior.

  2. Copie y ejecute el código siguiente para restablecer la ubicación de almacenamiento y la base de datos usadas en este tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copie y ejecute el código siguiente para configurar algunas tablas y funciones que se usarán para generar datos de forma aleatoria:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Paso 2: Carga de los datos de ejemplo en el almacenamiento en la nube

Escribir en formatos de datos distintos de Delta Lake es poco frecuente en Azure Databricks. El código proporcionado aquí se escribe en JSON, simulando un sistema externo que podría volcar los resultados de otro sistema en el almacenamiento de objetos.

  1. Copie y ejecute el código siguiente para escribir un lote de datos JSON sin procesar:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Paso 3: Uso de COPY INTO para cargar datos JSON idempotentemente

Debe crear antes una tabla Delta Lake de destino para poder usar COPY INTO. En Databricks Runtime 11.3 LTS y versiones superiores, no es necesario proporcionar nada más que un nombre de tabla en la instrucción CREATE TABLE. Para las versiones anteriores de Databricks Runtime, debe proporcionar un esquema al crear una tabla vacía.

  1. Copie y ejecute el código siguiente para crear la tabla Delta de destino y cargar los datos desde el origen:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Dado que esta acción es idempotente, puede ejecutarla varias veces, pero los datos solo se cargarán una vez.

Paso 4: Obtención de una vista previa del contenido de la tabla

Puede ejecutar una consulta SQL simple para revisar manualmente el contenido de esta tabla.

  1. Copie y ejecute el código siguiente para obtener una vista previa de la tabla:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Paso 5: Carga de más datos y obtención de una vista previa de los resultados

Puede volver a ejecutar los pasos del 2 al 4 muchas veces para colocar nuevos lotes de datos JSON sin procesar aleatorios en el origen, cargarlos de forma idempotente en Delta Lake con COPY INTO y obtener una vista previa de los resultados. Intente ejecutar estos pasos desordenados o varias veces para simular que se escriben varios lotes de datos sin procesar o ejecutar COPY INTO varias veces sin que hayan llegado nuevos datos.

Paso 6: Limpieza del tutorial

Cuando haya terminado con este tutorial, puede limpiar los recursos asociados si ya no quiere conservarlos.

  1. Copie y ejecute el código siguiente para quitar la base de datos, las tablas, y eliminar todos los datos:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Para detener el recurso de proceso, vaya a la pestaña Clústeres para Finalizar el clúster.

Recursos adicionales