Создание сквозного конвейера данных в Databricks
В этой статье показано, как создать и развернуть сквозной конвейер обработки данных, включая прием необработанных данных, преобразование данных и выполнение анализа обработанных данных.
Примечание.
Хотя в этой статье показано, как создать полный конвейер данных с помощью записных книжек Databricks и задания Azure Databricks для оркестрации рабочего процесса, Databricks рекомендует использовать Delta Live Tables, декларативный интерфейс для создания надежных, легко обслуживаемых и поддающихся тестированию конвейеров обработки данных.
Что такое конвейер данных
Конвейер данных реализует шаги, необходимые для перемещения данных из исходных систем, преобразования этих данных на основе требований и хранения данных в целевой системе. Конвейер данных включает все процессы, необходимые для преобразования необработанных данных в подготовленные данные, которые пользователи могут использовать. Например, конвейер данных может подготовить данные, чтобы аналитики и специалисты по обработке и анализу данных могли извлечь значение из данных с помощью анализа и отчетности.
Рабочий процесс извлечения, преобразования и загрузки (ETL) является общим примером конвейера данных. При обработке ETL данные передаются из исходных систем и записываются в промежуточную область, преобразуются на основе требований (обеспечение качества данных, дедупликации записей и т. д.), а затем записываются в целевую систему, например хранилище данных или озеро данных.
Шаги конвейера данных
Чтобы приступить к созданию конвейеров данных в Azure Databricks, в примере, приведенном в этой статье, описывается создание рабочего процесса обработки данных:
- Используйте функции Azure Databricks для изучения необработанного набора данных.
- Создайте блокнот Databricks для приема сырых исходных данных и записи их в целевую таблицу.
- Создайте записную книжку Databricks для преобразования необработанных исходных данных и записи преобразованных данных в целевую таблицу.
- Создайте записную книжку Databricks для запроса преобразованных данных.
- Автоматизируйте конвейер данных с помощью задания Azure Databricks.
Требования
- Вы вошли в Azure Databricks и в рабочей области Обработка и анализ данных и инженерии.
- У вас есть разрешение на создание кластера или доступ к кластеру.
- (Необязательно) Чтобы опубликовать таблицы в каталоге Unity, необходимо создать каталог и схему в каталоге Unity.
Пример: набор данных "Миллион песен"
Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.
Шаг 1. Создание кластера
Чтобы выполнить обработку и анализ данных в этом примере, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.
Примечание.
Так как в этом примере используется образец набора данных, хранящегося в DBFS, и рекомендуется сохранять таблицы в каталоге Unity, создайте кластер, настроенный в режиме доступа одного пользователя. Режим доступа с одним пользователем предоставляет полный доступ к DBFS, а также обеспечивает доступ к каталогу Unity. См. рекомендации по лучшим практикам для DBFS и Unity Catalog.
- На боковой панели щелкните Вычислительная среда.
- На странице "Вычислительная среда" щелкните элемент Создать кластер.
- На странице "Новый кластер" введите уникальное имя кластера.
- В режиме доступавыберите Режим одного пользователя.
- В доступе единый пользователь или сервисный объектвыберите ваше имя пользователя.
- Оставьте оставшиеся значения в состоянии по умолчанию и щелкните Создать кластер.
Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".
Шаг 2. Изучение исходных данных
Сведения об использовании интерфейса Azure Databricks для изучения необработанных исходных данных см. в статье "Изучение исходных данных для конвейера данных". Если вы хотите перейти непосредственно к приему и подготовке данных, перейдите к шагу 3. Прием необработанных данных.
Шаг 3. Прием необработанных данных
На этом шаге вы загружаете необработанные данные в таблицу, чтобы сделать ее доступной для дальнейшей обработки. Для управления ресурсами данных на платформе Databricks, таких как таблицы, Databricks рекомендует каталоге Unity. Однако если у вас нет разрешений на создание необходимого каталога и схемы для публикации таблиц в каталоге Unity, вы по-прежнему можете выполнить следующие действия, публикуя таблицы в хранилище метаданных Hive.
Для приема данных Databricks рекомендует использовать автозагрузчик. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.
Вы можете настроить Auto Loader для автоматического определения схемы загружаемых данных, что позволяет инициализировать таблицы без явного указания схемы данных, а также изменять схему таблицы с добавлением новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени. Databricks рекомендует вывод схемы при использовании автозагрузчика. Однако, как показано на этапе исследования данных, данные песен не содержат сведения о заголовке. Так как заголовок не хранится с данными, необходимо явно определить схему, как показано в следующем примере.
На боковой панели щелкните Новая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.
Введите имя записной книжки, например
Ingest songs data
. По умолчанию:- Python — это выбранный язык.
- Записная книжка подключена к последнему используемому кластеру. В этом случае кластер, созданный на шаге 1. Создание кластера.
Введите следующую ячейку записной книжки:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Если вы используете Unity Catalog, замените
<table-name>
на имя каталога, схемы и таблицы для хранения загруженных записей (например,data_pipelines.songs_data.raw_song_data
). В противном случае замените<table-name>
именем таблицы для хранения загруженных записей, например,raw_song_data
.Замените
<checkpoint-path>
путь к каталогу в DBFS для хранения файлов контрольных точек, например/tmp/pipeline_get_started/_checkpoint/song_data
.Щелкните и выберите Запустить ячейку. В этом примере определяется схема данных с использованием информации из
README
, загружаются данные песен из всех файлов, содержащихся вfile_path
, и записываются данные в таблицу, указанную вtable_name
.
Шаг 4. Подготовка необработанных данных
Чтобы подготовить необработанные данные для анализа, выполните следующие действия, чтобы преобразовать необработанные данные песен, отфильтровав ненужные столбцы и добавив новое поле, содержащее метку времени для создания новой записи.
На боковой панели щелкните Новая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.
Введите имя записной книжки. Например,
Prepare songs data
. Измените язык по умолчанию на SQL.Введите следующую ячейку записной книжки:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Если вы используете каталог Unity, замените
<table-name>
каталогом, схемой и именем таблицы, чтобы содержать отфильтрованные и преобразованные записи (например,data_pipelines.songs_data.prepared_song_data
). В противном случае замените<table-name>
именем таблицы, содержащей отфильтрованные и преобразованные записи (например,prepared_song_data
).Замените
<raw-songs-table-name>
на имя таблицы, которая содержит исходные записи песен, загруженные на предыдущем шаге.Щелкните и выберите Запустить ячейку.
Шаг 5. Запрос преобразованных данных
На этом шаге вы расширяете конвейер обработки, добавляя запросы для анализа данных песен. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.
На боковой панели щелкните Новая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.
Введите имя записной книжки. Например,
Analyze songs data
. Измените язык по умолчанию на SQL.Введите следующую ячейку записной книжки:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Замените
<prepared-songs-table-name>
именем таблицы, содержащей подготовленные данные. Например,data_pipelines.songs_data.prepared_song_data
.Щелкните в меню действий ячейки, выберите Добавить ячейку ниже и введите следующую команду в новой ячейке:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Замените
<prepared-songs-table-name>
именем подготовленной таблицы, созданной на предыдущем шаге. Например,data_pipelines.songs_data.prepared_song_data
.Чтобы выполнить запросы и просмотреть выходные данные, нажмите кнопку "Выполнить все".
Шаг 6. Создание задания Azure Databricks для запуска конвейера
Вы можете создать рабочий процесс для автоматизации выполнения действий приема, обработки и анализа данных с помощью задания Azure Databricks.
- В рабочей области Обработка и анализ данных и инженерии выполните одно из следующих действий:
- Щелкните рабочие процессы на боковой панели и щелкните .
- На боковой панели щелкните Новый и выберите задачу.
- В диалоговом окне задачи на вкладке "Задачи " замените имя задания... именем задания. Например, "Рабочий процесс "Песни".
- В поле "Имя задачи" введите имя первой задачи, например
Ingest_songs_data
. - В типевыберите тип задачи записной книжки .
- В источниквыберите рабочее пространство.
- Используйте браузер файлов, чтобы найти записную книжку приема данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
- В кластеревыберите Shared_job_cluster или кластер, созданный на шаге
Create a cluster
. - Нажмите кнопку Создать.
- Нажмите кнопку под только что созданной задачей и выберите Записная книжка.
- В поле "Имя задачи" введите имя задачи, например
Prepare_songs_data
. - В типевыберите тип задачи записной книжки .
- В источниквыберите рабочее пространство.
- Используйте браузер файлов для поиска записной книжки подготовки данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
- В кластеревыберите Shared_job_cluster или кластер, созданный на шаге
Create a cluster
. - Нажмите кнопку Создать.
- Нажмите кнопку под только что созданной задачей и выберите Записная книжка.
- В поле "Имя задачи" введите имя задачи, например
Analyze_songs_data
. - В типевыберите тип задачи записной книжки .
- В источниквыберите рабочее пространство.
- Используйте браузер файлов, чтобы найти записную книжку для анализа данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
- В кластеревыберите Shared_job_cluster или кластер, созданный на шаге
Create a cluster
. - Нажмите кнопку Создать.
- Чтобы запустить рабочий процесс, нажмите кнопку . Чтобы просмотреть
сведения о выполнении , щелкните ссылку в столбцевремени начала для запуска в представлении выполнения заданий . Щелкните каждую задачу, чтобы просмотреть сведения о выполнении задачи. - Чтобы просмотреть результаты после завершения рабочего процесса, щелкните окончательную задачу анализа данных. Откроется страница вывода и отображается результаты запроса.
Шаг 7. Планирование задания конвейера данных
Примечание.
Чтобы продемонстрировать использование задания Azure Databricks для оркестрации запланированного рабочего процесса, этот пример начала разделяет этапы приема, подготовки и анализа на отдельные записные книжки, а затем каждая записная книжка используется для создания задачи в задании. Если все обработка содержится в одной записной книжке, вы можете легко запланировать записную книжку непосредственно из пользовательского интерфейса записной книжки Azure Databricks. См. статью "Создание запланированных заданий записной книжки и управление ими".
Обычное требование заключается в том, чтобы запустить конвейер данных на запланированной основе. Чтобы определить расписание для задания, запускающего конвейер:
- Щелкните рабочие процессы на боковой панели.
- В столбце Name щелкните на имя задания. На боковой панели отображаются Сведения о задании.
- Щелкните Добавить триггер в панели Сведения о задании и выберите Запланированные в Тип триггера.
- Укажите период, время начала и часовой пояс. При необходимости установите флажок
Показать синтаксис Cron , чтобы отобразить и изменить расписание всинтаксиса Cron в . - Нажмите кнопку Сохранить.
Подробнее
- Дополнительные сведения о записных книжках Databricks см. в статье "Общие сведения о записных книжках Databricks".
- Дополнительные сведения о заданиях Azure Databricks см. в статье "Что такое задания Databricks?".
- Дополнительные сведения о Delta Lake см. в статье "Что такое Delta Lake?".
- Дополнительные сведения о конвейерах обработки данных с помощью разностных динамических таблиц см. в статье Что такое разностные динамические таблицы?.