Выполнение первой рабочей нагрузки извлечения, преобразования и загрузки в Azure Databricks
Узнайте, как использовать готовые к производству средства из Azure Databricks для разработки и развертывания первых конвейеров извлечения, преобразования и загрузки (ETL) для оркестрации данных.
К концу этой статьи вы сможете свободно выполнять следующие операции:
- Запуск вычислительного кластера Databricks для всех целей.
- Создание записной книжки Databricks.
- Настройку добавочного приема данных в Delta Lake с помощью автозагрузчика.
- Выполнение ячеек записной книжки для обработки, запроса и предварительного просмотра данных.
- Планирование записной книжки в качестве задания Databricks.
В этом учебнике используются интерактивные записные книжки для выполнения распространенных задач ETL в Python или Scala.
Вы также можете использовать разностные динамические таблицы для создания конвейеров ETL. Платформа Databricks создала разностные динамические таблицы, чтобы снизить сложность сборки, развертывания и обслуживания рабочих конвейеров ETL. См. руководство по . Запуск первого конвейера Delta Live Tables.
Вы также можете использовать поставщик Databricks Terraform для создания ресурсов для этой статьи. См. статью "Создание кластеров, записных книжек и заданий с помощью Terraform".
Требования
- Вы вошли в рабочую область Azure Databricks.
- У вас должно быть разрешение на создание кластера.
Примечание.
Если у вас нет прав управления кластером, вы по-прежнему можете выполнить большинство описанных ниже действий при наличии у вас доступа к кластеру.
Шаг 1. Создание кластера
Чтобы выполнить исследовательский анализ данных и инжиниринг данных, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.
- Щелкните "
Вычисления" на боковой панели.
- На странице "Вычислительная среда" щелкните элемент Создать кластер. Откроется страница создания кластера.
- Укажите уникальное имя кластера, оставьте оставшиеся значения в состоянии по умолчанию и щелкните Создать кластер.
Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".
Шаг 2. Создание записной книжки Databricks
Чтобы создать записную книжку в рабочей области, нажмите кнопку "Создать" на боковой панели и нажмите кнопку "Записная книжка". Пустая записная книжка открывается в рабочей области.
Дополнительные сведения о создании записных книжек и управлении ими см. в статье Управление записными книжками.
Шаг 3. Настройка автозагрузчика для приема данных в Delta Lake
В Databricks рекомендуется использовать Автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.
Databricks рекомендует хранить данные с использованием формата Delta Lake. Delta Lake — это уровень хранения открытого кода, который предоставляет транзакции ACID и обеспечивает гибридное решение "хранилище и озеро данных". Delta Lake — это формат по умолчанию для таблиц, созданных в Databricks.
Чтобы настроить автозагрузчик для приема данных в таблицу Delta Lake, скопируйте следующий код в пустую ячейку записной книжки:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Примечание.
Переменные, определенные в этом коде, должны позволить безопасно выполнять его без риска конфликта с существующими ресурсами рабочей области или другими пользователями. Ограниченные разрешения сети или хранилища приведут к ошибкам при выполнении этого кода; обратитесь к администратору рабочей области, чтобы решить проблему этих ограничений.
Дополнительные сведения об автозагрузчике см. в статье Автозагрузчик.
Шаг 4. Обработка данных и взаимодействие с ними
Записные книжки выполняют логические ячейки последовательно. Для выполнения логики в ячейке:
Чтобы запустить ячейку, выполненную на предыдущем шаге, выберите ячейку и нажмите клавиши SHIFT+ВВОД.
Чтобы запросить только что созданную таблицу, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
Чтобы просмотреть данные в только что созданной таблице, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.
Python
display(df)
Scala
display(df)
Дополнительные сведения об интерактивных параметрах визуализации данных см. в разделе "Визуализации" в записных книжках Databricks.
Шаг 5. Планирование задания
Записные книжки Databricks можно запускать в качестве рабочих сценариев, добавляя их в виде задачи в задание Databricks. В этом шаге вы создадите новое задание, которое можно активировать вручную.
Чтобы запланировать выполнение записной книжки в качестве задачи, выполните следующие действия.
- Нажмите Расписание справа от строки заголовка.
- Введите уникальное Имя задания.
- Выберите Вручную.
- В раскрывающемся списке Кластер выберите кластер, созданный на шаге 1.
- Нажмите кнопку Создать.
- В открывшемся окне нажмите Запустить сейчас.
- Чтобы просмотреть результаты выполнения задания, щелкните
значок рядом с меткой времени последнего выполнения .
Дополнительные сведения о заданиях см. в разделе Что такое задания?.
Дополнительные интеграции
Дополнительные сведения об интеграции и средствах для инжиниринга данных с помощью Azure Databricks: