Поделиться через


Использование отказоустойчивой кластеризации для таблиц Delta

Разностная кластеризация Delta Lake заменяет секционирование таблиц и ZORDER упрощает решения по макету данных и оптимизирует производительность запросов. При "жидкой" кластеризации можно гибко переопределять ключи кластеризации, не переписывая существующие данные. Это дает возможность развивать макет данных со временем в соответствии с задачами аналитики.

Внимание

Databricks рекомендует использовать Databricks Runtime 15.2 и выше для всех таблиц с включенным кластеризациям жидкости. Поддержка общедоступной предварительной версии с ограничениями доступна в Databricks Runtime 13.3 LTS и выше.

Примечание.

Таблицы с поддержкой отказоустойчивой кластеризации поддерживают параллелизм на уровне строк в Databricks Runtime 13.3 LTS и выше. Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше для всех таблиц с включенными векторами удаления. См . сведения о уровнях изоляции и конфликтах записи в Azure Databricks.

Для чего используется кластеризация жидкости?

Databricks рекомендует кластеризацию жидкости для всех новых таблиц Delta. Ниже приведены примеры сценариев, в которых удобно использовать кластеризацию:

  • Таблицы, которые часто фильтруются по столбцам высокой кратности.
  • Таблицы со значительным неравномерным распределением данных.
  • Таблицы, размер которых быстро увеличивается и которые нуждаются в активной настройке и поддержке.
  • Таблицы, в которых требуются параллельные записи.
  • Таблицы с меняющимися шаблонами доступа.
  • Таблицы, в которых использование типичного ключа секции может привести к слишком большому или слишком малому числу секций.

Включение кластеризации жидкости

Вы можете включить кластеризацию жидкости в существующей таблице или во время создания таблицы. Кластеризация несовместима с секционированием или ZORDERтребует использования Azure Databricks для управления всеми операциями макета и оптимизации данных в таблице. После включения кластеризации жидкости запустите OPTIMIZE задания как обычно для добавочных данных кластера. Узнайте , как активировать кластеризацию.

Чтобы включить кластеризацию жидкости, добавьте фразу CLUSTER BY в инструкцию создания таблицы, как показано в приведенных ниже примерах:

Примечание.

В Databricks Runtime 14.2 и более поздних версиях можно использовать API кадра данных и API DeltaTable в Python или Scala для включения кластеризации жидкости.

SQL

-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;

Python

# Create an empty table
(DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute())

# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

Scala

// Create an empty table
DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute()

// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

В Databricks Runtime 16.0 и более поздних версиях можно создавать таблицы с поддержкой liquid clustering, используя записи структурированной потоковой передачи, как в следующих примерах:

Python

(spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")
)

Scala

spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

Предупреждение

Таблицы, созданные с поддержкой отказоустойчивой кластеризации, включают множество функций разностной таблицы при создании и использовании delta writer версии 7 и средства чтения версии 3. Вы можете переопределить включение некоторых из этих функций. См. раздел "Переопределение функций по умолчанию" (необязательно).

Версии протокола таблицы не могут быть понижены, а таблицы с поддержкой кластеризации недоступны для чтения клиентами Delta Lake, которые не поддерживают все функции таблицы протоколов delta reader. См. статью Как Azure Databricks управляет совместимостью функций Delta Lake?.

Вы можете включить кластеризацию жидкости в существующей непартиментной таблице Delta с помощью следующего синтаксиса:

ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

Внимание

Поведение по умолчанию не применяет кластеризацию к ранее записанным данным. Чтобы принудительно перекластеризовать все записи, необходимо использовать OPTIMIZE FULL. См. раздел Принудительная перекластеризация для всех записей.

Переопределение включения компонентов по умолчанию (необязательно)

Вы можете переопределить поведение по умолчанию, которое включает функции разностной таблицы во время включения кластеризации жидкости. Это предотвращает обновление протоколов чтения и записи, связанных с этими функциями таблицы. Чтобы выполнить следующие действия, необходимо создать существующую таблицу:

  1. Используется ALTER TABLE для задания свойства таблицы, которое отключает одну или несколько функций. Например, чтобы отключить векторы удаления, выполните следующие действия:

    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
    
  2. Включите кластеризацию жидкости в таблице, выполнив следующие действия:

    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
    

