Поделиться через


Уровни изоляции и конфликты записи в Azure Databricks

Уровень изоляции table определяет степень изоляции транзакции от изменений, внесенных параллельными операциями. Конфликты записи в Azure Databricks зависят от уровня изоляции.

Delta Lake предоставляет гарантии транзакций ACID для операций чтения и записи. Это означает следующее.

  • Несколько писателей в нескольких кластерах могут одновременно изменять tablepartition. Писатели видят моментальное представление согласованности table, и записи выполняются в последовательном порядке.
    • Читатели продолжают видеть согласованное представление данных table, с которыми началась задача Azure Databricks, даже если table изменяется в ходе выполнения задачи.

См. сведения о гарантиях ACID в Azure Databricks?.

Примечание.

Azure Databricks использует Delta Lake для всех tables по умолчанию. В этой статье описывается поведение Delta Lake в Azure Databricks.

Внимание

Изменения метаданных вызывают сбой всех одновременных операций записи. Эти операции включают изменения в протокол table, свойства table или данные schema.

Потоковое чтение завершается ошибкой, когда происходит фиксация, изменяющая метаданные table. Чтобы сохранить поток, необходимо перезапустить его. Рекомендуемые методы см . в разделе "Рекомендации по рабочей среде" для структурированной потоковой передачи.

Ниже приведены примеры запросов, которые изменяют метаданные:

-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;

-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));

-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);

Конфликты записи с параллелизмом на уровне строк

Параллелизм на уровне строк уменьшает конфликты между параллельными операциями записи, обнаруживая изменения на уровне строк и автоматически разрешая конфликты, возникающие при одновременной записи update или удалении разных строк в одном файле данных.

Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше. Параллелизм на уровне строк по умолчанию поддерживается для следующих условий:

  • Tables с включенными векторами удаления и без секционирования.
  • Tables при жидкостной кластеризации, если вы не отключили векторы удаления.

Tables с секциями не поддерживают параллелизм на уровне строк, но по-прежнему могут избежать конфликтов между OPTIMIZE и всеми другими операциями записи при включении векторов удаления. См . ограничения параллелизма на уровне строк.

Сведения о других версиях среды выполнения Databricks см. в разделе "Поведение предварительной версии параллелизма на уровне строк" (устаревшая версия).

MERGE INTO для поддержки параллелизма на уровне строк требуется Photon в Databricks Runtime 14.2. В Databricks Runtime 14.3 LTS и более поздних версиях Фотон не требуется.

В следующем table описывается, какие пары операций записи могут конфликтовывать в каждом уровне изоляции с включенной параллелизмом на уровне строк.

Примечание.

Tables с идентификатором columns не поддерживают одновременные транзакции. См. использование идентификатора columns в Delta Lake.

INSERT (1) UPDATE, УДАЛИТЬ, MERGE INTO OPTIMIZE
INSERT Не может конфликтовать
UPDATE, DELETE, MERGE INTO Не удается конфликтуть в WriteSerializable. Может конфликтуть в Сериализуемом режиме при изменении одной строки. См . ограничения параллелизма на уровне строк. Может конфликтуться при изменении одной строки. См . ограничения параллелизма на уровне строк.
OPTIMIZE Не может конфликтовать Может конфликтуться при ZORDER BY использовании. Не удается конфликтуть в противном случае. Может конфликтуться при ZORDER BY использовании. Не удается конфликтуть в противном случае.

Внимание

(1) Все операции INSERT, описанные выше, в tables представляют собой операции добавления, которые не считывают данные из той же table перед фиксацией. INSERT операции, содержащие вложенные запросы, считывающие те же table, поддерживают ту же параллельность, что и MERGE.

REORG операции имеют семантику изоляции, идентичную OPTIMIZE перезаписи файлов данных для отражения изменений, записанных в векторах удаления. Когда вы используете REORG для обновления, table протоколы изменяются, и это конфликтует со всеми текущими операциями.

Запись конфликтов без параллелизма на уровне строк

В следующем table описывается, какие пары операций записи могут конфликтовывать в каждой уровне изоляции.

Tables не поддерживают параллелизм на уровне строк, если у них определены секции или не включены векторы удаления. Databricks Runtime 14.2 или более поздней версии требуется для параллелизма на уровне строк.

Примечание.

Tables с идентификатором columns не поддерживают одновременные транзакции. См. идентификатор columns в Delta Lake.

INSERT (1) UPDATE, УДАЛИТЬ, MERGE INTO OPTIMIZE
INSERT Не может конфликтовать
UPDATE, DELETE, MERGE INTO Не удается конфликтуть в WriteSerializable. Может конфликтуться в Сериализуемом режиме. См . разделы избежать конфликтов. Может конфликтовывать в сериализуемой и writeSerializable. См . разделы избежать конфликтов.
OPTIMIZE Не может конфликтовать Невозможно конфликтовать с tables при включенных векторах удаления, если не используется ZORDER BY. Может конфликтуть в противном случае. Невозможно конфликтовать с tables при включенных векторах удаления, если не используется ZORDER BY. Может конфликтуть в противном случае.

