Поделиться через


Справочник по языку Python DLT

В этой статье содержатся сведения о интерфейсе программирования DLT Python.

Дополнительные сведения об API SQL см. в справочнике по языку DLT SQL.

Дополнительные сведения о настройке автозагрузчика см. в разделе Что такое автозагрузчик?.

перед началом работы

Ниже приведены важные рекомендации при реализации конвейеров с помощью интерфейса Python DLT:

  • Так как функции Python table() и view() вызываются несколько раз во время планирования и выполнения обновления конвейера, не включают код в одну из этих функций, которые могут иметь побочные эффекты (например, код, который изменяет данные или отправляет сообщение электронной почты). Чтобы избежать непредвиденного поведения, функции Python, определяющие наборы данных, должны содержать только код, необходимый для определения таблицы или представления.
  • Для выполнения таких операций, как отправка сообщений электронной почты или интеграция с внешней службой мониторинга, особенно в функциях, определяющих наборы данных, используйте перехватчики событий. Реализация этих операций в функциях, определяющих наборы данных, приведет к неожиданному поведению.
  • Функции Python table и view должны возвращать кадр данных. Некоторые функции, работающие с дейтафреймами, не возвращают дейтафреймы и не должны использоваться. Эти операции включают такие функции, как collect(), count(), toPandas(), save()и saveAsTable(). Поскольку преобразования DataFrame выполняются после того, как полный граф потока данных был разрешён, использование таких операций может приводить к непреднамеренным побочным эффектам.

Импорт модуля Python dlt

Функции Python DLT определяются в модуле dlt. Конвейеры, реализованные с помощью API Python, должны импортировать этот модуль:

import dlt

создание материализованного представления или потоковой таблицы DLT

В Python DLT определяет, следует ли обновлять набор данных в виде материализованного представления или таблицы потоковой передачи на основе определяющего запроса. Декоратор @table можно использовать для определения материализованных представлений и потоковых таблиц.

Чтобы определить материализованное представление в Python, примените @table к запросу, который выполняет статическое чтение к источнику данных. Чтобы определить потоковую таблицу, примените @table к запросу, который выполняет потоковое чтение из источника данных или используйте функцию create_streaming_table(). Оба типа набора данных имеют одну и ту же спецификацию синтаксиса, как показано ниже.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

создание представления DLT

Чтобы определить представление в Python, примените декоратор @view. Как и декоратор @table, можно использовать представления в DLT для статических или потоковых наборов данных. Ниже приведен синтаксис для определения представлений с помощью Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Пример. Определение таблиц и представлений

Чтобы определить таблицу или представление в Python, примените декоратор @dlt.view или @dlt.table к функции. Имя функции или параметр name можно использовать для назначения имени таблицы или представления. В следующем примере определяются два разных набора данных: представление с именем taxi_raw, которое принимает JSON-файл в качестве источника входных данных и таблицу с именем filtered_data, которая принимает представление taxi_raw в качестве входных данных:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Пример. Доступ к набору данных, определенному в том же конвейере

Заметка

Хотя функции dlt.read() и dlt.read_stream() по-прежнему доступны и полностью поддерживаются интерфейсом Python DLT, Databricks рекомендует всегда использовать функции spark.read.table() и spark.readStream.table() из-за следующих действий:

  • Функции spark поддерживают чтение внутренних и внешних наборов данных, включая наборы данных во внешнем хранилище или определенные в других конвейерах. Функции dlt поддерживают только чтение внутренних наборов данных.
  • Функции spark поддерживают указание параметров, таких как skipChangeCommits, для выполнения операций чтения. Указание параметров не поддерживается функциями dlt.

Чтобы получить доступ к набору данных, определенному в том же конвейере, используйте функции spark.read.table() или spark.readStream.table():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Заметка

При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблица customersзаписывается и считывается из каталога по умолчанию и схемы, настроенной для конвейера.

Пример. Чтение из таблицы, зарегистрированной в хранилище метаданных

Чтобы считывать данные из таблицы, зарегистрированной в хранилище метаданных Hive, в аргументе функции можно указать имя таблицы с именем базы данных:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Пример чтения из таблицы каталога Unity см. в разделе прием данных в конвейер каталога Unity.

пример . Доступ к набору данных с помощью spark.sql

