Поделиться через


API APPLY CHANGES: упрощение захвата данных об изменениях с помощью Delta Live Tables

Delta Live Tables упрощает сбор данных об изменениях (CDC) с помощью API APPLY CHANGES и APPLY CHANGES FROM SNAPSHOT. Используемый интерфейс зависит от источника измененных данных:

  • Используется APPLY CHANGES для обработки изменений из веб-канала измененных данных (CDF).
  • Используйте APPLY CHANGES FROM SNAPSHOT (общедоступную предварительную версию) для обработки изменений моментальных снимков базы данных.

Ранее инструкция MERGE INTO часто использовалась для обработки записей CDC в Azure Databricks. MERGE INTO Однако может привести к неправильным результатам из-за неупорядоченных записей или требуется сложная логика для повторного упорядочивания записей.

API APPLY CHANGES поддерживается в интерфейсах SQL и Python для Delta Live Tables. API APPLY CHANGES FROM SNAPSHOT поддерживается в интерфейсе Python Delta Live Tables.

Обе APPLY CHANGES и APPLY CHANGES FROM SNAPSHOT поддерживают обновление таблиц с помощью SCD типа 1 и типа 2:

  • Используйте SCD типа 1 для непосредственного обновления записей. Журнал не сохраняется для обновленных записей.
  • Используйте SCD типа 2 для хранения истории записей при всех обновлениях или обновлениях определённого набора столбцов.

Сведения о синтаксисе и других ссылках см. в следующей статье:

Примечание.

В этой статье описывается, как обновлять таблицы в конвейере Delta Live Tables на основе изменений в исходных данных. Информацию о том, как записывать и запрашивать сведения об изменениях на уровне строк для таблиц Delta, см. в разделе Использование канала данных изменений Delta Lake в Azure Databricks.

Требования

Чтобы использовать API CDC, конвейер должен быть настроен для использования бессерверных конвейеров DLT или разностных динамических таблиц Pro или Advancedвыпусков.

Как CDC реализуется с помощью APPLY CHANGES API?

Автоматически обрабатывая записи вне последовательности, API APPLY CHANGES в Delta Live Tables обеспечивает правильную обработку записей CDC и удаляет необходимость разработки сложной логики для обработки записей вне последовательности. Необходимо указать столбец в исходных данных, чтобы выполнить последовательность записей, которые Delta Live Tables интерпретируют как монотонно возрастающее представление правильного порядка данных. Delta Live Tables автоматически обрабатывает данные, поступающие не по порядку. Для изменений типа SCD 2, Delta Live Tables распространяют соответствующие значения последовательности в столбцы __START_AT и __END_AT целевой таблицы. При каждом значении последовательности должно быть одно отдельное обновление, а значения последовательности NULL не поддерживаются.

Чтобы выполнить обработку CDC, используя APPLY CHANGES, сначала создайте потоковую таблицу, а затем используйте инструкцию APPLY CHANGES INTO в SQL или функцию apply_changes() в Python, чтобы указать источник, ключи и последовательности для канала изменений. Чтобы создать целевую таблицу потоковой передачи, используйте инструкцию CREATE OR REFRESH STREAMING TABLE в SQL или функцию create_streaming_table() в Python. См. примеры обработки SCD типа 1 и типа 2.

Дополнительные сведения о синтаксисе см. в справочнике по SQL Delta Live Tables или в справочнике по Python .

Как CDC реализуется с помощью APPLY CHANGES FROM SNAPSHOT API?

Внимание

APPLY CHANGES FROM SNAPSHOT API находится в общедоступной предварительной версии.

APPLY CHANGES FROM SNAPSHOT — это декларативный API, который эффективно определяет изменения исходных данных путем сравнения ряда моментальных снимков в порядке, а затем выполняет обработку, необходимую для обработки записей CDC в моментальных снимках. APPLY CHANGES FROM SNAPSHOT поддерживается только интерфейсом Python Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT поддерживает прием моментальных снимков из нескольких типов источников:

  • Используйте периодическую загрузку моментальных снимков для приёма моментальных снимков из существующей таблицы или представления. APPLY CHANGES FROM SNAPSHOT имеет простой, упрощенный интерфейс для поддержки периодически приема моментальных снимков из существующего объекта базы данных. Новый моментальный снимок создается при каждом обновлении конвейера, и время этого создания используется в качестве версии моментального снимка. При запуске конвейера в непрерывном режиме несколько моментальных снимков вводятся при каждом обновлении конвейера с периодичностью, определяемой настройкой интервала триггера для потока, выполняющего обработку APPLY CHANGES FROM SNAPSHOT.
  • Используйте прием исторических моментальных снимков для обработки файлов, содержащих моментальные снимки базы данных, например моментальные снимки, созданные из базы данных Oracle или MySQL или хранилища данных.

