Поделиться через


Создание сквозного конвейера данных в Databricks

В этой статье показано, как создать и развернуть сквозной конвейер обработки данных, включая прием необработанных данных, преобразование данных и выполнение анализа обработанных данных.

Примечание.

Хотя в этой статье показано, как создать полный конвейер данных с помощью записных книжек Databricks и задания Azure Databricks для оркестрации рабочего процесса, Databricks рекомендует использовать Delta Live Tables, декларативный интерфейс для создания надежных, легко обслуживаемых и поддающихся тестированию конвейеров обработки данных.

Что такое конвейер данных

Конвейер данных реализует шаги, необходимые для перемещения данных из исходных систем, преобразования этих данных на основе требований и хранения данных в целевой системе. Конвейер данных включает все процессы, необходимые для преобразования необработанных данных в подготовленные данные, которые пользователи могут использовать. Например, конвейер данных может подготовить данные, чтобы аналитики и специалисты по обработке и анализу данных могли извлечь значение из данных с помощью анализа и отчетности.

Рабочий процесс извлечения, преобразования и загрузки (ETL) является общим примером конвейера данных. При обработке ETL данные передаются из исходных систем и записываются в промежуточную область, преобразуются на основе требований (обеспечение качества данных, дедупликации записей и т. д.), а затем записываются в целевую систему, например хранилище данных или озеро данных.

Шаги конвейера данных

Чтобы приступить к созданию конвейеров данных в Azure Databricks, в примере, приведенном в этой статье, описывается создание рабочего процесса обработки данных:

  • Используйте функции Azure Databricks для изучения необработанного набора данных.
  • Создайте блокнот Databricks для приема сырых исходных данных и записи их в целевую таблицу.
  • Создайте записную книжку Databricks для преобразования необработанных исходных данных и записи преобразованных данных в целевую таблицу.
  • Создайте записную книжку Databricks для запроса преобразованных данных.
  • Автоматизируйте конвейер данных с помощью задания Azure Databricks.

Требования

Пример: набор данных "Миллион песен"

Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.

Шаг 1. Создание кластера

Чтобы выполнить обработку и анализ данных в этом примере, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.

Примечание.

Так как в этом примере используется образец набора данных, хранящегося в DBFS, и рекомендуется сохранять таблицы в каталоге Unity, создайте кластер, настроенный в режиме доступа одного пользователя. Режим доступа с одним пользователем предоставляет полный доступ к DBFS, а также обеспечивает доступ к каталогу Unity. См. рекомендации по лучшим практикам для DBFS и Unity Catalog.

  1. На боковой панели щелкните Вычислительная среда.
  2. На странице "Вычислительная среда" щелкните элемент Создать кластер.
  3. На странице "Новый кластер" введите уникальное имя кластера.
  4. В режиме доступавыберите Режим одного пользователя.
  5. В доступе единый пользователь или сервисный объектвыберите ваше имя пользователя.
  6. Оставьте оставшиеся значения в состоянии по умолчанию и щелкните Создать кластер.

Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".

Шаг 2. Изучение исходных данных

Сведения об использовании интерфейса Azure Databricks для изучения необработанных исходных данных см. в статье "Изучение исходных данных для конвейера данных". Если вы хотите перейти непосредственно к приему и подготовке данных, перейдите к шагу 3. Прием необработанных данных.

Шаг 3. Прием необработанных данных

На этом шаге вы загружаете необработанные данные в таблицу, чтобы сделать ее доступной для дальнейшей обработки. Для управления ресурсами данных на платформе Databricks, таких как таблицы, Databricks рекомендует каталоге Unity. Однако если у вас нет разрешений на создание необходимого каталога и схемы для публикации таблиц в каталоге Unity, вы по-прежнему можете выполнить следующие действия, публикуя таблицы в хранилище метаданных Hive.

Для приема данных Databricks рекомендует использовать автозагрузчик. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.

Вы можете настроить Auto Loader для автоматического определения схемы загружаемых данных, что позволяет инициализировать таблицы без явного указания схемы данных, а также изменять схему таблицы с добавлением новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени. Databricks рекомендует вывод схемы при использовании автозагрузчика. Однако, как показано на этапе исследования данных, данные песен не содержат сведения о заголовке. Так как заголовок не хранится с данными, необходимо явно определить схему, как показано в следующем примере.

  1. На боковой панели щелкните Новая иконкаНовая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки, например Ingest songs data. По умолчанию:

    • Python — это выбранный язык.
    • Записная книжка подключена к последнему используемому кластеру. В этом случае кластер, созданный на шаге 1. Создание кластера.
  3. Введите следующую ячейку записной книжки:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Если вы используете Unity Catalog, замените <table-name> на имя каталога, схемы и таблицы для хранения загруженных записей (например, data_pipelines.songs_data.raw_song_data). В противном случае замените <table-name> именем таблицы для хранения загруженных записей, например, raw_song_data.

    Замените <checkpoint-path> путь к каталогу в DBFS для хранения файлов контрольных точек, например /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Щелкните Меню запускаи выберите Запустить ячейку. В этом примере определяется схема данных с использованием информации из README, загружаются данные песен из всех файлов, содержащихся в file_path, и записываются данные в таблицу, указанную в table_name.

Шаг 4. Подготовка необработанных данных