Вы также можете вернуть набор данных с помощью выражения spark.sql в функции запроса. Чтобы прочитать из внутреннего набора данных, можно оставить имя неквалифицированным для использования каталога и схемы по умолчанию, или предварительно указать их.

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Окончательное удаление записей из материализованного представления или потоковой таблицы

Чтобы окончательно удалить записи из материализованного представления или потоковой таблицы с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы обеспечить перманентное удаление записей из материализованного представления, см. удаление записей из материализованного представления с активными векторами удаления. Чтобы гарантировать удаление записей из потоковой таблицы, см. постоянное удаление записей из потоковой таблицы.

запись во внешние службы потоковой передачи событий или таблицы Delta с помощью DLT API sink

Важный

API sink DLT находится в общедоступной предварительной версии.

Заметка

  • Запуск полного обновления не очищает данные из приемников. Все повторно обработанные данные будут добавлены в приемник, и существующие данные не будут изменены.
  • Ожидания DLT не поддерживаются в API sink.

Чтобы записать в стриминговую службу, такую как Apache Kafka или Центры событий Azure, или в таблицу Delta из конвейера DLT, используйте функцию create_sink(), включенную в модуль Python dlt. После создания приемника с функцией create_sink() используется приемник в потоке добавления для записи данных в приемник. Поток добавления — единственный тип потока, поддерживаемый функцией create_sink(). Другие типы потоков, например apply_changes, не поддерживаются.

Ниже приведен синтаксис для создания приемника с функцией create_sink():

create_sink(<sink_name>, <format>, <options>)
Аргументы
name

Тип: str

Строка, идентифицирующая приемник и используемая для ссылки на него и управления им. Имена приемников должны быть уникальными для конвейера, включая весь исходный код, например записные книжки или модули, которые являются частью конвейера.

Этот параметр является обязательным.
format

Тип: str

Строка, определяющая выходной формат либо kafka, либо delta.

Этот параметр является обязательным.
options

Тип: dict

Необязательный список параметров приемника, отформатированный как {"key": "value"}, где и ключ, и значение — строки. Поддерживаются все параметры среды выполнения Databricks, поддерживаемые приемниками Kafka и Delta. См. параметры Kafka в разделе Конфигурация модуля записи структурированной потоковой передачи Kafka. Параметры Delta см. в таблице Delta в качестве приемника.

Пример: создание приемника Kafka с помощью функции create_sink()

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Пример: Создание приемника Delta с помощью функции create_sink() и пути к файловой системе

В следующем примере создается приемник, который записывает в таблицу Delta, передавая путь файловой системы в качестве аргумента.

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

Пример: Создание приемника Delta с помощью функции create_sink() и имени таблицы каталога Unity Catalog

Заметка

Приемник Delta поддерживает внешние и управляемые таблицы каталога Unity и управляемые таблицы хранилища метаданных Hive. Имена таблиц должны быть полностью квалифицированы. Например, таблицы каталога Unity должны использовать трехуровневый идентификатор: <catalog>.<schema>.<table>. Таблицы хранилища метаданных Hive должны использовать <schema>.<table>.

В следующем примере создается приемник, который записывает данные в одну из Delta-таблиц, передавая имя таблицы в каталоге Unity.

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

пример. Использование потока добавления для записи в приемник Delta

В следующем примере создается приемник, который записывается в таблицу Delta, а затем создает поток добавления для записи в этот приемник:

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

Пример: использование потока добавления для записи в хранилище Kafka

В следующем примере создается приемник, который записывается в раздел Kafka, а затем создает поток добавления для записи в этот приемник:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Схема фрейма данных, записанного в Kafka, должна содержать столбцы, указанные в настройках записи структурированной потоковой передачи Kafka.

Создание таблицы для использования в качестве целевой цели операций потоковой передачи

Используйте функцию create_streaming_table() для создания целевой таблицы для выходных записей потоковых операций, включая apply_changes(), apply_changes_from_snapshot()и @append_flow.

Заметка

Функции create_target_table() и create_streaming_live_table() устарели. Databricks рекомендует обновить существующий код для использования функции create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Аргументы
name

Тип: str

Имя таблицы.

Этот параметр является обязательным.
comment

Тип: str

Необязательное описание таблицы.
spark_conf

Тип: dict

Необязательный список конфигураций Spark для выполнения этого запроса.
table_properties

Тип: dict

Необязательный список свойств таблицы для таблицы.
partition_cols

Тип: array