Внимание

(1) Все операции INSERT, описанные выше в tables, представляют собой операции добавления, которые не считывают данные из той же table перед подтверждением изменений. Операции INSERT, которые содержат вложенные запросы, читающие те же table, поддерживают ту же параллельность, что и MERGE.

REORG операции имеют семантику изоляции, идентичную OPTIMIZE перезаписи файлов данных для отражения изменений, записанных в векторах удаления. При использовании REORG для применения обновления table протоколы изменяются, что конфликтует со всеми текущими операциями.

Ограничения параллелизма на уровне строк

Некоторые ограничения применяются к параллелизму на уровне строк. Для следующих операций разрешение конфликтов следует обычному параллелизму для конфликтов записи в Azure Databricks. См. конфликты записи без параллелизма на уровне строк.

  • Команды со сложными условными предложениями, включая следующие:
    • Условия для сложных типов данных, таких как структуры, массивы или карты.
    • Условия, использующие недетерминированные выражения и вложенные запросы.
    • Условия, содержащие коррелированные вложенные запросы.
  • В Databricks Runtime 14.2 команды MERGE должны использовать явный предикат в целевом table для фильтрации строк, соответствующих исходному table. Для разрешения слиянием фильтр проверяет только строки, которые могут конфликтовать на основе условий фильтра в параллельных операциях.

Примечание.

Обнаружение конфликтов на уровне строк может увеличить общее время выполнения. В случае многих параллельных транзакций модуль записи определяет задержку по разрешению конфликтов и конфликтам.

Все ограничения для векторов удаления также применяются. См . ограничения.

Когда Delta Lake выполняет фиксацию без чтения table?

Операции Delta Lake INSERT или операции добавления не считывают состояние table перед фиксацией, если выполнены следующие условия:

  1. Логика выражается с помощью INSERT логики SQL или режима добавления.
  2. Логика не содержит вложенных запросов или условных условий, ссылающихся на table, предназначенных для операции записи.

Как и в других фиксациях, Delta Lake проверяет и разрешает версии table во время фиксации, используя метаданные в журнале транзакций, но версия table на самом деле не считывается.

Примечание.

Многие распространенные шаблоны используют операции MERGE для insert данных на основе условий table. Хотя эту логику можно переписать с помощью инструкций INSERT, если любое условное выражение ссылается на column в целевом table, эти инструкции имеют те же ограничения параллелизма, что и MERGE.

Запись сериализуемых и сериализуемых уровней изоляции

Уровень изоляции table определяет степень изоляции транзакции от изменений, внесенных параллельными транзакциями. Delta Lake на Azure Databricks поддерживает два уровня изоляции: Serializable и WriteSerializable.

  • Serializable: наиболее надежный уровень изоляции. Это гарантирует, что зафиксированные операции записи и все операции чтения будут иметь уровень Serializable. Операции разрешены до тех пор, пока существует последовательная последовательность их однократного выполнения, что создает тот же результат, что и в table. Для операций записи последовательность данных точно такая же, как и в истории table.

  • WriteSerializable (по умолчанию): более слабый уровень изоляции, чем Serializable. Он обеспечивает сериализуемость только операций записи (но не операций чтения). Однако он все-таки более надежен, чем изоляция уровня Snapshot. Уровень изоляции WriteSerializable используется по умолчанию, поскольку он обеспечивает оптимальный баланс согласованности и доступности данных для наиболее распространенных операций.

    В этом режиме содержимое Delta table может отличаться от ожидаемого на основании последовательности операций, представленной в журнале table. Это связано с тем, что этот режим позволяет некоторым парам параллельных операций записи (например, операции X и Y) продолжать работу таким образом, чтобы результат был таким, как если бы Y был выполнен до X (то есть сериализуемая между ними), даже если журнал покажет, что Y был зафиксирован после X. Чтобы запретить эту переупорядочение, set уровень изоляции table, чтобы быть сериализуемым, чтобы привести к сбою этих транзакций.

Операции чтения всегда используют изоляцию Snapshot. Уровень изоляции записи определяет, возможно ли, чтобы читатель увидел моментальный снимок table, который, согласно истории, "никогда не существовал".

Для уровня сериализуемости читатель всегда видит только те tables, которые соответствуют истории. На уровне WriteSerializable читатель может увидеть table, которого нет в журнале Delta.

Например, рассмотрим txn1, длительно выполняемая операция удаления и txn2, которое вставляет данные, удаленные операцией txn1. txn2 и txn1 завершены, и они записываются в журнал в таком порядке. В соответствии с историей данные, вставленные в txn2, не должны существовать в table. Что касается уровня Serializable, читатель никогда не будет видеть данные, вставленные операцией txn2. Однако, на уровне WriteSerializable читатель может в определенный момент просмотреть данные, вставленные операцией txn2.

Дополнительные сведения о том, какие типы операций могут конфликтовать друг с другом на каждом уровне изоляции и возможных ошибках, см. в разделе "Избегайте конфликтов" с помощью условий секционирования и разрознения команд.

