Tutorial: COPY INTO with Spark SQL

Databricks recommends the COPY INTO command for incremental and bulk data loading from data sources that contain thousands of files. For more advanced use cases, Databricks recommends Auto Loader.

In this tutorial, you use the COPY INTO command to load data from cloud object storage into a table in your Azure Databricks workspace.

Requirements

  1. An Azure subscription, an Azure Databricks workspace in that subscription, and a cluster in that workspace. To create these, see Quickstart: Run a Spark job on Azure Databricks Workspace using the Azure portal. If you follow this quickstart, you do not need to follow the instructions in the Run a Spark SQL job section.
  2. An all-purpose cluster in your workspace running Databricks Runtime 11.3 LTS or above. To create an all-purpose cluster, see Compute configuration reference.
  3. Familiarity with the Azure Databricks workspace user interface. See Navigate the workspace.
  4. Familiarity working with Databricks notebooks.
  5. A location you can write data to; this demo uses the DBFS root as an example, but Databricks recommends an external storage location configured with Unity Catalog.

Step 1: Configure your environment and create a data generator

This tutorial assumes basic familiarity with Azure Databricks and a default workspace configuration. If you are unable to run the code provided, contact your workspace administrator to make sure you have access to compute resources and a location to which you can write data.

Note that the provided code uses a source parameter to specify the location you’ll configure as your COPY INTO data source. As written, this code points to a location on DBFS root. If you have write permissions on an external object storage location, replace the dbfs:/ portion of the source string with the path to your object storage. Because this code block also does a recursive delete to reset this demo, make sure that you don’t point this at production data and that you keep the /user/{username}/copy-into-demo nested directory to avoid overwriting or deleting existing data.
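The path handling described above can be sketched in plain Python, outside a notebook. This is a minimal illustration under stated assumptions, not part of the tutorial code: `sanitize_username`, `build_source`, and `is_safe_to_reset` are hypothetical helper names, the email address is invented, and the `abfss://` URI is a placeholder for whatever external location you have write access to.

```python
import re

DEMO_SUFFIX = "copy-into-demo"

def sanitize_username(user: str) -> str:
    """Mirror the tutorial's regexp_replace: each non-alphanumeric character becomes '_'."""
    return re.sub(r"[^a-zA-Z0-9]", "_", user)

def build_source(username: str, root: str = "dbfs:/") -> str:
    """Swap only the storage root; keep the nested /user/{username}/copy-into-demo directory."""
    return f"{root.rstrip('/')}/user/{username}/{DEMO_SUFFIX}"

def is_safe_to_reset(path: str, username: str) -> bool:
    """Allow a recursive delete only when the path ends in the per-user demo directory."""
    return path.rstrip("/").endswith(f"/user/{username}/{DEMO_SUFFIX}")

username = sanitize_username("jane.doe@example.com")  # hypothetical user
default_source = build_source(username)
# Placeholder URI (assumption): substitute your own external location.
external_source = build_source(username, "abfss://container@account.dfs.core.windows.net/")

print(default_source)
print(external_source)
print(is_safe_to_reset(default_source, username))
```

A guard like `is_safe_to_reset` is one way to make sure the recursive delete never runs against a path that lacks the nested demo directory.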

  1. Create a new SQL notebook and attach it to a cluster running Databricks Runtime 11.3 LTS or above.

  2. Copy and run the following code to reset the storage location and database used in this tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copy and run the following code to configure some tables and functions that will be used to randomly generate data:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
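
To see what the two SQL functions above produce, here is a rough plain-Python equivalent. It is only a sketch: Python's `random` module stands in for Spark's `rand()`, and the fixed seed is an assumption added for repeatability.

```python
import random

def get_ping(rng: random.Random) -> int:
    """Like int(rand() * 250): an integer from 0 through 249."""
    return int(rng.random() * 250)

def is_active(rng: random.Random) -> bool:
    """Like rand() > .25: true roughly three times out of four."""
    return rng.random() > 0.25

rng = random.Random(42)  # fixed seed (assumption) so the sketch is repeatable
pings = [get_ping(rng) for _ in range(10_000)]
active_share = sum(is_active(rng) for _ in range(10_000)) / 10_000

print(min(pings), max(pings))   # always within 0..249
print(round(active_share, 2))   # close to 0.75
```

So each generated batch includes about three quarters of the users in user_ids, each with a ping value below 250.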
    

Step 2: Write the sample data to cloud storage

Writing to data formats other than Delta Lake is rare on Azure Databricks. The code provided here writes JSON to simulate an external system that dumps its results into object storage.

  1. Copy and run the following code to write a batch of raw JSON data:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
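
The effect of this INSERT can be imitated locally to see what a batch looks like on disk. The sketch below is an illustration only: a temporary directory stands in for the cloud storage location, the `batch-NNNNN.json` naming scheme is hypothetical (Spark chooses its own part-file names), and only a subset of the tutorial's user IDs is used.

```python
import json
import random
import tempfile
from datetime import datetime, timezone
from pathlib import Path

USER_IDS = ["potato_luver", "beanbag_lyfe", "the_king"]  # subset of the tutorial's user_ids

def write_batch(source_dir: Path, rng: random.Random) -> Path:
    """Write one newline-delimited JSON file with a ping for each 'active' user."""
    now = datetime.now(timezone.utc).isoformat()
    rows = [
        {"user_id": user, "ping": int(rng.random() * 250), "time": now}
        for user in USER_IDS
        if rng.random() > 0.25  # same filter idea as is_active()
    ]
    batch_number = len(list(source_dir.glob("batch-*.json")))
    out = source_dir / f"batch-{batch_number:05d}.json"  # hypothetical naming scheme
    out.write_text("".join(json.dumps(row) + "\n" for row in rows))
    return out

source_dir = Path(tempfile.mkdtemp())  # stands in for the object storage location
rng = random.Random(7)
first = write_batch(source_dir, rng)
second = write_batch(source_dir, rng)

print(sorted(p.name for p in source_dir.iterdir()))
```

Each call lands a new file next to the earlier ones, which is the situation COPY INTO is designed to handle.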
    

Step 3: Use COPY INTO to load JSON data idempotently

You must create a target Delta Lake table before you can use COPY INTO. In Databricks Runtime 11.3 LTS and above, you do not need to provide anything other than a table name in your CREATE TABLE statement. For previous versions of Databricks Runtime, you must provide a schema when creating an empty table.
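
As a rough mental model of starting from a table with no declared columns and letting the loaded data define them, consider the toy Python sketch below. It is not how Spark infers types or how Delta Lake evolves schemas; `infer_schema` and the sample records are hypothetical and only illustrate the idea of merging field names across batches.

```python
import json

def infer_schema(lines):
    """Union the field names seen across JSON records, keeping a type name for each."""
    schema = {}
    for line in lines:
        for field, value in json.loads(line).items():
            schema.setdefault(field, type(value).__name__)
    return schema

target_schema = {}  # an empty table: no columns declared up front

batch_1 = ['{"user_id": "frodo", "ping": 42}']
batch_2 = ['{"user_id": "n00b", "ping": 7, "time": "2024-01-01T00:00:00Z"}']

for batch in (batch_1, batch_2):
    for field, type_name in infer_schema(batch).items():
        target_schema.setdefault(field, type_name)  # add new columns, keep existing ones

print(target_schema)  # {'user_id': 'str', 'ping': 'int', 'time': 'str'}
```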

  1. Copy and run the following code to create your target Delta table and load data from your source:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Because this action is idempotent, you can run it multiple times, but each source file is loaded only once.
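
The idempotent behavior can be modeled with a small Python class that remembers which source files it has already ingested. This is a toy sketch, not Databricks' implementation: `ToyTarget` and the file names are hypothetical, and a dictionary stands in for the source directory.

```python
class ToyTarget:
    """A stand-in for a target table that remembers which source files it has loaded."""

    def __init__(self):
        self.rows = []
        self.loaded_files = set()

    def copy_into(self, source_files):
        """Load only files not seen before; return the number of rows added."""
        added = 0
        for name, rows in sorted(source_files.items()):
            if name in self.loaded_files:
                continue  # already ingested: skip it, so reruns add nothing
            self.rows.extend(rows)
            self.loaded_files.add(name)
            added += len(rows)
        return added

source = {"batch-0.json": [{"user_id": "frodo", "ping": 42}]}
target = ToyTarget()

first_run = target.copy_into(source)   # loads the one existing file
second_run = target.copy_into(source)  # nothing new has arrived
source["batch-1.json"] = [{"user_id": "n00b", "ping": 7}, {"user_id": "the_wiz", "ping": 199}]
third_run = target.copy_into(source)   # loads only the new file

print(first_run, second_run, third_run)  # 1 0 2
```

The same reasoning explains why writing several batches and then loading once, or loading several times with no new data, leaves the target table in the expected state.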

Step 4: Preview the contents of your table

You can run a simple SQL query to manually review the contents of this table.

  1. Copy and execute the following code to preview your table:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Step 5: Load more data and preview results

You can re-run steps 2-4 many times to land new batches of random raw JSON data in your source, idempotently load them to Delta Lake with COPY INTO, and preview the results. Try running these steps out of order or multiple times to simulate multiple batches of raw data being written or executing COPY INTO multiple times without new data having arrived.

Step 6: Clean up tutorial

When you are done with this tutorial, you can clean up the associated resources if you no longer want to keep them.

  1. Copy and run the following code to drop the database and its tables and to remove all data:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. To stop your compute resource, go to the Clusters tab and terminate your cluster.

Additional resources