Поделиться через


Создание конвейера обработки данных от начала до конца в Databricks

В этой статье показано, как создать и развернуть сквозной конвейер обработки данных, включая прием необработанных данных, преобразование данных и выполнение анализа обработанных данных.

Примечание.

Хотя в этой статье показано, как создать полный конвейер данных с помощью databricks записных книжек и задания Azure Databricks для оркестрации рабочего процесса, Databricks рекомендует использовать DLT, декларативный интерфейс для создания надежных, обслуживаемых и тестируемых конвейеров обработки данных.

Что такое конвейер данных

Конвейер данных реализует шаги, необходимые для перемещения данных из исходных систем, преобразования этих данных на основе требований и хранения данных в целевой системе. Конвейер данных включает все процессы, необходимые для преобразования необработанных данных в подготовленные данные, которые пользователи могут использовать. Например, конвейер данных может подготовить данные, чтобы аналитики и специалисты по обработке и анализу данных могли извлечь значение из данных с помощью анализа и отчетности.

Рабочий процесс извлечения, преобразования и загрузки (ETL) является общим примером конвейера данных. При обработке ETL данные передаются из исходных систем и записываются в промежуточную область, преобразуются на основе требований (обеспечение качества данных, дедупликации записей и т. д.), а затем записываются в целевую систему, например хранилище данных или озеро данных.

Шаги конвейера данных

Чтобы приступить к созданию конвейеров данных в Azure Databricks, в примере, приведенном в этой статье, описывается создание рабочего процесса обработки данных:

  • Используйте функции Azure Databricks для изучения необработанного набора данных.
  • Создайте блокнот Databricks для приема сырых исходных данных и записи их в целевую таблицу.
  • Создайте записную книжку Databricks для преобразования необработанных исходных данных и записи преобразованных данных в целевую таблицу.
  • Создайте записную книжку Databricks для запроса преобразованных данных.
  • Автоматизируйте конвейер данных с помощью задания Azure Databricks.

Требования

Пример: набор данных "Миллион песен"

Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.

шаг 1. Создание вычислительного ресурса

Чтобы выполнить обработку и анализ данных в этом примере, создайте вычислительный ресурс для выполнения команд.

Примечание.

Так как в этом примере используется пример набора данных, хранящегося в DBFS, и рекомендуется сохранять таблицы для каталоге Unity, вы создадите вычислительный ресурс, настроенный с выделенным режимом доступа. Выделенный режим доступа предоставляет полный доступ к DBFS, а также обеспечивает доступ к каталогу Unity. См. рекомендации по лучшим практикам для DBFS и Unity Catalog.

  1. На боковой панели щелкните Вычислительная среда.
  2. На странице вычислений щелкните Создать ресурс вычислений.
  3. На новой странице вычислений введите уникальное имя вычислительного ресурса.
  4. В разделе Advanced, установите режим доступа на Ручной, затем выберите Выделенный.
  5. В отдельный пользователь или группавыберите имя пользователя.
  6. Оставьте остальные значения на их значениях по умолчанию и нажмите Создать.

Дополнительные сведения о вычислительных ресурсах Databricks см. в разделе вычислений.

Шаг 2. Изучение исходных данных

Сведения об использовании интерфейса Azure Databricks для изучения необработанных исходных данных см. в статье "Изучение исходных данных для конвейера данных". Если вы хотите перейти непосредственно к приему и подготовке данных, перейдите к шагу 3. Прием необработанных данных.

Шаг 3. Прием необработанных данных

На этом шаге вы загружаете необработанные данные в таблицу, чтобы сделать ее доступной для дальнейшей обработки. Для управления ресурсами данных на платформе Databricks, таких как таблицы, Databricks рекомендует Unity Catalog. Однако если у вас нет разрешений на создание необходимого каталога и схемы для публикации таблиц в каталоге Unity, вы по-прежнему можете выполнить следующие действия, публикуя таблицы в хранилище метаданных Hive.

Для приема данных Databricks рекомендует использовать автозагрузчик. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.

Вы можете настроить Auto Loader для автоматического определения схемы загружаемых данных, что позволяет инициализировать таблицы без явного указания схемы данных, а также изменять схему таблицы с добавлением новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени. Databricks рекомендует вывод схемы при использовании автозагрузчика. Однако, как показано на этапе исследования данных, данные песен не содержат сведения о заголовке. Так как заголовок не хранится с данными, необходимо явно определить схему, как показано в следующем примере.

  1. На боковой панели щелкните Новая иконкаНовая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки, например Ingest songs data. По умолчанию:

  3. Введите следующее в первую ячейку блокнота:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Если вы используете Unity Catalog, замените <table-name> на имя каталога, схемы и таблицы для хранения загруженных записей (например, data_pipelines.songs_data.raw_song_data). В противном случае замените <table-name> именем таблицы для хранения загруженных записей, например, raw_song_data.

    Замените <checkpoint-path> путь к каталогу в DBFS для хранения файлов контрольных точек, например /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Щелкните Меню запускаи выберите Запустить ячейку. В этом примере определяется схема данных с использованием информации из README, загружаются данные песен из всех файлов, содержащихся в file_path, и записываются данные в таблицу, указанную в table_name.

Шаг 4. Подготовка необработанных данных

