Поделиться через


Загрузка данных с помощью таблиц Delta Live Tables

Вы можете загрузить данные из любого источника данных, поддерживаемого Apache Spark, в Azure Databricks с помощью таблиц Delta Live Tables. Вы можете определить наборы данных (таблицы и представления) в Delta Live Tables для любого запроса, возвращающего Spark DataFrame, включая потоковые DataFrames и Pandas для Spark DataFrames. Для задач приема данных Databricks рекомендует использовать таблицы потоковой передачи для большинства случаев использования. Таблицы потоковой передачи удобны для приема данных из облачного хранилища объектов с помощью Auto Loader или из шин сообщений, таких как Kafka. Приведенные ниже примеры демонстрируют некоторые распространенные шаблоны.

Внимание

Не все источники данных поддерживают SQL. Вы можете смешивать записные книжки SQL и Python в конвейере Delta Live Tables, чтобы использовать SQL для всех операций, помимо приема данных.

Дополнительные сведения о работе с библиотеками, не упакованными в разностные динамические таблицы по умолчанию, см. в статье "Управление зависимостями Python для конвейеров разностных динамических таблиц".

Загрузка файлов из облачного хранилища объектов

Databricks рекомендует использовать автозагрузчик с разностными динамическими таблицами для большинства задач приема данных из облачного хранилища объектов. Автозагрузчик и разностные динамические таблицы предназначены для добавочной и идемпотентной загрузки постоянно растущих данных по мере поступления в облачное хранилище. В следующих примерах для создания наборов данных из CSV- и JSON-файлов используется Автозагрузчик.

Примечание.

Для загрузки файлов с помощью автозагрузчика в конвейере с поддержкой каталога Unity необходимо использовать внешние расположения. Дополнительные сведения об использовании каталога Unity с разностными динамическими таблицами см. в статье Использование каталога Unity с конвейерами таблиц Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

См . раздел "Что такое автозагрузчик" и синтаксис SQL автозагрузчика.

Предупреждение

Если вы используете автозагрузчик с уведомлениями о файлах и запускаете полное обновление для конвейера или потоковой таблицы, необходимо вручную очистить ресурсы. Для выполнения очистки можно использовать CloudFilesResourceManager в записной книжке.

Загрузка данных из шины сообщений

Конвейеры Delta Live Tables можно настроить для приема данных из автобусов сообщений с помощью потоковых таблиц. Databricks рекомендует объединять потоковые таблицы с непрерывным выполнением и расширенным автомасштабированием, чтобы обеспечить наиболее эффективное прием для низкой задержки загрузки из автобусов сообщений. См. статью "Оптимизация использования кластеров конвейеров Delta Live Tables с расширенным автомасштабированием".

Например, следующий код настраивает потоковую таблицу для приема данных из Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Вы можете записывать подчиненные операции в чистом SQL, чтобы выполнять преобразования потоковой передачи данных, как показано в следующем примере:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Пример работы с Центрами событий см. в разделе "Использование Центры событий Azure в качестве источника данных Delta Live Tables".

См. раздел "Настройка источников данных потоковой передачи".

Загрузка данных из внешних систем

Delta Live Tables поддерживает загрузку данных из любого источника данных, поддерживаемого Azure Databricks. См. статью "Подключение к источникам данных". Вы также можете загрузить внешние данные с помощью Федерации Lakehouse для поддерживаемых источников данных. Так как для федерации Lakehouse требуется Среда выполнения Databricks 13.3 LTS или более поздней версии, для использования конвейера Lakehouse федерации необходимо настроить для использования канала предварительной версии.

Некоторые источники данных не поддерживают эквивалентную поддержку в SQL. Если вы не можете использовать федерацию Lakehouse с одним из этих источников данных, вы можете использовать записную книжку Python для приема данных из источника. Исходный код Python и SQL можно добавить в тот же конвейер разностных динамических таблиц. В следующем примере объявляется материализованное представление для доступа к текущему состоянию данных в удаленной таблице PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Загрузка небольших или статических наборов данных из облачного хранилища объектов

Вы можете загружать небольшие или статические наборы данных с помощью синтаксиса загрузки Apache Spark. Delta Live Tables поддерживает все форматы файлов, поддерживаемые Apache Spark в Azure Databricks. Полный список см. в разделе "Параметры формата данных".

В следующих примерах показано, как загрузить JSON для создания таблиц Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Примечание.

Конструкция SELECT * FROM format.`path`; SQL распространена во всех средах SQL в Azure Databricks. Это рекомендуемый шаблон прямого доступа к файлам с помощью SQL с разностными динамическими таблицами.

Безопасный доступ к учетным данным хранилища с секретами в конвейере

Секреты Azure Databricks можно использовать для хранения учетных данных, таких как ключи доступа или пароли. Чтобы настроить секрет в конвейере, используйте свойство Spark в конфигурации кластера параметров конвейера. Сведения о настройке вычислений для конвейера Delta Live Tables.

В следующем примере используется секрет для хранения ключа доступа, необходимого для чтения входных данных из учетной записи хранения Azure Data Lake Storage 2-го поколения (ADLS Gen2) с помощью автозагрузчика. Вы можете использовать этот же метод для настройки любого секрета, необходимого для вашего конвейера, например, ключей AWS для доступа к S3 или пароля к хранилищу метаданных Apache Hive.

Дополнительные сведения о работе с Azure Data Lake Storage 2-го поколения см. в статье "Подключение к Azure Data Lake Storage 2-го поколения и хранилищу BLOB-объектов".

Примечание.

Вам нужно добавить префикс в spark.hadoop. ключ конфигурации spark_conf, который задает значение секрета.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> на имя учетной записи хранения ADLS 2-го поколения.
  • <scope-name> на имя секретной области Azure Databricks.
  • <secret-name> на имя ключа, содержащего ключ доступа к учетной записи хранения Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> именем контейнера учетной записи хранения Azure, в котором хранятся входные данные.
  • <storage-account-name> на имя учетной записи хранения ADLS 2-го поколения.
  • <path-to-input-dataset> путем к входному набору данных.

Загрузка данных из Центры событий Azure

Центры событий Azure — это служба потоковой передачи данных, которая предоставляет совместимый интерфейс Apache Kafka. Для загрузки сообщений из Центры событий Azure можно использовать соединитель Структурированной потоковой передачи Kafka, включенный в среду выполнения Delta Live Tables. Дополнительные сведения о загрузке и обработке сообщений из Центры событий Azure см. в разделе "Использование Центры событий Azure в качестве источника данных Delta Live Tables".