Чтобы подготовить необработанные данные для анализа, выполните следующие действия, чтобы преобразовать необработанные данные песен, отфильтровав ненужные столбцы и добавив новое поле, содержащее метку времени для создания новой записи.

  1. На боковой панели щелкните Новая иконкаНовая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки. Например, Prepare songs data. Измените язык по умолчанию на SQL.

  3. Введите следующую ячейку записной книжки:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Если вы используете каталог Unity, замените <table-name> каталогом, схемой и именем таблицы, чтобы содержать отфильтрованные и преобразованные записи (например, data_pipelines.songs_data.prepared_song_data). В противном случае замените <table-name> именем таблицы, содержащей отфильтрованные и преобразованные записи (например, prepared_song_data).

    Замените <raw-songs-table-name> на имя таблицы, которая содержит исходные записи песен, загруженные на предыдущем шаге.

  4. Щелкните Меню запускаи выберите Запустить ячейку.

Шаг 5. Запрос преобразованных данных

На этом шаге вы расширяете конвейер обработки, добавляя запросы для анализа данных песен. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.

  1. На боковой панели щелкните Новая иконкаНовая и выберите Записная книжка в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки. Например, Analyze songs data. Измените язык по умолчанию на SQL.

  3. Введите следующую ячейку записной книжки:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Замените <prepared-songs-table-name> именем таблицы, содержащей подготовленные данные. Например, data_pipelines.songs_data.prepared_song_data.

  4. Щелкните Down Caret в меню действий ячейки, выберите Добавить ячейку ниже и введите следующую команду в новой ячейке:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Замените <prepared-songs-table-name> именем подготовленной таблицы, созданной на предыдущем шаге. Например, data_pipelines.songs_data.prepared_song_data.

  5. Чтобы выполнить запросы и просмотреть выходные данные, нажмите кнопку "Выполнить все".

Шаг 6. Создание задания Azure Databricks для запуска конвейера

Вы можете создать рабочий процесс для автоматизации выполнения действий приема, обработки и анализа данных с помощью задания Azure Databricks.

  1. В рабочей области Обработка и анализ данных и инженерии выполните одно из следующих действий:
    • Щелкните Значок рабочих процессоврабочие процессы на боковой панели и щелкните .Кнопка
    • На боковой панели щелкните Новый значокНовый и выберите задачу.
  2. В диалоговом окне задачи на вкладке "Задачи " замените имя задания... именем задания. Например, "Рабочий процесс "Песни".
  3. В поле "Имя задачи" введите имя первой задачи, например Ingest_songs_data.
  4. В типевыберите тип задачи записной книжки .
  5. В источниквыберите рабочее пространство.
  6. Используйте браузер файлов, чтобы найти записную книжку приема данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  7. В кластеревыберите Shared_job_cluster или кластер, созданный на шаге Create a cluster.
  8. Нажмите кнопку Создать.
  9. Нажмите кнопку под только что созданной задачей и выберите Записная книжка.
  10. В поле "Имя задачи" введите имя задачи, например Prepare_songs_data.
  11. В типевыберите тип задачи записной книжки .
  12. В источниквыберите рабочее пространство.
  13. Используйте браузер файлов для поиска записной книжки подготовки данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  14. В кластеревыберите Shared_job_cluster или кластер, созданный на шаге Create a cluster.
  15. Нажмите кнопку Создать.
  16. Нажмите кнопку под только что созданной задачей и выберите Записная книжка.
  17. В поле "Имя задачи" введите имя задачи, например Analyze_songs_data.
  18. В типевыберите тип задачи записной книжки .
  19. В источниквыберите рабочее пространство.
  20. Используйте браузер файлов, чтобы найти записную книжку для анализа данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  21. В кластеревыберите Shared_job_cluster или кластер, созданный на шаге Create a cluster.
  22. Нажмите кнопку Создать.
  23. Чтобы запустить рабочий процесс, нажмите кнопку Кнопка . Чтобы просмотреть сведения о выполнении, щелкните ссылку в столбце времени начала для запуска в представлении выполнения заданий . Щелкните каждую задачу, чтобы просмотреть сведения о выполнении задачи.
  24. Чтобы просмотреть результаты после завершения рабочего процесса, щелкните окончательную задачу анализа данных. Откроется страница вывода и отображается результаты запроса.

Шаг 7. Планирование задания конвейера данных

Примечание.

Чтобы продемонстрировать использование задания Azure Databricks для оркестрации запланированного рабочего процесса, этот пример начала разделяет этапы приема, подготовки и анализа на отдельные записные книжки, а затем каждая записная книжка используется для создания задачи в задании. Если все обработка содержится в одной записной книжке, вы можете легко запланировать записную книжку непосредственно из пользовательского интерфейса записной книжки Azure Databricks. См. статью "Создание запланированных заданий записной книжки и управление ими".

Обычное требование заключается в том, чтобы запустить конвейер данных на запланированной основе. Чтобы определить расписание для задания, запускающего конвейер:

  1. Щелкните Значок рабочих процессоврабочие процессы на боковой панели.
  2. В столбце Name щелкните на имя задания. На боковой панели отображаются Сведения о задании.
  3. Щелкните Добавить триггер в панели Сведения о задании и выберите Запланированные в Тип триггера.
  4. Укажите период, время начала и часовой пояс. При необходимости установите флажок Показать синтаксис Cron, чтобы отобразить и изменить расписание в синтаксиса Cron в.
  5. Нажмите кнопку Сохранить.

Подробнее