Необязательный список одного или нескольких столбцов, используемых для секционирования таблицы.
cluster_by

Тип: array

При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации.

См. Использование кластеризации жидкости для таблиц Delta.
path

Тип: str

Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию использует расположение хранилища конвейера.
schema

Тип: str или StructType

Необязательное определение схемы для таблицы. Схемы можно определить как строку DDL SQL или с помощью Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Тип: dict

Необязательные ограничения качества данных для таблицы. См. множество ожиданий.
row_filter (общедоступная предварительная версия)

Тип: str

Необязательная клаузула фильтра строк для таблицы. См. публикуйте таблицы с фильтрами строк и масками столбцов.

Управление материализацией таблиц

Таблицы также предлагают дополнительный контроль над их материализацией:

  • Укажите, как использовать cluster_byдля кластеризации -таблиц. Для ускорения запросов можно использовать кластеризацию жидкости. См. Использование кластеризации жидкости для таблиц Delta.
  • Укажите, как таблицы секционированы с помощью partition_cols.
  • Свойства таблицы можно задать при определении представления или таблицы. См. свойства таблицы DLT .
  • Задайте расположение хранилища для данных таблицы с помощью параметра path. По умолчанию данные таблицы хранятся в месте хранения данных конвейера, если path не задан.
  • В определении схемы можно использовать столбцы, созданные ,. См. пример: укажите схему и столбцы кластера.

Заметка

Для таблиц размером меньше 1 ТБ, Databricks рекомендует доверять управление организацией данных DLT. Не следует указывать столбцы разбиения, если вы не ожидаете, что таблица увеличится и превысит терабайт.

Пример : Укажите столбцы схемы и кластера

При необходимости можно указать схему таблицы с помощью StructType Python или строки DDL SQL. При указании строки DDL определение может включать созданные столбцы.

В следующем примере создается таблица с именем sales со схемой, указанной с помощью StructTypePython:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

В следующем примере задается схема таблицы с использованием строки DDL, определяется созданный столбец и определяются столбцы кластеризации.

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

По умолчанию DLT определяет схему из определения table, если не указать схему.

Пример: указание столбцов секций

В следующем примере с помощью строки DDL задается схема таблицы, определяется сгенерированный столбец и задается столбец секционирования:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Пример. Определение ограничений таблицы

Важный

Ограничения таблиц находятся в общедоступной предварительной версии.

При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. пункт CONSTRAINT в справочной информации по языку SQL.

В следующем примере определяется таблица с ограничением первичного и внешнего ключа:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Пример. Определение фильтра строк и маски столбцов

Важный

Фильтры строк и маски столбцов находятся в общедоступной предварительной версии.

Чтобы создать материализованное представление или таблицу потоковой передачи с фильтром строк и маской столбцов, используйте предикат ROW FILTER и предикат MASK . В следующем примере показано, как определить материализованное представление и потоковую таблицу с фильтрацией строк и маскированием столбцов.

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Дополнительные сведения о фильтрах строк и масках столбцов см. в разделе о публикации таблиц с фильтрами строк и масками столбцов.

Настройка потоковой таблицы для пропуска изменений в исходной потоковой таблице

Заметка

  • Флаг skipChangeCommits работает только с spark.readStream с помощью функции option(). Этот флаг нельзя использовать в функции dlt.read_stream().
  • Флаг skipChangeCommits нельзя использовать, если исходная таблица потоковой передачи определена в качестве цели функции apply_changes().

По умолчанию для потоковых таблиц требуются источники, которые поддерживают только добавление. Если потоковая таблица использует другую потоковую таблицу в качестве источника, и исходная потоковая таблица требует обновлений или удаления данных, например в соответствии с GDPR «право на забвение», можно установить флаг skipChangeCommits при чтении исходной потоковой таблицы, чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе Игнорировать обновления и удалять.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Свойства DLT для Python

В следующих таблицах описаны параметры и свойства, которые можно указать при определении таблиц и представлений с помощью DLT:

@table или @view
name

Тип: str

Необязательное имя таблицы или представления. Если оно не определено, имя функции используется в качестве имени таблицы или представления.
comment

Тип: str

Необязательное описание таблицы.
spark_conf

Тип: dict

Необязательный список конфигураций Spark для выполнения этого запроса.
table_properties

Тип: dict

Необязательный список свойств для таблицы.
path

Тип: str