Чтобы подготовить необработанные данные для анализа, выполните следующие действия, чтобы преобразовать необработанные данные песен, отфильтровав ненужные столбцы и добавив новое поле, содержащее метку времени для создания новой записи.

  1. На боковой панели щелкните Новая иконкаНовая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки. Например, Prepare songs data. Измените язык по умолчанию на SQL.

  3. Введите следующее в первую ячейку записной книжки:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Если вы используете каталог Unity, замените <table-name> каталогом, схемой и именем таблицы, чтобы содержать отфильтрованные и преобразованные записи (например, data_pipelines.songs_data.prepared_song_data). В противном случае замените <table-name> именем таблицы, содержащей отфильтрованные и преобразованные записи (например, prepared_song_data).

    Замените <raw-songs-table-name> на имя таблицы, которая содержит исходные записи песен, загруженные на предыдущем шаге.

  4. Щелкните Меню запускаи выберите Запустить ячейку.

Шаг 5. Запрос преобразованных данных

На этом шаге вы расширяете конвейер обработки, добавляя запросы для анализа данных песен. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.

  1. На боковой панели щелкните Новая иконкаНовая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки. Например, Analyze songs data. Измените язык по умолчанию на SQL.

  3. Введите следующее в первую ячейку записной книжки:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Замените <prepared-songs-table-name> именем таблицы, содержащей подготовленные данные. Например, data_pipelines.songs_data.prepared_song_data.

  4. Щелкните значок стрелки вниз в меню действий ячейки, выберите Добавить ячейку ниже и введите следующее в новой ячейке:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Замените <prepared-songs-table-name> именем подготовленной таблицы, созданной на предыдущем шаге. Например, data_pipelines.songs_data.prepared_song_data.

  5. Чтобы выполнить запросы и просмотреть выходные данные, нажмите кнопку "Выполнить все".

Шаг 6. Создание задания Azure Databricks для запуска конвейера

Вы можете создать рабочий процесс для автоматизации выполнения действий приема, обработки и анализа данных с помощью задания Azure Databricks.

  1. В рабочей области Обработка и анализ данных и инженерии выполните одно из следующих действий:
    • Щелкните значок "Рабочие процессы"на боковой панели и выберите Кнопка "Создать задание".
    • На боковой панели щелкните Новый значокНовый и выберите задачу.
  2. В диалоговом окне задачи на вкладке "Задачи" замените Добавьте название для вашего задания... на название задания. Например, "Песни: рабочий процесс".
  3. В поле "Имя задачи" введите имя первой задачи, например Ingest_songs_data.
  4. В Типе выберите тип задачи Notebook.
  5. В источниквыберите рабочее пространство.
  6. В поле пути используйте браузер файлов, чтобы найти журнал данных, а затем нажмите Подтвердить.
  7. В разделе вычислений выберите вычислительный ресурс, созданный на шаге Create a compute resource.
  8. Нажмите кнопку Создать.
  9. Нажмите кнопку под только что созданной задачей и выберите Записная книжка.
  10. В поле "Имя задачи" введите имя задачи, например Prepare_songs_data.
  11. В Типе выберите тип задачи Записная книжка.
  12. В источниквыберите рабочее пространство.
  13. Используйте браузер файлов для поиска записной книжки подготовки данных, щелкните по названию записной книжки, затем нажмите Подтвердить.
  14. На вычислительном элементевыберите вычислительный ресурс, созданный на этапе Create a compute resource.
  15. Нажмите кнопку Создать.
  16. Нажмите кнопку под только что созданной задачей и выберите Записная книжка.
  17. В поле "Имя задачи" введите имя задачи, например Analyze_songs_data.
  18. В Тип выберите тип задачи Notebook.
  19. В источниквыберите рабочее пространство.
  20. Используйте браузер файлов, чтобы найти записную книжку для анализа данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  21. В вычисленийвыберите вычислительный ресурс, созданный на шаге Create a compute resource.
  22. Нажмите кнопку Создать.
  23. Чтобы запустить рабочий процесс, нажмите кнопку Кнопка . Чтобы просмотреть сведения о выполнении, щелкните ссылку в столбце время начала для запуска в представлении выполнения заданий. Щелкните каждую задачу, чтобы просмотреть сведения о выполнении задачи.
  24. Чтобы просмотреть результаты после завершения рабочего процесса, щелкните окончательную задачу анализа данных. Откроется страница вывода и отображается результаты запроса.

Шаг 7. Планирование задания конвейера данных

Примечание.

Чтобы продемонстрировать использование задания Azure Databricks для управления запланированным рабочим процессом, этот пример для начинающих разделяет этапы приема, подготовки и анализа на отдельные записные книжки, а затем каждая записная книжка используется для создания задачи в рамках задания. Если вся обработка содержится в одной записной книжке, её можно легко запланировать непосредственно из пользовательского интерфейса блокнота Azure Databricks. См. статью "Создание запланированных заданий записной книжки и управление ими".

Обычное требование заключается в том, чтобы запустить конвейер данных на запланированной основе. Чтобы назначить расписание для задачи, которая запускает конвейер:

  1. Щелкните Значок рабочих процессоврабочие процессы на боковой панели.
  2. В столбце Name щелкните на имя задания. На боковой панели отображаются Сведения о задании.
  3. Щелкните Добавить триггер в панели Сведения о задании и выберите Запланированные в Тип триггера.
  4. Укажите период, время начала и часовой пояс. При необходимости установите флажок Показать синтаксис Cron, чтобы отобразить и изменить расписание в синтаксисе Quartz Cron.
  5. Нажмите кнопку Сохранить.

Подробнее