Поделиться через


Загрузка данных с помощью DLT

Вы можете загрузить данные из любого источника данных, поддерживаемого Apache Spark в Azure Databricks, с помощью DLT. Наборы данных (таблицы и представления) можно определить в DLT для любого запроса, возвращающего кадр данных Spark, включая потоковую передачу кадров данных и Pandas для кадров данных Spark. Для задач загрузки данных Databricks рекомендует использовать потоковые таблицы для большинства сценариев. Потоковые таблицы подходят для загрузки данных из облачного хранилища объектов с помощью автозагрузчика или из систем обмена сообщениями, таких как Kafka. В приведенных ниже примерах показаны некоторые распространенные шаблоны.

Важный

Не все источники данных поддерживают SQL. Вы можете комбинировать ноутбуки SQL и Python в конвейере обработки данных DLT, чтобы использовать SQL для всех операций после этапа загрузки данных.

Дополнительные сведения о работе с библиотеками, не упакованными в DLT по умолчанию, см. в статье Управление зависимостями Python для конвейеров DLT.

загружать файлы из облачного хранилища объектов

Databricks рекомендует использовать автозагрузчик с DLT для большинства задач приема данных из облачного хранилища объектов. Автозагрузчик и DLT предназначены для добавочной и идемпотентной загрузки постоянно растущих данных по мере поступления в облачное хранилище. В следующих примерах используется автозагрузчик для создания наборов данных из CSV-файлов и JSON:

Заметка

Чтобы загрузить файлы с помощью "Auto Loader" в конвейере, поддерживающем Unity Catalog, необходимо использовать внешние расположения. Дополнительные сведения об использовании каталога Unity с DLT см. в статье Использование каталога Unity с конвейерами DLT.

Питон

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

См. Что такое Auto Loader? и синтаксис Auto Loader SQL.

Предупреждение

Если вы используете автозагрузчик с уведомлениями о файлах и запускаете полное обновление для конвейера или потоковой таблицы, необходимо вручную очистить ресурсы. Для выполнения очистки можно использовать CloudFilesResourceManager в рабочей тетради.

Загрузка данных из шины сообщений

Конвейеры DLT можно настроить для приема данных из шин сообщений с потоковыми таблицами. Databricks рекомендует объединять потоковые таблицы с непрерывным выполнением и расширенным автомасштабированием, чтобы обеспечить наиболее эффективный способ загрузки с низкими задержками из шин сообщений. См. раздел Оптимизация использования кластеров потоков DLT с улучшенным автомасштабированием.

Например, следующий код настраивает потоковую таблицу для приема данных из Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Вы можете записывать низкоуровневые операции на чистом SQL, чтобы выполнять потоковые преобразования данных, как показано в следующем примере:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(kafka_raw)
WHERE ...

Пример работы с Центрами событий см. в статье Использование Центров событий Azure в качестве источника данных DLT.

См. Настройка источников потоковой передачи данных.

Загрузка данных из внешних систем

DLT поддерживает загрузку данных из любого источника данных, поддерживаемого Azure Databricks. См. Подключение к источникам данных. Вы также можете загрузить внешние данные, используя Lakehouse Federation, для поддерживаемых источников данных . Так как для федерации Lakehouse требуется Databricks Runtime 13.3 LTS или более поздней версии, для использования конвейера федерации Lakehouse необходимо настроить использование канала предварительной версии .

Некоторые источники данных не поддерживают эквивалентную поддержку в SQL. Если вы не можете использовать федерацию Lakehouse с одним из этих источников данных, вы можете использовать записную книжку Python для приема данных из источника. Исходный код Python и SQL можно добавить в тот же конвейер DLT. В следующем примере объявляется материализованное представление для доступа к текущему состоянию данных в удаленной таблице PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Загрузка небольших или статических наборов данных из облачного хранилища объектов

Вы можете загружать небольшие или статические наборы данных с помощью синтаксиса загрузки Apache Spark. DLT поддерживает все форматы файлов, поддерживаемые Apache Spark в Azure Databricks. Полный список см. в параметрах формата данных.

В следующих примерах показано, как загрузить JSON для создания таблиц DLT:

Питон

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Заметка

Конструкция SELECT * FROM format.`path`; SQL распространена во всех средах SQL в Azure Databricks. Рекомендуется использовать шаблон прямого доступа к файлам с помощью SQL с DLT.

Обеспечение безопасного доступа к учетным данным хранилища с секретами в потоке обработки

Вы можете использовать секреты Azure Databricks для хранения учетных данных, таких как ключи доступа или пароли. Чтобы настроить секрет в конвейере, используйте свойство Spark в конфигурации кластера параметров конвейера. См. раздел «Настройка вычислительных ресурсов для конвейера DLT».

В следующем примере используется секрет для хранения ключа доступа, необходимого для чтения входных данных из учетной записи хранения Azure Data Lake Storage Gen2 (ADLS Gen2) с помощью Auto Loader. Этот же метод можно использовать для настройки любого секрета, необходимого для конвейера, например ключей AWS для доступа к S3 или пароля к хранилищу метаданных Apache Hive.

Дополнительные сведения о работе с Azure Data Lake Storage 2-го поколения см. в статьях Подключение к Azure Data Lake Storage Gen2 и Хранилище блобов и.

Заметка

Необходимо добавить префикс spark.hadoop. в ключ конфигурации spark_conf, который задает значение секрета.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}

Заменять

  • <storage-account-name> с именем учетной записи хранения ADLS 2-го поколения.
  • <scope-name> с именем области секрета Azure Databricks.
  • <secret-name> с именем ключа, содержащего ключ доступа к учетной записи хранения Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Заменить

  • <container-name> с именем контейнера учетной записи хранения Azure, в которой хранятся входные данные.
  • <storage-account-name> с именем учетной записи хранения ADLS второго поколения.
  • <path-to-input-dataset> пути к входной набору данных.

Загрузка данных из Центров событий Azure

Центры событий Azure — это служба потоковой передачи данных, которая предоставляет совместимый интерфейс Apache Kafka. Соединитель Структурированной потоковой передачи Kafka, включенный в среду выполнения DLT, можно использовать для загрузки сообщений из Центров событий Azure. Дополнительные сведения о загрузке и обработке сообщений из Центров событий Azure см. в статье Использование Центров событий Azure в качестве источника данных DLT.