Использование отказоустойчивой кластеризации для таблиц Delta
Разностная кластеризация Delta Lake заменяет секционирование таблиц и ZORDER
упрощает решения по макету данных и оптимизирует производительность запросов. При "жидкой" кластеризации можно гибко переопределять ключи кластеризации, не переписывая существующие данные. Это дает возможность развивать макет данных со временем в соответствии с задачами аналитики.
Внимание
Databricks рекомендует использовать Databricks Runtime 15.2 и выше для всех таблиц с включенным кластеризациям жидкости. Поддержка общедоступной предварительной версии с ограничениями доступна в Databricks Runtime 13.3 LTS и выше.
Примечание.
Таблицы с поддержкой отказоустойчивой кластеризации поддерживают параллелизм на уровне строк в Databricks Runtime 13.3 LTS и выше. Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше для всех таблиц с включенными векторами удаления. См . сведения о уровнях изоляции и конфликтах записи в Azure Databricks.
Для чего используется кластеризация жидкости?
Databricks рекомендует кластеризацию жидкости для всех новых таблиц Delta. Ниже приведены примеры сценариев, в которых удобно использовать кластеризацию:
- Таблицы, которые часто фильтруются по столбцам высокой кратности.
- Таблицы со значительным неравномерным распределением данных.
- Таблицы, размер которых быстро увеличивается и которые нуждаются в активной настройке и поддержке.
- Таблицы, в которых требуются параллельные записи.
- Таблицы с меняющимися шаблонами доступа.
- Таблицы, в которых использование типичного ключа секции может привести к слишком большому или слишком малому числу секций.
Включение кластеризации жидкости
Вы можете включить кластеризацию жидкости в существующей таблице или во время создания таблицы. Кластеризация несовместима с секционированием или ZORDER
требует использования Azure Databricks для управления всеми операциями макета и оптимизации данных в таблице. После включения кластеризации жидкости запустите OPTIMIZE
задания как обычно для добавочных данных кластера. Узнайте , как активировать кластеризацию.
Чтобы включить кластеризацию жидкости, добавьте фразу CLUSTER BY
в инструкцию создания таблицы, как показано в приведенных ниже примерах:
Примечание.
В Databricks Runtime 14.2 и более поздних версиях можно использовать API кадра данных и API DeltaTable в Python или Scala для включения кластеризации жидкости.
SQL
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Python
# Create an empty table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Scala
// Create an empty table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
В Databricks Runtime 16.0 и более поздних версиях можно создавать таблицы с поддержкой liquid clustering, используя записи структурированной потоковой передачи, как в следующих примерах:
Python
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
Scala
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Предупреждение
Таблицы, созданные с поддержкой отказоустойчивой кластеризации, включают множество функций разностной таблицы при создании и использовании delta writer версии 7 и средства чтения версии 3. Вы можете переопределить включение некоторых из этих функций. См. раздел "Переопределение функций по умолчанию" (необязательно).
Версии протокола таблицы не могут быть понижены, а таблицы с поддержкой кластеризации недоступны для чтения клиентами Delta Lake, которые не поддерживают все функции таблицы протоколов delta reader. См. статью Как Azure Databricks управляет совместимостью функций Delta Lake?.
Вы можете включить кластеризацию жидкости в существующей непартиментной таблице Delta с помощью следующего синтаксиса:
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
Внимание
Поведение по умолчанию не применяет кластеризацию к ранее записанным данным. Чтобы принудительно перекластеризовать все записи, необходимо использовать OPTIMIZE FULL
. См. раздел Принудительная перекластеризация для всех записей.
Переопределение включения компонентов по умолчанию (необязательно)
Вы можете переопределить поведение по умолчанию, которое включает функции разностной таблицы во время включения кластеризации жидкости. Это предотвращает обновление протоколов чтения и записи, связанных с этими функциями таблицы. Чтобы выполнить следующие действия, необходимо создать существующую таблицу:
Используется
ALTER TABLE
для задания свойства таблицы, которое отключает одну или несколько функций. Например, чтобы отключить векторы удаления, выполните следующие действия:ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
Включите кластеризацию жидкости в таблице, выполнив следующие действия:
ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
В следующей таблице приведены сведения о функциях Delta, которые можно переопределить и как включение влияет на совместимость с версиями среды выполнения Databricks.
Функция Delta | Совместимость среды выполнения | Свойство для переопределения включения | Влияние отключения на кластеризацию жидкости |
---|---|---|---|
Векторы удаления | Для чтения и записи требуется Databricks Runtime 12.2 lTS и более поздних версий. | 'delta.enableDeletionVectors' = false |
Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. статью "Конфликты записи с параллелизмом на уровне строк".DELETE , MERGE и UPDATE команды могут выполняться медленнее. |
Отслеживание строк | Записи требуют Databricks Runtime 13.3 LTS и более поздних версий. Можно считывать из любой версии Databricks Runtime. | 'delta.enableRowTracking' = false |
Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. статью "Конфликты записи с параллелизмом на уровне строк". |
Контрольные точки версии 2 | Для чтения и записи требуется Databricks Runtime 13.3 LTS и более поздних версий. | 'delta.checkpointPolicy' = 'classic' |
Не влияет на поведение кластеризации жидкости. |
Выбор ключей кластеризации
Databricks рекомендует выбирать ключи кластеризации на основе часто используемых фильтров запросов. Ключи кластеризации можно определить в любом порядке. Если два столбца коррелируются, необходимо добавить только один из них в качестве ключа кластеризации.
Можно указать до 4 столбцов в качестве ключей кластеризации. Для ключей кластеризации можно указывать лишь столбцы, в которых производится сбор статистики. По умолчанию в таблице Delta сбор статистики производится для первых 32 столбцов. См. раздел "Указание столбцов статистики delta".
Кластеризация поддерживает следующие типы данных для кластеризации ключей.
- Дата
- Метка времени
- TimestampNTZ (требуется Databricks Runtime 14.3 LTS или более поздней версии)
- Строка
- Целое число
- Long
- Короткие
- Тип с плавающей запятой
- Двойной
- Десятичное число
- Байт
Если вы преобразуете существующую таблицу, рассмотрите следующие рекомендации:
Текущий метод оптимизации данных | Рекомендация по кластеризации ключей |
---|---|
Секционирование в стиле Hive | Используйте столбцы секций в качестве ключей кластеризации. |
Индексирование Z-порядка |
ZORDER BY Используйте столбцы в качестве ключей кластеризации. |
Секционирование в стиле Hive и порядок Z | Используйте как столбцы секционирования, так и ZORDER BY столбцы в качестве ключей кластеризации. |
Созданные столбцы для уменьшения кратности (например, дата для метки времени) | Используйте исходный столбец в качестве ключа кластеризации и не создавайте созданный столбец. |
Запись данных в кластеризованную таблицу
Необходимо использовать клиент записи Delta, поддерживающий все функции таблицы протоколов разностной записи, используемые при кластеризации жидкости. В Azure Databricks необходимо использовать Databricks Runtime 13.3 LTS и выше.
К операциям, которые кластер выполняет запись, относятся следующие:
-
INSERT INTO
операционный - Операторы
CTAS
иRTAS
-
COPY INTO
из формата Parquet spark.write.mode("append")
Структурированная потоковая передача записи никогда не активирует кластеризацию при записи. Применяются дополнительные ограничения. См . ограничения.
Кластеризация при записи активируется только в том случае, если данные в транзакции соответствуют пороговой величине. Эти пороговые значения зависят от количества столбцов кластеризации и ниже для управляемых таблиц каталога Unity, чем другие таблицы Delta.
Количество столбцов кластеризации | Пороговое значение для управляемых таблиц каталога Unity | Пороговый размер для других таблиц Delta |
---|---|---|
1 | 64 МБ | 256 МБ |
2 | 256 МБ | 1 ГБ |
3 | 512 МБ | 2 ГБ |
4 | 1 ГБ | 4 ГБ |
Так как не все операции применяют кластеризацию жидкости, Databricks рекомендует часто выполняться OPTIMIZE
, чтобы обеспечить эффективную кластеризацию всех данных.
Активация кластеризации
Прогнозная оптимизация автоматически выполняет OPTIMIZE
команды для включенных таблиц. См . статью прогнозной оптимизации для управляемых таблиц каталога Unity.
Чтобы активировать кластеризацию, необходимо использовать Databricks Runtime 13.3 LTS или более поздней версии.
OPTIMIZE
Используйте команду в таблице, как показано в следующем примере:
OPTIMIZE table_name;
Отказоустойчивая кластеризация является добавочной, что означает, что данные перезаписываются только при необходимости для размещения данных, которые должны быть кластеризованы. Файлы данных с ключами кластеризации, которые не соответствуют данным, которые должны быть кластеризованы, не перезаписываются.
Для повышения производительности Databricks рекомендует планировать регулярные OPTIMIZE
задания в кластерные данные. Для таблиц с большим количеством обновлений или вставок Databricks рекомендует планировать OPTIMIZE
задание каждые один или два часа. Так как кластеризация жидкости увеличивается, большинство OPTIMIZE
заданий для кластеризованных таблиц выполняются быстро.
Принудительная перекластеризация для всех записей
В Databricks Runtime 16.0 и более поздних версиях можно принудительно выполнить повторение всех записей в таблице со следующим синтаксисом:
OPTIMIZE table_name FULL;
Внимание
При необходимости выполнение OPTIMIZE FULL
перекластеризует все существующие данные. Для больших таблиц, которые ранее не были кластеризованы по указанным ключам, эта операция может занять несколько часов.
Запустите OPTIMIZE FULL
при первом включении кластеризации или изменении ключей кластеризации. Если вы ранее выполнили OPTIMIZE FULL
и не было изменений в ключах кластеризации, OPTIMIZE FULL
выполняется так же, как и OPTIMIZE
. Всегда используйте OPTIMIZE FULL
, чтобы убедиться, что макет данных отражает текущие ключи кластеризации.
Чтение данных из кластеризованной таблицы
Данные в кластеризованной таблице можно считывать с помощью любого клиента Delta Lake, поддерживающего чтение векторов удаления. Для получения наилучших результатов запроса включите ключи кластеризации в фильтры запросов, как показано в следующем примере:
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
Изменение ключей кластеризации
Ключи кластеризации для таблицы можно изменить в любое время, выполнив ALTER TABLE
команду, как показано в следующем примере:
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
При изменении ключей кластеризации последующие OPTIMIZE
операции и операции записи используют новый подход кластеризации, но существующие данные не перезаписываются.
Кроме того, можно отключить кластеризацию, задав ключи NONE
следующим образом:
ALTER TABLE table_name CLUSTER BY NONE;
Настройка ключей NONE
кластера не перезаписывает данные, которые уже кластеризованы, но не позволяет будущим OPTIMIZE
операциям использовать ключи кластеризации.
Узнайте, как кластеризована таблица
Команды можно использовать DESCRIBE
для просмотра ключей кластеризации для таблицы, как показано в следующих примерах:
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
Совместимость таблиц с отказоустойчивой кластеризации
Таблицы, созданные с помощью отказоустойчивой кластеризации в Databricks Runtime 14.1 и выше, используют контрольные точки версии 2 по умолчанию. Таблицы можно считывать и записывать с помощью контрольных точек версии 2 в Databricks Runtime 13.3 LTS и выше.
Вы можете отключить контрольные точки версии 2 и протоколы таблиц более ранней версии для чтения таблиц с ликвидной кластеризации в Databricks Runtime 12.2 LTS и более поздних версий. См. сведения о функциях таблицы Drop Delta.
Ограничения
Применяются следующие ограничения:
- В Databricks Runtime 15.1 и ниже кластеризация при записи не поддерживает исходные запросы, включающие фильтры, соединения или агрегаты.
- Нагрузки структурированных потоков не поддерживают кластеризацию при записи.
- В Databricks Runtime 15.4 LTS и ниже нельзя создать таблицу с поддержкой кластеризации жидкости с помощью записи структурированной потоковой передачи. Структурированная потоковая передача можно использовать для записи данных в существующую таблицу с включенным кластеризированием жидкости.