Set уровень изоляции

Уровень изоляции set с помощью команды ALTER TABLE.

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

where <level-name> Serializable или WriteSerializable.

Например, чтобы изменить уровень изоляции со стандартного WriteSerializable на Serializable, выполните:

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

Избегайте конфликтов с помощью условий секционирования и разбиения команд

Во всех случаях, помеченных как "может конфликтовать", то, будут ли эти две операции конфликтовать, зависит от того, работают ли они с одинаковыми set файлов. Вы можете сделать два набора файлов несвязанными, секционируя table по тем же columns, что и те, которые используются в условиях операций. Например, две команды UPDATE table WHERE date > '2010-01-01' ... и DELETE table WHERE date < '2010-01-01' конфликтуют, если table не секционирован по дате, так как оба могут попытаться изменить одинаковые set файлов. Секционирование table с помощью date позволит избежать конфликта. Поэтому секционирование table в соответствии с условиями, часто используемыми в команде, может значительно сократить конфликты. Однако разбиение table по column с высокой кардинальностью может привести к другим проблемам производительности из-за большого количества подкаталогов.

Исключения конфликтов

При возникновении конфликта транзакции отобразится одно из следующих исключений:

ConcurrentAppendException

Это исключение возникает, когда параллельная операция добавляет файлы в ту же partition (или где-либо в несекционированных table), в то время как ваша операция их считывает. Добавление файлов может быть вызвано операциями INSERT, DELETE, UPDATE или MERGE.

С уровня изоляции по умолчаниюWriteSerializableфайлы, добавленные слепымиINSERT операциями (то есть операции, которые слепо добавляют данные без чтения данных) не конфликтуют с какой-либо операцией, даже если они касаются одной и той же partition (или где-либо в незапарационном table). Если уровень изоляции от set до Serializable, то слепые апдейты могут конфликтовать.

Это исключение часто возникает во время выполнения параллельных операций DELETE, UPDATE или MERGE. Хотя одновременные операции могут физически обновлять разные каталоги partition, одна из них может читать тот же partition, который другой одновременно обновляет, что приводит к конфликту. Этого можно избежать, явно указав разделение в условии операции. Рассмотрим следующий пример.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Предположим, приведенный выше код запущен параллельно для различных дат или стран. Так как каждое задание работает над независимым partition на целевой Дельта table, ожидается, что конфликты отсутствуют. Однако условие недостаточно явно и может сканировать все элементы table и конфликтовать с параллельными операциями при обновлении любых других разделов. Вместо этого можно изменить инструкцию, добавив в условие объединения определенную дату и страну, как показано в следующем примере.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Теперь эту операцию можно безопасно запускать в параллельном режиме в разные даты и в разных странах.

ConcurrentDeleteReadException

Это исключение возникает, когда параллельная операция удаляет файл, который читает ваша операция. Распространенные причины: операция DELETE, UPDATE или MERGE, которая повторно записывает файлы.

ConcurrentDeleteDeleteException

Это исключение возникает, когда параллельная операция удаляет файл, который также удаляет ваша операция. Это может быть вызвано двумя одновременно выполняемыми операциями сжатия, которые выполняют повторную запись одних и тех же файлов.

MetadataChangedException

Это исключение возникает, когда одновременная транзакция обновляет метаданные delta table. Распространенными причинами являются операции ALTER TABLE или записи в delta table, которые updateschematable.

ConcurrentTransactionException

Если потоковый запрос, использующий одно и то же расположение контрольной точки, запускается несколько раз параллельно и пытается одновременно записать в Delta table. Никогда не допускайте, чтобы два потоковых запроса одновременно использовали одно и то же расположение контрольной точки и выполнялись одновременно.

ProtocolChangedException

Это исключение может возникать в следующих случаях:

  • При обновлении вашей Delta table до новой версии протокола. Для выполнения будущих операций может потребоваться обновить среду выполнения Databricks.
  • При одновременном создании или замене table несколькими писателями.
  • Когда несколько операций записи одновременно выполняют запись по пути в свободном месте накопителя.

Дополнительные сведения см. в статье о том, как Azure Databricks управляет совместимостью функций Delta Lake?

Поведение предварительной версии параллелизма на уровне строк (устаревшая версия)

В этом разделе описано поведение предварительной версии для параллелизма на уровне строк в Databricks Runtime 14.1 и ниже. Параллелизм на уровне строк всегда требует векторов удаления.

В версиях Databricks Runtime 13.3 LTS и выше с включенной функцией кластеризации жидкостей tables параллелизм на уровне строк включается автоматически.

В Databricks Runtime 14.0 и 14.1 можно включить параллелизм на уровне строк для tables с векторами удаления, задав следующую конфигурацию для кластера или SparkSession:

spark.databricks.delta.rowLevelConcurrencyPreview = true

В Databricks Runtime 14.1 и ниже вычисления, отличные от Photon, поддерживают только параллелизм на уровне строк для DELETE операций.