Необязательное расположение хранилища для данных таблицы. Если не задано, система по умолчанию использует расположение хранилища конвейера.
partition_cols

Тип: a collection of str

Необязательная коллекция, например, list, которая может содержать один или несколько столбцов для секционирования таблицы.
cluster_by

Тип: array

При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации.

См. Использование кластеризации жидкости для таблиц Delta.
schema

Тип: str или StructType

Необязательное определение схемы для таблицы. Схемы можно определить в виде строки DDL SQL или с использованием Python StructType.
temporary

Тип: bool

Создайте таблицу, но не публикуйте метаданные для таблицы. Ключевое слово temporary указывает DLT создать таблицу, доступную для конвейера, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.

Значение по умолчанию — False.
row_filter (общедоступная предварительная версия)

Тип: str

Необязательное условие фильтрации строк для таблицы. Смотрите Публикация таблиц с фильтрами строк и масками столбцов.
Определение таблицы или представления
def <function-name>()

Функция Python, определяющая набор данных. Если параметр name не задан, <function-name> используется в качестве имени целевого набора данных.
query

Инструкция Spark SQL, возвращающая набор данных Spark или кадр данных Koalas.

Используйте dlt.read() или spark.read.table() для выполнения полного чтения из набора данных, определенного в том же потоке данных. Чтобы прочитать внешний набор данных, используйте функцию spark.read.table(). Нельзя использовать dlt.read() для чтения внешних наборов данных. Так как spark.read.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо функции dlt.read().

При определении набора данных в конвейере по умолчанию он будет использовать каталог и схему, определенную в конфигурации конвейера. Функцию spark.read.table() можно использовать для чтения из набора данных, определенного в конвейере, без квалификации. Например, для чтения из набора данных с именем customers:

spark.read.table("customers")

Вы также можете использовать функцию spark.read.table() для чтения из таблицы, зарегистрированной в хранилище метаданных, при необходимости указав имя таблицы с именем базы данных:

spark.read.table("sales.customers")

Используйте dlt.read_stream() или spark.readStream.table() для выполнения потокового чтения из набора данных, определенного в том же конвейере. Для выполнения потокового чтения из внешнего набора данных используйте
функция spark.readStream.table(). Так как spark.readStream.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо функции dlt.read_stream().

Чтобы определить запрос в функции DLT table с помощью синтаксиса SQL, используйте функцию spark.sql. См. пример. Доступ к набору данных с помощью spark.sql. Чтобы определить запрос в функции DLT table с помощью Python, используйте синтаксис PySpark.
Ожидания
@expect("description", "constraint")

Объявите ограничение качества данных, определенное по
description. Если строка нарушает ожидание, включите строку в целевой набор данных.
@expect_or_drop("description", "constraint")

Объявите ограничение на качество данных, определённое по
description. Если строка нарушает ожидание, удалите строку из целевого набора данных.
@expect_or_fail("description", "constraint")

Объявить ограничение качества данных, обнаруженное
description. Если строка нарушает ожидание, немедленно остановите выполнение.
@expect_all(expectations)

Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, включите строку в целевой набор данных.
@expect_all_or_drop(expectations)

Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, удалите строку из целевого набора данных.
@expect_all_or_fail(expectations)

Объявите одно или несколько ограничений качества данных.
expectations — это словарь Python, где ключом является описание ожидания, а значение — ограничение ожидания. Если строка нарушает любое из ожиданий, немедленно остановите выполнение.

запись измененных данных из канала изменений с помощью Python в DLT

Используйте функцию apply_changes() в API Python, чтобы воспользоваться возможностями фиксации изменений данных (CDC) для обработки исходных данных из потока изменений (CDF).

Важно

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes() необходимо включить столбцы __START_AT и __END_AT с тем же типом данных, что и поля sequence_by.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python DLT.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Заметка

Для обработки APPLY CHANGES поведение по умолчанию для INSERT и UPDATE событий заключается в том, чтобы события upsert CDC из источника: обновите все строки в целевой таблице, соответствующие указанным ключам, или вставить новую строку, если соответствующая запись не существует в целевой таблице. Обработку событий DELETE можно указать с условием APPLY AS DELETE WHEN.

Дополнительные сведения об обработке CDC с помощью канала изменений см. в статье API APPLY CHANGES: упрощение отслеживания измененных данных с помощьюDLT. Пример использования функции apply_changes() см. в разделе Пример: Обработка SCD типа 1 и SCD типа 2 с исходными данными CDF.

