教程:将 COPY INTO 与 Spark SQL 配合使用

Databricks 建议对包含数千个文件的数据源使用 COPY INTO 命令进行增量和批量数据加载。 Databricks 建议在高级用例中使用自动加载程序

在本教程中,你将使用 COPY INTO 命令将数据从云对象存储加载到 Azure Databricks 工作区的表中。

要求

  1. 一个 Azure 订阅、该订阅中的一个 Azure Databricks 工作区以及该工作区中的一个群集。 要创建这些内容,请参阅快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业。 如果按照此快速入门中的说明操作,则无需按照“运行 Spark SQL 作业”部分中的说明操作。
  2. 工作区中运行 Databricks Runtime 11.3 LTS 或更高版本的全用途群集。 要创建通用群集,请参阅计算配置参考
  3. 熟悉 Azure Databricks 工作区用户界面。 请参阅浏览工作区
  4. 熟悉如何使用 Databricks 笔记本
  5. 可将数据写入到此位置;此演示使用 DBFS 根作为示例,但 Databricks 建议使用 Unity 目录配置的外部存储位置。

步骤 1。 配置环境并创建数据生成器

本教程假定基本熟悉 Azure Databricks 和默认工作区配置。 如果无法运行提供的代码,请联系工作区管理员,确保有权访问计算资源和可以写入数据的位置。

请注意,提供的代码使用参数 source 来指定将配置为 COPY INTO 数据源的位置。 编写后,此代码指向 DBFS 根目录中的位置。 如果对外部对象存储位置具有写入权限,请将源字符串 dbfs:/ 部分替换为对象存储的路径。 由于此代码块还会执行递归删除来重置此演示,因此请确保不要将此数据指向生产数据,并且保留 /user/{username}/copy-into-demo 嵌套目录以避免覆盖或删除现有数据。

  1. 创建一个新的 SQL 笔记本将其附加到运行 Databricks Runtime 11.3 LTS 或更高版本的群集

  2. 复制并运行以下代码以重置本教程中使用的存储位置和数据库:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 复制并运行以下代码以配置一些用于随机生成数据的表和函数:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

步骤 2:将示例数据写入云存储

Azure Databricks 上很少写入 Delta Lake 以外的数据格式。 此处提供的代码将写入 JSON,模拟外部系统,该系统可能会将另一个系统的结果转储到对象存储中。

  1. 复制并运行以下代码以编写一批原始 JSON 数据:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

步骤 3:使用 COPY INTO 以幂等形式加载 JSON 数据

必须先创建目标 Delta Lake 表,然后才能使用 COPY INTO。 在 Databricks Runtime 11.3 LTS 及更高版本中,无需在 CREATE TABLE 语句中提供表名以外的任何内容。 对于以前版本的 Databricks Runtime,必须在创建空表时提供架构。

  1. 复制并运行以下代码以创建目标 Delta 表并从源加载数据:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

由于此操作是幂等的,因此可以多次运行该操作,但数据只会加载一次。

步骤 4:预览表的内容

可以运行简单的 SQL 查询来手动查看此表的内容。

  1. 复制并执行以下代码以预览表:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

步骤 5:加载更多数据和预览结果

可以多次重新运行步骤 2-4,以将新批随机原始 JSON 数据加载到源中,并用 COPY INTO 以幂等方式将其加载到 Delta Lake,并预览结果。 尝试无序或多次运行这些步骤,以模拟正在写入多批原始数据或在没有新数据到达的情况下多次执行 COPY INTO

步骤 6:清理教程

完成本教程后,如果不再需要保留相关资源,可以在工作区中清理这些资源。

  1. 复制并运行以下代码以删除数据库、表和删除所有数据:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. 若要停止计算资源,请转到“群集”选项卡并终止群集。

其他资源