Поделиться через


Руководство по COPY INTO с помощью Spark SQL

Databricks рекомендует использовать команду COPY INTO для добавочной и массовой загрузки данных для источников данных, содержащих тысячи файлов. Databricks рекомендует использовать автозагрузчик для расширенных вариантов использования.

В этом руководстве вы используете команду COPY INTO для загрузки данных из облачного хранилища объектов в таблицу в рабочей области Azure Databricks.

Требования

  1. Подписка Azure, рабочая область Azure Databricks в этой подписке и кластер в этой рабочей области. Чтобы создать их, обратитесь к . Краткое руководство: Запуск задания Spark в рабочей области Azure Databricks с использованием портала Azure. Если следовать этому краткому руководству, вам не нужно следовать инструкциям в разделе Запуск задания SQL Spark.
  2. Универсальный кластер , в вашей рабочей области, работающий под управлением Databricks Runtime 11.3 LTS или более поздней версии. Сведения о создании кластера всех целей см. в справочнике по конфигурации вычислений.
  3. Знакомство с пользовательским интерфейсом рабочей области Azure Databricks. См. Навигация по рабочей области.
  4. Знакомство с записными книжками Databricks.
  5. Расположение, в который можно записать данные; Эта демонстрация использует корневой каталог DBFS в качестве примера, но Databricks рекомендует внешнее расположение хранилища, настроенное в каталоге Unity.

Шаг 1. Настройка среды и создание генератора данных

В этом руководстве предполагается базовое знакомство с Azure Databricks и конфигурацией рабочей области по умолчанию. Если не удается запустить предоставленный код, обратитесь к администратору рабочей области, чтобы убедиться, что у вас есть доступ к вычислительным ресурсам и расположению, в которое можно записать данные.

Обратите внимание, что указанный код использует параметр source для указания расположения, которое будет настроено в качестве источника данных COPY INTO. Как записано, этот код указывает на расположение в корневом каталоге DBFS. Если у вас есть разрешение на запись в расположение внешнего хранилища объектов, замените часть исходной строки с указанием части dbfs:/ путем указания пути к вашему хранилищу объектов. Так как этот блок кода также выполняет рекурсивное удаление для сброса этой демонстрации, убедитесь, что вы не используете его для данных в рабочей среде и сохраняете вложенный каталог /user/{username}/copy-into-demo, чтобы не перезаписать или удалить существующие данные.

  1. Создайте новую записную книжку SQL и подключите ее к кластеру под управлением Databricks Runtime 11.3 LTS или более поздней версии.

  2. Скопируйте и запустите следующий код, чтобы сбросить расположение хранилища и используемую в этом руководстве базу данных.

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Скопируйте и запустите следующий код, чтобы настроить некоторые таблицы и функции, которые будут использоваться для случайного создания данных:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Шаг 2. Запись примеров данных в облачное хранилище

Запись в форматы данных, отличные от Delta Lake, редко используется в Azure Databricks. Приведенный здесь код записывается в JSON, имитируя внешнюю систему, которая может дампать результаты из другой системы в хранилище объектов.

  1. Скопируйте и запустите следующий код для записи пакета необработанных данных JSON:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Шаг 3. Загрузка идемпотентных данных JSON с помощью COPY INTO

Прежде чем использовать COPY INTO, необходимо создать целевую таблицу Delta Lake. В Databricks Runtime 11.3 LTS и более поздних версиях вам не нужно предоставлять ничего, кроме имени таблицы в инструкции CREATE TABLE. Для предыдущих версий Databricks Runtime необходимо указать схему при создании пустой таблицы.

  1. Скопируйте и запустите следующий код, чтобы создать целевую таблицу Delta и загрузить данные из источника:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Так как это действие является идемпотентным, его можно запустить несколько раз, но данные будут загружаться только один раз.

Шаг 4. Просмотр содержимого таблицы

Вы можете запустить простой SQL-запрос, чтобы вручную просмотреть содержимое этой таблицы.

  1. Скопируйте и выполните следующий код, чтобы просмотреть таблицу:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Шаг 5. Загрузка дополнительных данных и результатов предварительной версии

Вы можете многократно повторно выполнять шаги 2-4, чтобы получить новые пакеты случайных необработанных данных JSON в вашем источнике, идемпотентно загрузить их в Delta Lake с помощью COPY INTOи просмотреть результаты. Попробуйте выполнить эти шаги по порядку или несколько раз, чтобы имитировать несколько пакетов необработанных данных, записываемых или выполняющихся COPY INTO несколько раз без получения новых данных.

Шаг 6. Руководство по очистке

Когда вы закончите работу с этим руководством, вы можете очистить связанные ресурсы, если вы больше не хотите их сохранить.

  1. Скопируйте и запустите следующий код, чтобы удалить базу данных, таблицы и удалить все данные:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Чтобы остановить вычислительный ресурс, перейдите на вкладку Кластеры и завершите кластер.

Дополнительные ресурсы

  • Справочная статья COPY INTO