Руководство по COPY INTO с помощью Spark SQL
Databricks рекомендует использовать команду COPY INTO для добавочной и массовой загрузки данных для источников данных, содержащих тысячи файлов. Databricks рекомендует использовать автозагрузчик для расширенных вариантов использования.
В этом руководстве вы используете команду COPY INTO
для загрузки данных из облачного хранилища объектов в таблицу в рабочей области Azure Databricks.
Требования
- Подписка Azure, рабочая область Azure Databricks в этой подписке и кластер в этой рабочей области. Чтобы создать их, обратитесь к . Краткое руководство: Запуск задания Spark в рабочей области Azure Databricks с использованием портала Azure. Если следовать этому краткому руководству, вам не нужно следовать инструкциям в разделе Запуск задания SQL Spark.
- Универсальный кластер , в вашей рабочей области, работающий под управлением Databricks Runtime 11.3 LTS или более поздней версии. Сведения о создании кластера всех целей см. в справочнике по конфигурации вычислений.
- Знакомство с пользовательским интерфейсом рабочей области Azure Databricks. См. Навигация по рабочей области.
- Знакомство с записными книжками Databricks.
- Расположение, в который можно записать данные; Эта демонстрация использует корневой каталог DBFS в качестве примера, но Databricks рекомендует внешнее расположение хранилища, настроенное в каталоге Unity.
Шаг 1. Настройка среды и создание генератора данных
В этом руководстве предполагается базовое знакомство с Azure Databricks и конфигурацией рабочей области по умолчанию. Если не удается запустить предоставленный код, обратитесь к администратору рабочей области, чтобы убедиться, что у вас есть доступ к вычислительным ресурсам и расположению, в которое можно записать данные.
Обратите внимание, что указанный код использует параметр source
для указания расположения, которое будет настроено в качестве источника данных COPY INTO
. Как записано, этот код указывает на расположение в корневом каталоге DBFS. Если у вас есть разрешение на запись в расположение внешнего хранилища объектов, замените часть исходной строки с указанием части dbfs:/
путем указания пути к вашему хранилищу объектов. Так как этот блок кода также выполняет рекурсивное удаление для сброса этой демонстрации, убедитесь, что вы не используете его для данных в рабочей среде и сохраняете вложенный каталог /user/{username}/copy-into-demo
, чтобы не перезаписать или удалить существующие данные.
Создайте новую записную книжку SQL и подключите ее к кластеру под управлением Databricks Runtime 11.3 LTS или более поздней версии.
Скопируйте и запустите следующий код, чтобы сбросить расположение хранилища и используемую в этом руководстве базу данных.
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Скопируйте и запустите следующий код, чтобы настроить некоторые таблицы и функции, которые будут использоваться для случайного создания данных:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Шаг 2. Запись примеров данных в облачное хранилище
Запись в форматы данных, отличные от Delta Lake, редко используется в Azure Databricks. Приведенный здесь код записывается в JSON, имитируя внешнюю систему, которая может дампать результаты из другой системы в хранилище объектов.
Скопируйте и запустите следующий код для записи пакета необработанных данных JSON:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Шаг 3. Загрузка идемпотентных данных JSON с помощью COPY INTO
Прежде чем использовать COPY INTO
, необходимо создать целевую таблицу Delta Lake. В Databricks Runtime 11.3 LTS и более поздних версиях вам не нужно предоставлять ничего, кроме имени таблицы в инструкции CREATE TABLE
. Для предыдущих версий Databricks Runtime необходимо указать схему при создании пустой таблицы.
Скопируйте и запустите следующий код, чтобы создать целевую таблицу Delta и загрузить данные из источника:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Так как это действие является идемпотентным, его можно запустить несколько раз, но данные будут загружаться только один раз.
Шаг 4. Просмотр содержимого таблицы
Вы можете запустить простой SQL-запрос, чтобы вручную просмотреть содержимое этой таблицы.
Скопируйте и выполните следующий код, чтобы просмотреть таблицу:
-- Review updated table SELECT * FROM user_ping_target
Шаг 5. Загрузка дополнительных данных и результатов предварительной версии
Вы можете многократно повторно выполнять шаги 2-4, чтобы получить новые пакеты случайных необработанных данных JSON в вашем источнике, идемпотентно загрузить их в Delta Lake с помощью COPY INTO
и просмотреть результаты. Попробуйте выполнить эти шаги по порядку или несколько раз, чтобы имитировать несколько пакетов необработанных данных, записываемых или выполняющихся COPY INTO
несколько раз без получения новых данных.
Шаг 6. Руководство по очистке
Когда вы закончите работу с этим руководством, вы можете очистить связанные ресурсы, если вы больше не хотите их сохранить.
Скопируйте и запустите следующий код, чтобы удалить базу данных, таблицы и удалить все данные:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Чтобы остановить вычислительный ресурс, перейдите на вкладку Кластеры и завершите кластер.
Дополнительные ресурсы
- Справочная статья COPY INTO