Delta テーブルに Liquid Clustering クラスタリングを使用する
Delta Lake リキッド クラスタリングでは、テーブルのパーティション分割と ZORDER
が置き換えられ、データ レイアウトの決定が簡略化され、クエリのパフォーマンスが最適化されます。 リキッド クラスタリングを使用すると、既存のデータを書き換えることなくクラスタリング キーを柔軟に再定義できるため、時間の経過と共に分析ニーズに沿ってデータ レイアウトを発展させることができます。
重要
Databricks は、リキッド クラスタリングが有効になっているすべてのテーブルに対して Databricks Runtime 15.2 以上を使用することを推奨しています。 制限付きのパブリック プレビュー サポートは、Databricks Runtime 13.3 LTS 以上で利用できます。
Note
Databricks Runtime 13.3 LTS 以上では、リキッド クラスタリングが有効になっているテーブルでは、行レベルのコンカレンシーがサポートされます。 Databricks Runtime 14.2 以降では、削除ベクトルが有効になっているすべてのテーブルに対して、行レベルのコンカレンシーが一般提供されています。 Azure Databricks での分離レベルと書き込みの競合に関するページを参照してください。
リキッド クラスタリングの用途
Databricks では、すべての新しい Delta テーブルに対してリキッド クラスタリングをお勧めしています。 クラスタリングのメリットが得られるシナリオの例を次に示します。
- カーディナリティの高い列でフィルター処理されることが多いテーブル。
- データ分散に大きな偏りがあるテーブル。
- 急速に大きくなり、メンテナンスとチューニング作業が必要なテーブル。
- 同時書き込みの要件を持つテーブル。
- 時間の経過と共に変化するアクセス パターンを持つテーブル。
- 一般的なパーティション キーでは、パーティションが多すぎるか、少なすぎるテーブルが残ってしまう可能性があるテーブル。
リキッド クラスタリングを有効にする
既存のテーブルで、またはテーブルの作成時に、リキッド クラスタリングを有効にすることができます。 クラスタリングはパーティション分割または ZORDER
と互換性がありません。また、Azure Databricks を使用して、テーブル内のデータに対するすべてのレイアウト操作と最適化操作を管理する必要があります。 リキッド クラスタリングが有効になった後、OPTIMIZE
ジョブを通常どおり実行して、データを増分でクラスター化します。 「クラスタリングをトリガーする方法」を参照してください。
リキッド クラスタリングを有効にするには、次の例のように、テーブル作成ステートメントに CLUSTER BY
フレーズを追加します。
Note
Databricks Runtime 14.2 以上では、Python または Scala で DataFrame API と DeltaTable API を使用して、リキッド クラスタリングを有効にすることができます。
SQL
-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Python
# Create an empty table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
Scala
// Create an empty table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
警告
リキッド クラスタリングを有効にして作成されたテーブルでは、作成時に多数の Delta テーブル機能が有効になり、Delta ライター (バージョン 7) とリーダー (バージョン 3) が使用されます。 これらの機能の一部の有効化をオーバーライドできます。 「既定の機能の有効化をオーバーライドする (省略可能)」を参照してください。
テーブル プロトコルのバージョンをダウングレードすることはできません。また、有効になっている Delta リーダー プロトコル テーブルの機能がすべてサポートされていない Delta Lake クライアントでは、クラスタリングが有効になっているテーブルを読み取ることはできません。 「Azure Databricks で Delta Lake 機能の互換性を管理する方法は?」を参照してください。
次の構文を使用して、既存のパーティション分割されていない Delta テーブルでリキッド クラスタリングを有効にできます。
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
既定の機能の有効化をオーバーライドする (省略可能)
リキッド クラスタリングの有効化中に Delta テーブル機能を有効にする既定の動作をオーバーライドできます。 これにより、これらのテーブル機能に関連付けられているリーダー プロトコルとライター プロトコルがアップグレードされなくなります。 次の手順を実行するには、既存のテーブルが必要です。
ALTER TABLE
を使用して、1 つ以上の機能を無効にするテーブル プロパティを設定します。 たとえば、削除ベクトルを無効にするには、次を実行します。ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
次を実行して、テーブルでリキッド クラスタリングを有効にします。
ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
次の表に、オーバーライドできる Delta 機能と、有効化が Databricks Runtime バージョンとの互換性に与える影響に関する情報を示します。
Delta 機能 | ランタイムの互換性 | 有効化をオーバーライドするプロパティ | リキッド クラスタリングに対する無効化の影響 |
---|---|---|---|
削除ベクトル | 読み取りと書き込みには、Databricks Runtime 12.2 LTS 以降が必要です。 | 'delta.enableDeletionVectors' = false |
行レベルのコンカレンシーが無効になっているため、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルのコンカレンシーでの書き込みの競合」を参照してください。DELETE 、MERGE 、および UPDATE コマンドの実行速度が低下する場合があります。 |
行追跡 | 書き込みには、Databricks Runtime 13.3 LTS 以降が必要です。 任意の Databricks Runtime バージョンから読み取ることができます。 | 'delta.enableRowTracking' = false |
行レベルのコンカレンシーが無効になっているため、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルのコンカレンシーでの書き込みの競合」を参照してください。 |
チェックポイント V2 | 読み取りと書き込みには、Databricks Runtime 13.3 LTS 以降が必要です。 | 'delta.checkpointPolicy' = 'classic' |
リキッド クラスタリングの動作には影響ありません。 |
クラスタリング キーを選択する
Databricks では、一般的に使用されるクエリ フィルターに基づいてクラスタリング キーを選択することをお勧めしています。 クラスタリング キーは任意の順序で定義できます。 2 つの列が関連付けられた場合、必要なのは、いずれかの列をクラスタリング キーとして追加することだけです。
最大で 4 列をクラスタリング キーとして指定できます。 クラスタリング キーには、収集された統計を含む列のみを指定できます。 既定では、Delta テーブルの最初の 32 列で統計が収集されます。 「Delta 統計の列を指定する」を参照してください。
クラスタリングでは、クラスタリング キーについて次のデータ型がサポートされています。
- 日
- Timestamp
- TimestampNTZ (Databricks Runtime 14.3 LTS 以降が必要です)
- String
- Integer
- Long
- Short
- Float
- Double
- 10 進法
- Byte
既存のテーブルを変換する場合は、次の推奨事項を考慮してください。
現在のデータ最適化手法 | クラスタリング キーの推奨事項 |
---|---|
Hive スタイルのパーティション分割 | パーティション列をクラスタリング キーとして使用します。 |
Z オーダーのインデックス作成 | ZORDER BY 列をクラスタリング キーとして使用します。 |
Hive スタイルのパーティション分割と Z オーダー | パーティション列と ZORDER BY 列の両方をクラスタリング キーとして使用します。 |
カーディナリティを減らすために生成された列 (タイムスタンプの日付など) | 元の列をクラスタリング キーとして使用し、生成された列を作成しないでください。 |
クラスター化されたテーブルにデータを書き込む
リキッド クラスタリングで使用されるすべての Delta 書き込みプロトコル テーブル機能がサポートされている Delta ライター クライアントを使用する必要があります。 Azure Databricks では、Databricks Runtime 13.3 LTS 以降を使用する必要があります。
書き込み時にクラスター化される操作は次のとおりです。
INSERT INTO
操作CTAS
とRTAS
のステートメント- Parquet 形式からの
COPY INTO
spark.write.mode("append")
構造化ストリーミング書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。 その他にも適用される制限があります。 「制限事項」を参照してください。
書き込み時のクラスタリングがトリガーされるのは、トランザクション内のデータがサイズのしきい値を超えた場合だけです。 これらのしきい値はクラスタリング列の数によって異なり、Unity Catalog マネージド テーブルにおいてはその他の Delta テーブルにおいてよりも低くなります。
クラスタリング列の数 | Unity Catalog マネージド テーブルのしきい値サイズ | その他の Delta テーブルのしきい値サイズ |
---|---|---|
1 | 64 MB | 256 MB |
2 | 256 MB | 1 GB |
3 | 512 MB | 2 GB |
4 | 1 GB | 4 GB |
すべての操作にリキッド クラスタリングが適用されるわけではないため、Databricks では、すべてのデータが効率的にクラスター化されるように、OPTIMIZE
を頻繁に実行することをお勧めします。
クラスタリングをトリガーする方法
予測最適化では、有効なテーブルに対して OPTIMIZE
コマンドが自動的に実行されます。 「Unity Catalog 管理テーブルの予測最適化」を参照してください。
クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS 以降を使用する必要があります。 次の例のように、テーブルで OPTIMIZE
コマンドを使用します。
OPTIMIZE table_name;
リキッド クラスタリングは増分です。つまり、クラスター化する必要があるデータに対応するために、必要に応じてデータのみが書き換えられます。 クラスター化するデータと一致しないクラスタリング キーを持つデータ ファイルは書き換えられません。
最適なパフォーマンスを得るために、Databricks では、通常の OPTIMIZE
ジョブをクラスター データにスケジュールすることをお勧めしています。 多数の更新または挿入が発生しているテーブルの場合、Databricks では、1 時間または 2 時間ごとに OPTIMIZE
ジョブをスケジュールすることをお勧めしています。 リキッド クラスタリングは増分であるため、クラスター化されたテーブルのほとんどの OPTIMIZE
ジョブは迅速に実行されます。
クラスター化されたテーブルからデータを読み取る
削除ベクトルの読み取りがサポートされている Delta Lake クライアントを使用して、クラスター化されたテーブル内のデータを読み取ることができます。 最適なクエリ結果を得るには、次の例のように、クエリ フィルターにクラスタリング キーを含めます。
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
クラスタリング キーを変更する
テーブルのクラスタリング キーは、次の例のように、ALTER TABLE
コマンドを実行することでいつでも変更できます。
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
クラスタリング キーを変更すると、後続の OPTIMIZE
操作と書き込み操作では新しいクラスタリング アプローチが使用されますが、既存のデータは書き換えられません。
次の例のように、キーを NONE
に設定してクラスタリングを無効にすることもできます。
ALTER TABLE table_name CLUSTER BY NONE;
クラスター キーを NONE
に設定しても、既にクラスター化されているデータは書き換えられませんが、今後の OPTIMIZE
操作ではクラスタリング キーを使用できなくなります。
テーブルがクラスター化される方法を確認する
次の例のように、DESCRIBE
コマンドを使用して、テーブルのクラスタリング キーを確認できます。
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
リキッド クラスタリングありのテーブルの互換性
Databricks Runtime 14.1 以降でリキッド クラスタリングありで作成されたテーブルでは、既定で v2 チェックポイントが使用されます。 Databricks Runtime 13.3 LTS 以降では、v2 チェックポイントを使用してテーブルを読み書きできます。
Databricks Runtime 12.2 LTS 以降では、v2 チェックポイントを無効にし、テーブル プロトコルをダウングレードすることでリキッド クラスタリングありのテーブルを読み取ることができます。 「Delta テーブル機能の削除」を参照してください。
制限事項
次の制限があります。
- Databricks Runtime 15.1 以下では、書き込み時のクラスタリングは、フィルター、結合、集計を含むソース クエリをサポートしていません。
- 構造化ストリーミング ワークロードでは、書き込み時のクラスタリングはサポートされていません。
- 構造化ストリーミング書き込みを使用して、リキッド クラスタリングが有効になっているテーブルを作成することはできません。 構造化ストリーミングを使用して、リキッド クラスタリングが有効になっている既存のテーブルにデータを書き込むことはできます。