Важный

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes необходимо включить столбцы __START_AT и __END_AT с тем же типом данных, что и поле sequence_by.

См. Интерфейсы API APPLY CHANGES: Упрощение фиксации данных изменений с помощью DLT.

Аргументы
target

Тип: str

Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table(), чтобы создать целевую таблицу перед выполнением функции apply_changes().

Этот параметр является обязательным.
source

Тип: str

Источник данных, содержащий записи CDC.

Этот параметр является обязательным.
keys

Тип: list

Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице.

Можно указать любой из следующих вариантов:

— список строк: ["userId", "orderId"]
— список функций col() Spark SQL: [col("userId"), col("orderId"]

Аргументы в функции col() не могут включать модификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.

Этот параметр является обязательным.
sequence_by

Тип: str или col()

Имя столбца, указывающее логический порядок событий CDC в исходных данных. DLT использует эту последовательность для обработки событий изменений, поступающих в неправильном порядке.

Можно указать любой из следующих вариантов:

— строка: "sequenceNum"
— Функция col() Spark SQL: col("sequenceNum")

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.

Указанный столбец должен быть сортируемым типом данных.

Этот параметр является обязательным.
ignore_null_updates

Тип: bool

Разрешить прием обновлений, содержащих подмножество целевых столбцов. Если событие CDC соответствует существующей строке и ignore_null_updates равно True, столбцы с null сохраняют свои существующие значения в целевой базе данных. Это также относится к вложенным столбцам со значением null. Если ignore_null_updates равно False, существующие значения перезаписываются со значениями null.

Этот параметр является необязательным.

Значение по умолчанию — False.
apply_as_deletes

Тип: str или expr()

Указывает, когда событие CDC должно рассматриваться как DELETE, а не upsert. Чтобы обрабатывать данные вне порядка, удаленная строка временно сохраняется в виде могилы в базовой таблице Delta, а представление создается в хранилище метаданных, которое фильтрует эти могилы. Интервал хранения можно настроить с помощью
pipelines.cdc.tombstoneGCThresholdInSeconds свойстве таблицы.

Можно указать любой из следующих вариантов:

— строка: "Operation = 'DELETE'"
Функция Spark SQL expr(): expr("Operation = 'DELETE'")

Этот параметр является необязательным.
apply_as_truncates

Тип: str или expr()

Указывает, когда событие CDC следует рассматривать как полную таблицу TRUNCATE. Так как это предложение активирует полное удаление данных целевой таблицы, его желательно использовать только для конкретных случаев, требующих этого функционала.

Параметр apply_as_truncates поддерживается только для SCD типа 1. Тип SCD 2 не поддерживает операции усечения данных.

Можно указать любой из следующих вариантов:

— строка: "Operation = 'TRUNCATE'"
— Функция expr() Spark SQL: expr("Operation = 'TRUNCATE'")

Этот параметр является необязательным.
column_list

except_column_list

Тип: list

Подмножество столбцов для включения в целевую таблицу. Используйте column_list, чтобы указать полный список столбцов для включения. Используйте except_column_list, чтобы указать столбцы для исключения. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.

Этот параметр является необязательным.

Значение по умолчанию — включать все столбцы в целевую таблицу, если аргумент column_list или except_column_list не передается функции.
stored_as_scd_type

Тип: str или int

Следует ли хранить записи как SCD типа 1 или SCD типа 2.

Установите значение 1 для SCD типа 1 или 2 для SCD типа 2.

Это предложение является необязательным.

Значение по умолчанию — тип SCD 1.
track_history_column_list

track_history_except_column_list

Тип: list

Подмножество столбцов выходных данных для исторического отслеживания в целевой таблице. Используйте track_history_column_list, чтобы указать полный список столбцов для отслеживания. Использование
track_history_except_column_list, чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.

Этот параметр является необязательным.

Значение по умолчанию — включать все столбцы в целевую таблицу, если не track_history_column_list или
track_history_except_column_list аргумент передается функции.

изменение записи данных из моментальных снимков базы данных с помощью Python в DLT

Важный

API APPLY CHANGES FROM SNAPSHOT находится в общедоступной предварительной версии.

Используйте функцию apply_changes_from_snapshot() в API Python для использования функции фиксации изменений данных DLT (CDC) с целью обработки исходных данных из снимков базы данных.

Важный

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes_from_snapshot() необходимо также включить столбцы __START_AT и __END_AT с тем же типом данных, что и поле sequence_by.

Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python DLT.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Заметка

Для обработки APPLY CHANGES FROM SNAPSHOT поведение по умолчанию заключается в том, чтобы вставить новую строку, если соответствующая запись с теми же ключами не существует в целевом объекте. Если соответствующая запись существует, она обновляется только в том случае, если все значения в строке изменились. Строки с ключами, которые присутствуют в целевом объекте, но больше не значатся в источнике, удаляются.

Для получения дополнительной информации о процессе CDC с использованием моментальных снимков см. раздел API Apply Changes: упрощение отслеживания изменений данных с помощью DLT. Примеры использования функции apply_changes_from_snapshot() смотрите в примере периодического приема моментальных снимков и примере приема исторических моментальных снимков.

Аргументы
target

Тип: str

Имя обновляемой таблицы. Вы можете использовать функцию create_streaming_table() для создания целевой таблицы перед запуском функции apply_changes().

Этот параметр является обязательным.
source

Тип: str или lambda function

Либо имя таблицы или представления, для создания периодических моментальных снимков, либо лямбда-функция на Python, которая возвращает таблицу данных моментального снимка для обработки и версию моментального снимка. См. чтобы реализовать аргумент source.

Этот параметр является обязательным.
keys

Тип: list

Столбец или сочетание столбцов, однозначно определяющих строку в исходных данных. Это позволяет определить, какие события CDC применяются к определенным записям в целевой таблице.

Можно указать любой из следующих вариантов:

— список строк: ["userId", "orderId"]
— список функций col() Spark SQL: [col("userId"), col("orderId"]

Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.

Этот параметр является обязательным.
stored_as_scd_type

Тип: str или int

Следует ли хранить записи как SCD типа 1 или SCD типа 2.

Установите значение 1 для SCD типа 1 или 2 для SCD типа 2.

Это предложение является необязательным.

Значение по умолчанию — SCD тип 1.
track_history_column_list

track_history_except_column_list

Тип: list

Подмножество выходных столбцов для отслеживания истории в целевой таблице. Используйте track_history_column_list, чтобы указать полный список столбцов для отслеживания. Использование
track_history_except_column_list, чтобы указать столбцы, которые следует исключить из отслеживания. Можно объявить любое значение в виде списка строк или как функции col() Spark SQL:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Аргументы функций col() не могут включать квалификаторы. Например, можно использовать col(userId), но использовать col(source.userId)нельзя.

Этот параметр является необязательным.

Значение по умолчанию — включать все столбцы в целевую таблицу, если нет track_history_column_list или
track_history_except_column_list аргумент передается функции.

Реализация аргумента source

Функция apply_changes_from_snapshot() включает аргумент source. Для обработки исторических моментальных снимков аргумент source, как ожидается, будет лямбда-функцией Python, которая возвращает два значения функции apply_changes_from_snapshot(): DataFrame Python, содержащий данные моментального снимка для обработки, и версию моментального снимка.

Ниже приведена подпись лямбда-функции:

lambda Any => Optional[(DataFrame, Any)]
  • Аргумент лямбда-функции является последней обработанной версией моментального снимка.
  • Возвращаемое значение лямбда-функции — None или кортеж из двух значений: первое значение кортежа — это DataFrame, содержащий снимок данных для обработки. Вторым значением кортежа является версия, представляющая логический порядок моментального снимка.

Пример, реализующий и вызывающий лямбда-функцию:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Среда выполнения DLT выполняет следующие действия при каждом запуске конвейера, содержащего функцию apply_changes_from_snapshot():

  1. Запускает функцию next_snapshot_and_version для загрузки следующего DataFrame моментального снимка и соответствующей версии этого снимка.
  2. Если DataFrame не возвращается, выполнение завершается, и обновление конвейера помечается как завершённое.
  3. Обнаруживает изменения в новом моментальном снимке и поэтапно применяет их к целевой таблице.
  4. Возвращается к первому шагу, чтобы загрузить следующий снапшот и его версию.

Ограничения

Интерфейс Python DLT имеет следующее ограничение:

Функция pivot() не поддерживается. Операция pivot в Spark требует активной загрузки входных данных для вычисления выходной схемы. Эта возможность не поддерживается в DLT.