В следующей таблице приведены сведения о функциях Delta, которые можно переопределить и как включение влияет на совместимость с версиями среды выполнения Databricks.

Функция Delta Совместимость среды выполнения Свойство для переопределения включения Влияние отключения на кластеризацию жидкости
Векторы удаления Для чтения и записи требуется Databricks Runtime 12.2 lTS и более поздних версий. 'delta.enableDeletionVectors' = false Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. статью "Конфликты записи с параллелизмом на уровне строк".

DELETE, MERGEи UPDATE команды могут выполняться медленнее.
Отслеживание строк Записи требуют Databricks Runtime 13.3 LTS и более поздних версий. Можно считывать из любой версии Databricks Runtime. 'delta.enableRowTracking' = false Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. статью "Конфликты записи с параллелизмом на уровне строк".
Контрольные точки версии 2 Для чтения и записи требуется Databricks Runtime 13.3 LTS и более поздних версий. 'delta.checkpointPolicy' = 'classic' Не влияет на поведение кластеризации жидкости.

Выбор ключей кластеризации

Databricks рекомендует выбирать ключи кластеризации на основе часто используемых фильтров запросов. Ключи кластеризации можно определить в любом порядке. Если два столбца коррелируются, необходимо добавить только один из них в качестве ключа кластеризации.

Можно указать до 4 столбцов в качестве ключей кластеризации. Для ключей кластеризации можно указывать лишь столбцы, в которых производится сбор статистики. По умолчанию в таблице Delta сбор статистики производится для первых 32 столбцов. См. раздел "Указание столбцов статистики delta".

Кластеризация поддерживает следующие типы данных для кластеризации ключей.

  • Дата
  • Метка времени
  • TimestampNTZ (требуется Databricks Runtime 14.3 LTS или более поздней версии)
  • Строка
  • Целое число
  • Long
  • Короткие
  • Тип с плавающей запятой
  • Двойной
  • Десятичное число
  • Байт

Если вы преобразуете существующую таблицу, рассмотрите следующие рекомендации:

Текущий метод оптимизации данных Рекомендация по кластеризации ключей
Секционирование в стиле Hive Используйте столбцы секций в качестве ключей кластеризации.
Индексирование Z-порядка ZORDER BY Используйте столбцы в качестве ключей кластеризации.
Секционирование в стиле Hive и порядок Z Используйте как столбцы секционирования, так и ZORDER BY столбцы в качестве ключей кластеризации.
Созданные столбцы для уменьшения кратности (например, дата для метки времени) Используйте исходный столбец в качестве ключа кластеризации и не создавайте созданный столбец.

Запись данных в кластеризованную таблицу

Необходимо использовать клиент записи Delta, поддерживающий все функции таблицы протоколов разностной записи, используемые при кластеризации жидкости. В Azure Databricks необходимо использовать Databricks Runtime 13.3 LTS и выше.

К операциям, которые кластер выполняет запись, относятся следующие:

  • INSERT INTO операционный
  • Операторы CTAS и RTAS
  • COPY INTO из формата Parquet
  • spark.write.mode("append")

Структурированная потоковая передача записи никогда не активирует кластеризацию при записи. Применяются дополнительные ограничения. См . ограничения.

Кластеризация при записи активируется только в том случае, если данные в транзакции соответствуют пороговой величине. Эти пороговые значения зависят от количества столбцов кластеризации и ниже для управляемых таблиц каталога Unity, чем другие таблицы Delta.

Количество столбцов кластеризации Пороговое значение для управляемых таблиц каталога Unity Пороговый размер для других таблиц Delta
1 64 МБ 256 МБ
2 256 МБ 1 ГБ
3 512 МБ 2 ГБ
4 1 ГБ 4 ГБ

Так как не все операции применяют кластеризацию жидкости, Databricks рекомендует часто выполняться OPTIMIZE , чтобы обеспечить эффективную кластеризацию всех данных.