Чтобы выполнить обработку CDC из любого исходного типа с APPLY CHANGES FROM SNAPSHOT, сначала создайте потоковую таблицу, а затем используйте функцию apply_changes_from_snapshot() в Python, чтобы указать моментальный снимок, ключи и другие аргументы, необходимые для реализации обработки. Ознакомьтесь с примерами приема периодических моментальных снимков и историческими примерами приема моментальных снимков.

Моментальные снимки, передаваемые API, должны находиться в порядке возрастания по версии. Если Delta Live Tables обнаруживает моментальный снимок вне порядка, возникает ошибка.

Для получения сведений о синтаксисе см. справочник по Delta Live Tables Python.

Ограничения

Столбец, используемый для последовательности, должен быть сортируемым типом данных.

Пример: обработка SCD типа 1 и SCD типа 2 с исходными данными CDF

В следующих разделах приведены примеры запросов SCD типа 1 и типа 2 Delta Live Tables, которые обновляют целевые таблицы на основе исходных событий из потока изменений данных:

  1. Создает новые записи пользователей.
  2. Удаляет запись пользователя.
  3. Обновляет записи пользователей. В примере обработки данных типа SCD 1, последние операции с идентификатором UPDATE поступают с задержкой и удаляются из целевой таблицы, что демонстрирует обработку неупорядоченных событий.

В следующих примерах предполагается знакомство с настройкой и обновлением конвейеров Delta Live Tables. См. руководство по . Запуск первого конвейера Delta Live Tables.

Для выполнения этих примеров необходимо начать с создания примера набора данных. См. создание тестовых данных.

Ниже приведены входные записи для этих примеров.

userId name city Операция sequenceNum
124 Raul Оахака INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Тихуана INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Чиуауа UPDATE 5

Если вы раскомментируете окончательную строку в примерах данных, вставьте следующую запись, указывающую, где должны быть усечены записи:

userId name city Операция sequenceNum
null null null TRUNCATE 3

Примечание.

Все приведенные ниже примеры включают параметры для указания обоих DELETE и TRUNCATE операций, но каждый из них является необязательным.

Обработка обновлений SCD типа 1

В следующем примере демонстрируется обработка обновлений SCD типа 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

После запуска примера SCD типа 1 целевая таблица содержит следующие записи:

userId name city
124 Raul Оахака
125 Mercedes Guadalajara
126 Lily Cancun

После выполнения примера SCD типа 1 с дополнительной записью TRUNCATE записи 124 и 126 усечены из-за операции TRUNCATE в sequenceNum=3, и целевая таблица содержит следующую запись:

userId name city
125 Mercedes Guadalajara

Обработка обновлений SCD типа 2

В следующем примере демонстрируется обработка обновлений SCD типа 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

После запуска примера SCD типа 2 целевая таблица содержит следующие записи:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Чиуауа 5 6
124 Raul Оахака 1 null
125 Mercedes Тихуана 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

Запрос типа 2 SCD также может указать подмножество выходных столбцов для отслеживания истории в целевой таблице. Изменения других столбцов обновляются непосредственно, вместо создания новых исторических записей. В следующем примере показано, как исключить столбец city из отслеживания:

В следующем примере показано использование журнала отслеживания с типом 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

После выполнения этого примера без дополнительной записи TRUNCATE целевая таблица содержит следующие записи:

userId name city __START_AT __END_AT
123 Isabel Чиуауа 1 6
124 Raul Оахака 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Создание тестовых данных

