Справочник по языку Python DLT
В этой статье содержатся сведения о интерфейсе программирования DLT Python.
Дополнительные сведения об API SQL см. в справочнике по языку DLT SQL.
Дополнительные сведения о настройке автозагрузчика см. в разделе Что такое автозагрузчик?.
перед началом работы
Ниже приведены важные рекомендации при реализации конвейеров с помощью интерфейса Python DLT:
- Так как функции Python
table()
иview()
вызываются несколько раз во время планирования и выполнения обновления конвейера, не включают код в одну из этих функций, которые могут иметь побочные эффекты (например, код, который изменяет данные или отправляет сообщение электронной почты). Чтобы избежать непредвиденного поведения, функции Python, определяющие наборы данных, должны содержать только код, необходимый для определения таблицы или представления. - Для выполнения таких операций, как отправка сообщений электронной почты или интеграция с внешней службой мониторинга, особенно в функциях, определяющих наборы данных, используйте перехватчики событий. Реализация этих операций в функциях, определяющих наборы данных, приведет к неожиданному поведению.
- Функции Python
table
иview
должны возвращать кадр данных. Некоторые функции, работающие с дейтафреймами, не возвращают дейтафреймы и не должны использоваться. Эти операции включают такие функции, какcollect()
,count()
,toPandas()
,save()
иsaveAsTable()
. Поскольку преобразования DataFrame выполняются после того, как полный граф потока данных был разрешён, использование таких операций может приводить к непреднамеренным побочным эффектам.
Импорт модуля Python dlt
Функции Python DLT определяются в модуле dlt
. Конвейеры, реализованные с помощью API Python, должны импортировать этот модуль:
import dlt
создание материализованного представления или потоковой таблицы DLT
В Python DLT определяет, следует ли обновлять набор данных в виде материализованного представления или таблицы потоковой передачи на основе определяющего запроса. Декоратор @table
можно использовать для определения материализованных представлений и потоковых таблиц.
Чтобы определить материализованное представление в Python, примените @table
к запросу, который выполняет статическое чтение к источнику данных. Чтобы определить потоковую таблицу, примените @table
к запросу, который выполняет потоковое чтение из источника данных или используйте функцию create_streaming_table(). Оба типа набора данных имеют одну и ту же спецификацию синтаксиса, как показано ниже.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
создание представления DLT
Чтобы определить представление в Python, примените декоратор @view
. Как и декоратор @table
, можно использовать представления в DLT для статических или потоковых наборов данных. Ниже приведен синтаксис для определения представлений с помощью Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Пример. Определение таблиц и представлений
Чтобы определить таблицу или представление в Python, примените декоратор @dlt.view
или @dlt.table
к функции. Имя функции или параметр name
можно использовать для назначения имени таблицы или представления. В следующем примере определяются два разных набора данных: представление с именем taxi_raw
, которое принимает JSON-файл в качестве источника входных данных и таблицу с именем filtered_data
, которая принимает представление taxi_raw
в качестве входных данных:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Пример. Доступ к набору данных, определенному в том же конвейере
Заметка
Хотя функции dlt.read()
и dlt.read_stream()
по-прежнему доступны и полностью поддерживаются интерфейсом Python DLT, Databricks рекомендует всегда использовать функции spark.read.table()
и spark.readStream.table()
из-за следующих действий:
- Функции
spark
поддерживают чтение внутренних и внешних наборов данных, включая наборы данных во внешнем хранилище или определенные в других конвейерах. Функцииdlt
поддерживают только чтение внутренних наборов данных. - Функции
spark
поддерживают указание параметров, таких какskipChangeCommits
, для выполнения операций чтения. Указание параметров не поддерживается функциямиdlt
.
Чтобы получить доступ к набору данных, определенному в том же конвейере, используйте функции spark.read.table()
или spark.readStream.table()
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Заметка
При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблица customers
записывается и считывается из каталога по умолчанию и схемы, настроенной для конвейера.
Пример. Чтение из таблицы, зарегистрированной в хранилище метаданных
Чтобы считывать данные из таблицы, зарегистрированной в хранилище метаданных Hive, в аргументе функции можно указать имя таблицы с именем базы данных:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Пример чтения из таблицы каталога Unity см. в разделе прием данных в конвейер каталога Unity.
пример . Доступ к набору данных с помощью spark.sql
Вы также можете вернуть набор данных с помощью выражения spark.sql
в функции запроса. Чтобы прочитать из внутреннего набора данных, можно оставить имя неквалифицированным для использования каталога и схемы по умолчанию, или предварительно указать их.
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Окончательное удаление записей из материализованного представления или потоковой таблицы
Чтобы окончательно удалить записи из материализованного представления или потоковой таблицы с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы обеспечить перманентное удаление записей из материализованного представления, см. удаление записей из материализованного представления с активными векторами удаления. Чтобы гарантировать удаление записей из потоковой таблицы, см. постоянное удаление записей из потоковой таблицы.
запись во внешние службы потоковой передачи событий или таблицы Delta с помощью DLT API sink
Важный
API sink
DLT находится в общедоступной предварительной версии.
Заметка
- Запуск полного обновления не очищает данные из приемников. Все повторно обработанные данные будут добавлены в приемник, и существующие данные не будут изменены.
- Ожидания DLT не поддерживаются в API
sink
.
Чтобы записать в стриминговую службу, такую как Apache Kafka или Центры событий Azure, или в таблицу Delta из конвейера DLT, используйте функцию create_sink()
, включенную в модуль Python dlt
. После создания приемника с функцией create_sink()
используется приемник в потоке добавления для записи данных в приемник. Поток добавления — единственный тип потока, поддерживаемый функцией create_sink()
. Другие типы потоков, например apply_changes
, не поддерживаются.
Ниже приведен синтаксис для создания приемника с функцией create_sink()
:
create_sink(<sink_name>, <format>, <options>)
Аргументы |
---|
name Тип: str Строка, идентифицирующая приемник и используемая для ссылки на него и управления им. Имена приемников должны быть уникальными для конвейера, включая весь исходный код, например записные книжки или модули, которые являются частью конвейера. Этот параметр является обязательным. |
format Тип: str Строка, определяющая выходной формат либо kafka , либо delta .Этот параметр является обязательным. |
options Тип: dict Необязательный список параметров приемника, отформатированный как {"key": "value"} , где и ключ, и значение — строки. Поддерживаются все параметры среды выполнения Databricks, поддерживаемые приемниками Kafka и Delta. См. параметры Kafka в разделе Конфигурация модуля записи структурированной потоковой передачи Kafka. Параметры Delta см. в таблице Delta в качестве приемника. |
Пример: создание приемника Kafka с помощью функции create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
Пример: Создание приемника Delta с помощью функции create_sink()
и пути к файловой системе
В следующем примере создается приемник, который записывает в таблицу Delta, передавая путь файловой системы в качестве аргумента.
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
Пример: Создание приемника Delta с помощью функции create_sink()
и имени таблицы каталога Unity Catalog
Заметка
Приемник Delta поддерживает внешние и управляемые таблицы каталога Unity и управляемые таблицы хранилища метаданных Hive. Имена таблиц должны быть полностью квалифицированы. Например, таблицы каталога Unity должны использовать трехуровневый идентификатор: <catalog>.<schema>.<table>
. Таблицы хранилища метаданных Hive должны использовать <schema>.<table>
.
В следующем примере создается приемник, который записывает данные в одну из Delta-таблиц, передавая имя таблицы в каталоге Unity.
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
пример. Использование потока добавления для записи в приемник Delta
В следующем примере создается приемник, который записывается в таблицу Delta, а затем создает поток добавления для записи в этот приемник:
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Пример: использование потока добавления для записи в хранилище Kafka
В следующем примере создается приемник, который записывается в раздел Kafka, а затем создает поток добавления для записи в этот приемник:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Схема фрейма данных, записанного в Kafka, должна содержать столбцы, указанные в настройках записи структурированной потоковой передачи Kafka.
Создание таблицы для использования в качестве целевой цели операций потоковой передачи
Используйте функцию create_streaming_table()
для создания целевой таблицы для выходных записей потоковых операций, включая apply_changes(), apply_changes_from_snapshot()и @append_flow.
Заметка
Функции create_target_table()
и create_streaming_live_table()
устарели. Databricks рекомендует обновить существующий код для использования функции create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Аргументы |
---|
name Тип: str Имя таблицы. Этот параметр является обязательным. |
comment Тип: str Необязательное описание таблицы. |
spark_conf Тип: dict Необязательный список конфигураций Spark для выполнения этого запроса. |
table_properties Тип: dict Необязательный список свойств таблицы для таблицы. |
partition_cols Тип: array Необязательный список одного или нескольких столбцов, используемых для секционирования таблицы. |
cluster_by Тип: array При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. Использование кластеризации жидкости для таблиц Delta. |
path Тип: str Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию использует расположение хранилища конвейера. |
schema Тип: str или StructType Необязательное определение схемы для таблицы. Схемы можно определить как строку DDL SQL или с помощью Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Тип: dict Необязательные ограничения качества данных для таблицы. См. множество ожиданий. |
row_filter (общедоступная предварительная версия)Тип: str Необязательная клаузула фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов. |
Управление материализацией таблиц
Таблицы также предлагают дополнительный контроль над их материализацией:
- Укажите, как использовать
cluster_by
для кластеризации -таблиц. Для ускорения запросов можно использовать кластеризацию жидкости. См. Использование кластеризации жидкости для таблиц Delta. - Укажите, как таблицы секционированы с помощью
partition_cols
. - Свойства таблицы можно задать при определении представления или таблицы. См. свойства таблицы DLT .
- Задайте расположение хранилища для данных таблицы с помощью параметра
path
. По умолчанию данные таблицы хранятся в месте хранения данных конвейера, еслиpath
не задан. - В определении схемы можно использовать столбцы, созданные ,. См. пример: укажите схему и столбцы кластера.
Заметка
Для таблиц размером меньше 1 ТБ, Databricks рекомендует доверять управление организацией данных DLT. Не следует указывать столбцы разбиения, если вы не ожидаете, что таблица увеличится и превысит терабайт.
Пример : Укажите столбцы схемы и кластера
При необходимости можно указать схему таблицы с помощью StructType
Python или строки DDL SQL. При указании строки DDL определение может включать созданные столбцы.
В следующем примере создается таблица с именем sales
со схемой, указанной с помощью StructType
Python:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
В следующем примере задается схема таблицы с использованием строки DDL, определяется созданный столбец и определяются столбцы кластеризации.
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
По умолчанию DLT определяет схему из определения table
, если не указать схему.
Пример: указание столбцов секций
В следующем примере с помощью строки DDL задается схема таблицы, определяется сгенерированный столбец и задается столбец секционирования:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Пример. Определение ограничений таблицы
Важный
Ограничения таблиц находятся в общедоступной предварительной версии.
При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. пункт CONSTRAINT в справочной информации по языку SQL.
В следующем примере определяется таблица с ограничением первичного и внешнего ключа:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Пример. Определение фильтра строк и маски столбцов
Важный
Фильтры строк и маски столбцов находятся в общедоступной предварительной версии.
Чтобы создать материализованное представление или таблицу потоковой передачи с фильтром строк и маской столбцов, используйте предикат ROW FILTER и предикат MASK . В следующем примере показано, как определить материализованное представление и потоковую таблицу с фильтрацией строк и маскированием столбцов.
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Дополнительные сведения о фильтрах строк и масках столбцов см. в разделе о публикации таблиц с фильтрами строк и масками столбцов.
Настройка потоковой таблицы для пропуска изменений в исходной потоковой таблице
Заметка
- Флаг
skipChangeCommits
работает только сspark.readStream
с помощью функцииoption()
. Этот флаг нельзя использовать в функцииdlt.read_stream()
. - Флаг
skipChangeCommits
нельзя использовать, если исходная таблица потоковой передачи определена в качестве цели функции apply_changes().
По умолчанию для потоковых таблиц требуются источники, которые поддерживают только добавление. Если потоковая таблица использует другую потоковую таблицу в качестве источника, и исходная потоковая таблица требует обновлений или удаления данных, например в соответствии с GDPR «право на забвение», можно установить флаг skipChangeCommits
при чтении исходной потоковой таблицы, чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе Игнорировать обновления и удалять.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Свойства DLT для Python
В следующих таблицах описаны параметры и свойства, которые можно указать при определении таблиц и представлений с помощью DLT:
@table или @view |
---|
name Тип: str Необязательное имя таблицы или представления. Если оно не определено, имя функции используется в качестве имени таблицы или представления. |
comment Тип: str Необязательное описание таблицы. |
spark_conf Тип: dict Необязательный список конфигураций Spark для выполнения этого запроса. |
table_properties Тип: dict Необязательный список свойств для таблицы. |
path Тип: str Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию использует расположение хранилища конвейера. |
partition_cols Тип: a collection of str Необязательная коллекция, например, list , которая может содержать один или несколько столбцов для секционирования таблицы. |
cluster_by Тип: array При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. Использование кластеризации жидкости для таблиц Delta. |
schema Тип: str или StructType Необязательное определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType . |
temporary Тип: bool Создайте таблицу, но не публикуйте метаданные для таблицы. Ключевое слово temporary указывает DLT создать таблицу, доступную для конвейера, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.Значение по умолчанию — False. |
row_filter (общедоступная предварительная версия)Тип: str Необязательное условие фильтрации строк для таблицы. Смотрите Публикация таблиц с фильтрами строк и масками столбцов. |
Определение таблицы или представления |
---|
def <function-name>() Функция Python, определяющая набор данных. Если параметр name не задан, <function-name> используется в качестве имени целевого набора данных. |
query Инструкция Spark SQL, возвращающая набор данных Spark или кадр данных Koalas. Используйте dlt.read() или spark.read.table() для выполнения полного чтения из набора данных, определенного в том же потоке данных. Чтобы прочитать внешний набор данных, используйте функцию spark.read.table() . Нельзя использовать dlt.read() для чтения внешних наборов данных. Так как spark.read.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо функции dlt.read() .При определении набора данных в конвейере по умолчанию он будет использовать каталог и схему, определенную в конфигурации конвейера. Функцию spark.read.table() можно использовать для чтения из набора данных, определенного в конвейере, без квалификации. Например, для чтения из набора данных с именем customers :spark.read.table("customers") Вы также можете использовать функцию spark.read.table() для чтения из таблицы, зарегистрированной в хранилище метаданных, при необходимости указав имя таблицы с именем базы данных:spark.read.table("sales.customers") Используйте dlt.read_stream() или spark.readStream.table() для выполнения потокового чтения из набора данных, определенного в том же конвейере. Для выполнения потокового чтения из внешнего набора данных используйтефункция spark.readStream.table() . Так как spark.readStream.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо функции dlt.read_stream() .Чтобы определить запрос в функции DLT table с помощью синтаксиса SQL, используйте функцию spark.sql . См. пример. Доступ к набору данных с помощью spark.sql . Чтобы определить запрос в функции DLT table с помощью Python, используйте синтаксис PySpark. |
Ожидания |
---|
@expect("description", "constraint") Объявите ограничение качества данных, определенное по description . Если строка нарушает ожидание, включите строку в целевой набор данных. |
@expect_or_drop("description", "constraint") Объявите ограничение на качество данных, определённое по description . Если строка нарушает ожидание, удалите строку из целевого набора данных. |
@expect_or_fail("description", "constraint") Объявить ограничение качества данных, обнаруженное description . Если строка нарушает ожидание, немедленно остановите выполнение. |
@expect_all(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, включите строку в целевой набор данных. |
@expect_all_or_drop(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, удалите строку из целевого набора данных. |
@expect_all_or_fail(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, немедленно остановите выполнение. |
запись измененных данных из канала изменений с помощью Python в DLT
Используйте функцию apply_changes()
в API Python, чтобы воспользоваться возможностями фиксации изменений данных (CDC) для обработки исходных данных из потока изменений (CDF).
Важно
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes()
необходимо включить столбцы __START_AT
и __END_AT
с тем же типом данных, что и поля sequence_by
.
Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python DLT.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Заметка
Для обработки APPLY CHANGES
поведение по умолчанию для INSERT
и UPDATE
событий заключается в том, чтобы события upsert CDC из источника: обновите все строки в целевой таблице, соответствующие указанным ключам, или вставить новую строку, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE
можно указать с условием APPLY AS DELETE WHEN
.
Дополнительные сведения об обработке CDC с помощью канала изменений см. в статье API APPLY CHANGES: упрощение отслеживания измененных данных с помощьюDLT. Пример использования функции apply_changes()
см. в разделе Пример: Обработка SCD типа 1 и SCD типа 2 с исходными данными CDF.
Важный
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes
необходимо включить столбцы __START_AT
и __END_AT
с тем же типом данных, что и поле sequence_by
.
См. Интерфейсы API APPLY CHANGES: Упрощение фиксации данных изменений с помощью DLT.
изменение записи данных из моментальных снимков базы данных с помощью Python в DLT
Важный
API APPLY CHANGES FROM SNAPSHOT
находится в общедоступной предварительной версии.
Используйте функцию apply_changes_from_snapshot()
в API Python для использования функции фиксации изменений данных DLT (CDC) с целью обработки исходных данных из снимков базы данных.
Важный
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes_from_snapshot()
необходимо также включить столбцы __START_AT
и __END_AT
с тем же типом данных, что и поле sequence_by
.
Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python DLT.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Заметка
Для обработки APPLY CHANGES FROM SNAPSHOT
поведение по умолчанию заключается в том, чтобы вставить новую строку, если соответствующая запись с теми же ключами не существует в целевом объекте. Если соответствующая запись существует, она обновляется только в том случае, если все значения в строке изменились. Строки с ключами, которые присутствуют в целевом объекте, но больше не значатся в источнике, удаляются.
Для получения дополнительной информации о процессе CDC с использованием моментальных снимков см. раздел API Apply Changes: упрощение отслеживания изменений данных с помощью DLT. Примеры использования функции apply_changes_from_snapshot()
смотрите в примере периодического приема моментальных снимков и примере приема исторических моментальных снимков.
Аргументы |
---|
target Тип: str Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table() для создания целевой таблицы перед запуском функции apply_changes() .Этот параметр является обязательным. |
source Тип: str или lambda function Либо имя таблицы или представления, для создания периодических моментальных снимков, либо лямбда-функция на Python, которая возвращает таблицу данных моментального снимка для обработки и версию моментального снимка. См. чтобы реализовать аргумент source .Этот параметр является обязательным. |
keys Тип: list Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице. Можно указать любой из следующих вариантов: — список строк: ["userId", "orderId"] — список функций col() Spark SQL: [col("userId"), col("orderId"] Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но использовать col(source.userId) нельзя.Этот параметр является обязательным. |
stored_as_scd_type Тип: str или int Следует ли хранить записи как SCD типа 1 или SCD типа 2. Установите значение 1 для SCD типа 1 или 2 для SCD типа 2.Это предложение является необязательным. Значение по умолчанию — SCD тип 1. |
track_history_column_list track_history_except_column_list Тип: list Подмножество выходных столбцов для отслеживания истории в целевой таблице. Используйте track_history_column_list , чтобы указать полный список столбцов для отслеживания. Использованиеtrack_history_except_column_list , чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но использовать col(source.userId) нельзя.Этот параметр является необязательным. Значение по умолчанию — включать все столбцы в целевую таблицу, если нет track_history_column_list илиtrack_history_except_column_list аргумент передается функции. |
Реализация аргумента source
Функция apply_changes_from_snapshot()
включает аргумент source
. Для обработки исторических моментальных снимков аргумент source
, как ожидается, будет лямбда-функцией Python, которая возвращает два значения функции apply_changes_from_snapshot()
: DataFrame Python, содержащий данные моментального снимка для обработки, и версию моментального снимка.
Ниже приведена подпись лямбда-функции:
lambda Any => Optional[(DataFrame, Any)]
- Аргумент лямбда-функции является последней обработанной версией моментального снимка.
- Возвращаемое значение лямбда-функции —
None
или кортеж из двух значений: первое значение кортежа — это DataFrame, содержащий снимок данных для обработки. Вторым значением кортежа является версия, представляющая логический порядок моментального снимка.
Пример, реализующий и вызывающий лямбда-функцию:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Среда выполнения DLT выполняет следующие действия при каждом запуске конвейера, содержащего функцию apply_changes_from_snapshot()
:
- Запускает функцию
next_snapshot_and_version
для загрузки следующего DataFrame моментального снимка и соответствующей версии этого снимка. - Если DataFrame не возвращается, выполнение завершается, и обновление конвейера помечается как завершённое.
- Обнаруживает изменения в новом моментальном снимке и поэтапно применяет их к целевой таблице.
- Возвращается к первому шагу, чтобы загрузить следующий снапшот и его версию.
Ограничения
Интерфейс Python DLT имеет следующее ограничение:
Функция pivot()
не поддерживается. Операция pivot
в Spark требует активной загрузки входных данных для вычисления выходной схемы. Эта возможность не поддерживается в DLT.