Активация кластеризации

Прогнозная оптимизация автоматически выполняет OPTIMIZE команды для включенных таблиц. См . статью прогнозной оптимизации для управляемых таблиц каталога Unity.

Чтобы активировать кластеризацию, необходимо использовать Databricks Runtime 13.3 LTS или более поздней версии. OPTIMIZE Используйте команду в таблице, как показано в следующем примере:

OPTIMIZE table_name;

Отказоустойчивая кластеризация является добавочной, что означает, что данные перезаписываются только при необходимости для размещения данных, которые должны быть кластеризованы. Файлы данных с ключами кластеризации, которые не соответствуют данным, которые должны быть кластеризованы, не перезаписываются.

Для повышения производительности Databricks рекомендует планировать регулярные OPTIMIZE задания в кластерные данные. Для таблиц с большим количеством обновлений или вставок Databricks рекомендует планировать OPTIMIZE задание каждые один или два часа. Так как кластеризация жидкости увеличивается, большинство OPTIMIZE заданий для кластеризованных таблиц выполняются быстро.

Принудительная перекластеризация для всех записей

В Databricks Runtime 16.0 и более поздних версиях можно принудительно выполнить повторение всех записей в таблице со следующим синтаксисом:

OPTIMIZE table_name FULL;

Внимание

При необходимости выполнение OPTIMIZE FULL перекластеризует все существующие данные. Для больших таблиц, которые ранее не были кластеризованы по указанным ключам, эта операция может занять несколько часов.

Запустите OPTIMIZE FULL при первом включении кластеризации или изменении ключей кластеризации. Если вы ранее выполнили OPTIMIZE FULL и не было изменений в ключах кластеризации, OPTIMIZE FULL выполняется так же, как и OPTIMIZE. Всегда используйте OPTIMIZE FULL, чтобы убедиться, что макет данных отражает текущие ключи кластеризации.

Чтение данных из кластеризованной таблицы

Данные в кластеризованной таблице можно считывать с помощью любого клиента Delta Lake, поддерживающего чтение векторов удаления. Для получения наилучших результатов запроса включите ключи кластеризации в фильтры запросов, как показано в следующем примере:

SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";

Изменение ключей кластеризации

Ключи кластеризации для таблицы можно изменить в любое время, выполнив ALTER TABLE команду, как показано в следующем примере:

ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

При изменении ключей кластеризации последующие OPTIMIZE операции и операции записи используют новый подход кластеризации, но существующие данные не перезаписываются.

Кроме того, можно отключить кластеризацию, задав ключи NONEследующим образом:

ALTER TABLE table_name CLUSTER BY NONE;

Настройка ключей NONE кластера не перезаписывает данные, которые уже кластеризованы, но не позволяет будущим OPTIMIZE операциям использовать ключи кластеризации.

Узнайте, как кластеризована таблица

Команды можно использовать DESCRIBE для просмотра ключей кластеризации для таблицы, как показано в следующих примерах:

DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

Совместимость таблиц с отказоустойчивой кластеризации

Таблицы, созданные с помощью отказоустойчивой кластеризации в Databricks Runtime 14.1 и выше, используют контрольные точки версии 2 по умолчанию. Таблицы можно считывать и записывать с помощью контрольных точек версии 2 в Databricks Runtime 13.3 LTS и выше.

Вы можете отключить контрольные точки версии 2 и протоколы таблиц более ранней версии для чтения таблиц с ликвидной кластеризации в Databricks Runtime 12.2 LTS и более поздних версий. См. сведения о функциях таблицы Drop Delta.

Ограничения

Применяются следующие ограничения:

  • В Databricks Runtime 15.1 и ниже кластеризация при записи не поддерживает исходные запросы, включающие фильтры, соединения или агрегаты.
  • Нагрузки структурированных потоков не поддерживают кластеризацию при записи.
  • В Databricks Runtime 15.4 LTS и ниже нельзя создать таблицу с поддержкой кластеризации жидкости с помощью записи структурированной потоковой передачи. Структурированная потоковая передача можно использовать для записи данных в существующую таблицу с включенным кластеризированием жидкости.