Приведенный ниже код содержится для создания примера набора данных для использования в примерах запросов, представленных в этом руководстве. Если у вас есть необходимые учетные данные для создания новой схемы и создания новой таблицы, эти инструкции можно выполнить с помощью записной книжки или Databricks SQL. Следующий код не предназначен для запуска в рамках конвейера Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Пример. Периодическое обработка моментальных снимков

В следующем примере показан процесс SCD тип 2, который вводит в систему моментальные снимки таблицы, хранящейся в mycatalog.myschema.mytable. Результаты обработки записываются в таблицу с именем target.

mycatalog.myschema.mytable записи в метке времени 2024-01-01 00:00:00

Ключ Значение
1 А1
2 А2

mycatalog.myschema.mytable записи в метке времени 2024-01-01 12:00:00

Ключ Значение
2 Б2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

После обработки моментальных снимков целевая таблица содержит следующие записи:

Ключ Значение __START_AT __END_AT
1 А1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 А2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 Б2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Пример: обработка исторических моментальных снимков

В следующем примере показана обработка SCD типа 2, которая обновляет целевую таблицу на основе исходных событий из двух моментальных снимков, хранящихся в облачной системе хранения:

Моментальный снимок в timestamp, хранящийся в /<PATH>/filename1.csv

Ключ TrackingColumn NonTrackingColumn
1 А1 b1
2 А2 Б2
4 a4 b4

Моментальный снимок в timestamp + 5, хранящийся в /<PATH>/filename2.csv

Ключ TrackingColumn NonTrackingColumn
2 a2_new Б2
3 a3 b3
4 a4 b4_new

В следующем примере кода демонстрируется обработка обновлений SCD типа 2 с этими моментальными снимками:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

После обработки моментальных снимков целевая таблица содержит следующие записи:

Ключ TrackingColumn NonTrackingColumn __START_AT __END_AT
1 А1 b1 1 2
2 А2 Б2 1 2
2 a2_new Б2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

добавить, изменить или удалить данные в целевой потоковой таблице

Если конвейер публикует таблицы в каталоге Unity, можно использовать инструкции языка обработки данных языка обработки данных (DML), включая инструкции вставки, обновления, удаления и слияния, для изменения целевых таблиц потоковой передачи, созданных APPLY CHANGES INTO операторами.

Примечание.

  • Инструкции DML, изменяющие схему таблицы потоковой передачи, не поддерживаются. Убедитесь, что операторы DML не пытаются изменять схему таблицы.
  • Инструкции DML, обновляющие потоковую таблицу, могут выполняться только в общем кластере каталога Unity или хранилище SQL с помощью Databricks Runtime 13.3 LTS и более поздних версий.
  • Поскольку для потоковой передачи требуются источники данных с возможностью только добавления, если ваша обработка требует потоковой передачи из исходной потоковой таблицы с изменениями (например, с использованием операторов DML), задайте флаг skipChangeCommits при чтении потоковой таблицы-источника. При установке skipChangeCommits транзакции, которые удаляют или изменяют записи в исходной таблице, игнорируются. Если для обработки не требуется потоковая таблица, можно использовать материализованное представление (которое не имеет ограничения только для добавления) в качестве целевой таблицы.

Поскольку Delta Live Tables используют указанный столбец SEQUENCE BY и распространяют соответствующие значения последовательности в столбцы __START_AT и __END_AT целевой таблицы (для SCD типа 2), необходимо убедиться, что инструкции DML используют допустимые значения для этих столбцов, чтобы поддерживать правильный порядок записей. Узнайте, как CDC реализован с помощью API APPLY CHANGES?.

Дополнительные сведения об использовании инструкций DML для потоковых таблиц см. в разделе Добавление, изменение или удаление данных в потоковой таблице.

В следующем примере вставляется активная запись с начальной последовательностью 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Читать поток данных об изменениях из целевой таблицы APPLY CHANGES

В Databricks Runtime 15.2 и более поздних версиях можно считывать канал изменений данных из потоковой таблицы, используемой в запросах APPLY CHANGES или APPLY CHANGES FROM SNAPSHOT, подобно тому, как это делается с другими таблицами Delta. Для чтения потока данных изменений из целевой потоковой таблицы требуется следующее:

  • Целевая таблица потоковой передачи должна быть опубликована в каталоге Unity. См. использование каталога Unity с конвейерами Delta Live Tables.
  • Чтобы считать поток данных изменений из целевой потоковой таблицы, необходимо использовать Databricks Runtime 15.2 или более поздней версии. Чтобы прочитать канал данных изменений в другом конвейере Delta Live Tables, конвейер должен быть настроен на использование Databricks Runtime 15.2 или более поздней версии.

