Загрузка данных с помощью DLT
Вы можете загрузить данные из любого источника данных, поддерживаемого Apache Spark в Azure Databricks, с помощью DLT. Наборы данных (таблицы и представления) можно определить в DLT для любого запроса, возвращающего кадр данных Spark, включая потоковую передачу кадров данных и Pandas для кадров данных Spark. Для задач загрузки данных Databricks рекомендует использовать потоковые таблицы для большинства сценариев. Потоковые таблицы подходят для загрузки данных из облачного хранилища объектов с помощью автозагрузчика или из систем обмена сообщениями, таких как Kafka. В приведенных ниже примерах показаны некоторые распространенные шаблоны.
Важный
Не все источники данных поддерживают SQL. Вы можете комбинировать ноутбуки SQL и Python в конвейере обработки данных DLT, чтобы использовать SQL для всех операций после этапа загрузки данных.
Дополнительные сведения о работе с библиотеками, не упакованными в DLT по умолчанию, см. в статье Управление зависимостями Python для конвейеров DLT.
загружать файлы из облачного хранилища объектов
Databricks рекомендует использовать автозагрузчик с DLT для большинства задач приема данных из облачного хранилища объектов. Автозагрузчик и DLT предназначены для добавочной и идемпотентной загрузки постоянно растущих данных по мере поступления в облачное хранилище. В следующих примерах используется автозагрузчик для создания наборов данных из CSV-файлов и JSON:
Заметка
Чтобы загрузить файлы с помощью "Auto Loader" в конвейере, поддерживающем Unity Catalog, необходимо использовать внешние расположения. Дополнительные сведения об использовании каталога Unity с DLT см. в статье Использование каталога Unity с конвейерами DLT.
Питон
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
См. Что такое Auto Loader? и синтаксис Auto Loader SQL.
Предупреждение
Если вы используете автозагрузчик с уведомлениями о файлах и запускаете полное обновление для конвейера или потоковой таблицы, необходимо вручную очистить ресурсы. Для выполнения очистки можно использовать CloudFilesResourceManager в рабочей тетради.
Загрузка данных из шины сообщений
Конвейеры DLT можно настроить для приема данных из шин сообщений с потоковыми таблицами. Databricks рекомендует объединять потоковые таблицы с непрерывным выполнением и расширенным автомасштабированием, чтобы обеспечить наиболее эффективный способ загрузки с низкими задержками из шин сообщений. См. раздел Оптимизация использования кластеров потоков DLT с улучшенным автомасштабированием.
Например, следующий код настраивает потоковую таблицу для приема данных из Kafka:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
Вы можете записывать низкоуровневые операции на чистом SQL, чтобы выполнять потоковые преобразования данных, как показано в следующем примере:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Пример работы с Центрами событий см. в статье Использование Центров событий Azure в качестве источника данных DLT.
См. Настройка источников потоковой передачи данных.
Загрузка данных из внешних систем
DLT поддерживает загрузку данных из любого источника данных, поддерживаемого Azure Databricks. См. Подключение к источникам данных. Вы также можете загрузить внешние данные, используя Lakehouse Federation, для поддерживаемых источников данных . Так как для федерации Lakehouse требуется Databricks Runtime 13.3 LTS или более поздней версии, для использования конвейера федерации Lakehouse необходимо настроить использование канала предварительной версии .
Некоторые источники данных не поддерживают эквивалентную поддержку в SQL. Если вы не можете использовать федерацию Lakehouse с одним из этих источников данных, вы можете использовать записную книжку Python для приема данных из источника. Исходный код Python и SQL можно добавить в тот же конвейер DLT. В следующем примере объявляется материализованное представление для доступа к текущему состоянию данных в удаленной таблице PostgreSQL:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Загрузка небольших или статических наборов данных из облачного хранилища объектов
Вы можете загружать небольшие или статические наборы данных с помощью синтаксиса загрузки Apache Spark. DLT поддерживает все форматы файлов, поддерживаемые Apache Spark в Azure Databricks. Полный список см. в параметрах формата данных.
В следующих примерах показано, как загрузить JSON для создания таблиц DLT:
Питон
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Заметка
Конструкция SELECT * FROM format.`path`;
SQL распространена во всех средах SQL в Azure Databricks. Рекомендуется использовать шаблон прямого доступа к файлам с помощью SQL с DLT.
Обеспечение безопасного доступа к учетным данным хранилища с секретами в потоке обработки
Вы можете использовать секреты Azure Databricks для хранения учетных данных, таких как ключи доступа или пароли. Чтобы настроить секрет в конвейере, используйте свойство Spark в конфигурации кластера параметров конвейера. См. раздел «Настройка вычислительных ресурсов для конвейера DLT».
В следующем примере используется секрет для хранения ключа доступа, необходимого для чтения входных данных из учетной записи хранения Azure Data Lake Storage Gen2 (ADLS Gen2) с помощью Auto Loader. Этот же метод можно использовать для настройки любого секрета, необходимого для конвейера, например ключей AWS для доступа к S3 или пароля к хранилищу метаданных Apache Hive.
Дополнительные сведения о работе с Azure Data Lake Storage 2-го поколения см. в статьях Подключение к Azure Data Lake Storage Gen2 и Хранилище блобов и.
Заметка
Необходимо добавить префикс spark.hadoop.
в ключ конфигурации spark_conf
, который задает значение секрета.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Заменять
-
<storage-account-name>
с именем учетной записи хранения ADLS 2-го поколения. -
<scope-name>
с именем области секрета Azure Databricks. -
<secret-name>
с именем ключа, содержащего ключ доступа к учетной записи хранения Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Заменить
-
<container-name>
с именем контейнера учетной записи хранения Azure, в которой хранятся входные данные. -
<storage-account-name>
с именем учетной записи хранения ADLS второго поколения. -
<path-to-input-dataset>
пути к входной набору данных.
Загрузка данных из Центров событий Azure
Центры событий Azure — это служба потоковой передачи данных, которая предоставляет совместимый интерфейс Apache Kafka. Соединитель Структурированной потоковой передачи Kafka, включенный в среду выполнения DLT, можно использовать для загрузки сообщений из Центров событий Azure. Дополнительные сведения о загрузке и обработке сообщений из Центров событий Azure см. в статье Использование Центров событий Azure в качестве источника данных DLT.