教程:将 COPY INTO 与 Spark SQL 配合使用

Databricks 建议对包含数千个文件的数据源使用 COPY INTO 命令进行增量和批量数据加载。 Databricks 建议对高级用例使用 自动加载程序

本教程使用 COPY INTO 命令将数据从云对象存储加载到 Azure Databricks 工作区中的 table。

要求

  1. Azure 订阅、该订阅中的 Azure Databricks 工作区以及该工作区中的群集。 若要创建这些项,请参阅 快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业。 如果遵循本快速入门,则无需按照 运行 Spark SQL 作业 部分中的说明进行操作。
  2. 工作区中运行 Databricks Runtime 11.3 LTS 或更高版本的全用途群集。 若要创建通用群集,请参阅 计算配置参考
  3. 熟悉 Azure Databricks 工作区用户界面。 请参阅浏览工作区
  4. 熟悉如何使用 Databricks 笔记本
  5. 可将数据写入到的位置;此演示使用 DBFS 根目录作为示例,但 Databricks 建议使用 Unity Catalog配置的外部存储位置。

步骤 1. 配置环境并创建数据生成器

本教程假定基本熟悉 Azure Databricks 和默认工作区配置。 如果无法运行提供的代码,请联系工作区管理员,以确保你有权访问计算资源和可以写入数据的位置。

请注意,提供的代码使用 source 参数来指定要配置为 COPY INTO 数据源的位置。 如本文所述,此代码指向 DBFS 根目录中的位置。 如果对外部对象存储位置具有写入权限,请将源字符串的 dbfs:/ 部分替换为对象存储的路径。 由于此代码块还会执行递归删除来重置此演示,因此请确保不要将此数据指向生产数据,并且保留 /user/{username}/copy-into-demo 嵌套目录以避免覆盖或删除现有数据。

  1. 创建新的 SQL 笔记本将其附加到运行 Databricks Runtime 11.3 LTS 或更高版本的群集

  2. 复制并运行以下代码以重置本教程中使用的存储位置和数据库:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 复制并运行以下代码以配置一些用于随机生成数据的表和函数:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

步骤 2:将示例数据写入云存储

Azure Databricks 上很少写入 Delta Lake 以外的数据格式。 此处提供的代码将写入 JSON,模拟可能将另一个系统的结果转储到对象存储的外部系统。

  1. 复制并运行以下代码以编写一批原始 JSON 数据:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

步骤 3:使用 COPY INTO 以幂等形式加载 JSON 数据

必须先创建目标 Delta Lake table,然后才能使用 COPY INTO。 在 Databricks Runtime 11.3 LTS 及更高版本中,无需在 CREATE TABLE 语句中提供 table 名称以外的任何内容。 对于以前版本的 Databricks Runtime,必须在创建空表时提供架构。

  1. 复制并运行以下代码以创建目标 Delta table 并从源加载数据:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

由于此操作是幂等的,因此可以多次运行该操作,但数据只会加载一次。

步骤 4:预览表的内容

可以运行简单的 SQL 查询来手动查看此 table的内容。

  1. 复制并执行以下代码以预览 table:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

步骤 5:加载更多数据和预览结果

可以多次重新运行步骤 2-4,以将新批随机原始 JSON 数据加载到源中,并用 COPY INTO 以幂等方式将其加载到 Delta Lake,并预览结果。 尝试不按顺序或多次运行这些步骤,以模拟将多个批次的原始数据写入系统,或在没有新数据 having 到达的情况下多次执行 COPY INTO

步骤 6:整理教程

完成本教程后,如果不再需要保留关联资源,则可以清理这些资源。

  1. 复制并运行以下代码来删除数据库、tables和 remove 所有数据:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. 若要停止计算资源,请转到 群集 选项卡,终止 群集。

其他资源