Вы считываете канал данных об изменении из целевой потоковой таблицы, созданной в конвейере Delta Live Tables, так же, как считывание канала данных об изменении из других таблиц Delta. Дополнительные сведения об использовании функции веб-канала изменений Delta, включая примеры в Python и SQL, см. в статье Использование веб-канала изменений Delta Lake в Azure Databricks.

Примечание.

Запись канала изменений включает метаданные , определяющие тип события изменения. При обновлении записи в таблице метаданные связанных записей изменений обычно включают значения _change_type, заданные для update_preimage и update_postimage событий.

Однако значения _change_type отличаются, если обновления вносятся в целевую потоковую таблицу, включающую изменение значений первичного ключа. Если изменения включают обновления первичных ключей, поля метаданных _change_type задаются для insert и delete событий. Изменения первичных ключей могут происходить при внесении обновлений вручную в одно из ключевых полей с использованием оператора UPDATE или MERGE, или, для таблиц SCD типа 2, когда поле __start_at изменяется, чтобы отразить более раннее начальное значение последовательности.

Запрос APPLY CHANGES определяет значения первичного ключа, которые отличаются для обработки SCD типа 1 и SCD типа 2:

  • Для обработки SCD типа 1 и интерфейса Python Delta Live Tables первичный ключ является значением параметра keys в функции apply_changes(). Для интерфейса SQL Delta Live Tables первичный ключ — это столбцы, определенные предложением KEYS в инструкции APPLY CHANGES INTO.
  • Для SCD типа 2 первичный ключ — это параметр keys или предложение KEYS плюс возвращаемое значение из операции coalesce(__START_AT, __END_AT), где __START_AT и __END_AT являются соответствующими столбцами из целевой таблицы потоковой передачи.

Получение данных о записях, обработанных запросом CDC Delta Live Tables

Примечание.

Следующие метрики фиксируются только APPLY CHANGES запросами, а не APPLY CHANGES FROM SNAPSHOT запросами.

Следующие метрики фиксируются запросами APPLY CHANGES :

  • num_upserted_rows: количество выходных строк, которые добавляются в набор данных во время обновления.
  • num_deleted_rows: количество существующих выходных строк, удаленных из набора данных во время обновления.

Метрика, выходные num_output_rows данные для потоков, отличных от CDC, не фиксируются для apply changes запросов.

Какие объекты данных используются для обработки CDC разностных динамических таблиц?

Примечание.

  • Эти структуры данных применяются только к обработке APPLY CHANGES, а не к обработке APPLY CHANGES FROM SNAPSHOT.
  • Эти структуры данных применяются только в том случае, если целевая таблица публикуется в хранилище метаданных Hive. Если конвейер публикуется в каталоге Unity, внутренние резервные таблицы недоступны для пользователей.

При объявлении целевой таблицы в хранилище метаданных Hive создаются две структуры данных:

  • Представление, использующее имя, назначенное целевой таблице.
  • Внутренняя резервная таблица, используемая Delta Live Table для управления обработкой CDC. Эта таблица называется добавлением __apply_changes_storage_ в начало имени целевой таблицы.

Например, если объявить целевую таблицу с именем dlt_cdc_target, вы увидите представление с именем dlt_cdc_target и таблицу с именем __apply_changes_storage_dlt_cdc_target в хранилище метаданных. Создание представления позволяет разностным динамическим таблицам отфильтровывать дополнительные сведения (например, памятные метки и версии), необходимые для обработки данных вне порядка. Чтобы просмотреть обработанные данные, выполните запрос к целевому представлению. Так как схема таблицы __apply_changes_storage_ может измениться для поддержки будущих функций или улучшений, не следует запрашивать таблицу для использования в рабочей среде. При добавлении данных вручную в таблицу предполагается, что записи будут поступать до других изменений, так как столбцы версии отсутствуют.