Delta Lake テーブル履歴の処理
Delta テーブルを変更する操作を行うたびに、テーブルの新しいバージョンが作成されます。 履歴情報を使用し、タイム トラベルを使用することで、特定の時点での操作を監査したり、テーブルをロールバックしたり、テーブルに対してクエリを実行したりできます。
注意
Databricks では、データ アーカイブの長期的なバックアップ ソリューションとして Delta Lake テーブル履歴を使用することはお勧めしていません。 データとログの保持の両方の構成を大きな値に設定していない限り、Databricks では、タイム トラベル操作には過去 7 日間のみを使用することをお勧めしています。
Delta テーブルの履歴を取得する
history
コマンドを実行すると、Delta テーブルへの書き込みごとに操作、ユーザー、タイムスタンプなどに関する情報を取得できます。 操作は、逆の時系列順で返されます。
テーブル履歴の保持期間は、テーブル設定 delta.logRetentionDuration
によって決まり、既定では 30 日間です。
注意
タイム トラベルとテーブル履歴は、さまざまな保持しきい値によって制御されます。 「Delta Lake のタイム トラベルとは」を参照してください。
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL 構文の詳細については、「DESCRIBE HISTORY」を参照してください。
Scala、Java、Python 構文の詳細については、Delta Lake API ドキュメントを参照してください。
Catalog Explorer には、Delta テーブルのこの詳細なテーブル情報と履歴が表示されます。 テーブル スキーマとサンプル データに加えて、[履歴] タブをクリックして DESCRIBE HISTORY
で表示されるテーブル履歴を確認することもできます。
履歴スキーマ
history
操作の出力には、次の列があります。
列 | タイプ | 説明 |
---|---|---|
version | long | 操作によって生成されたテーブルのバージョン。 |
timestamp | timestamp | このバージョンがコミットされた時点。 |
userId | string | 操作を実行したユーザーの ID。 |
userName | string | 操作を実行したユーザーの名前。 |
operation | string | 操作の名前。 |
operationParameters | map | 操作のパラメーター (述語など)。 |
ジョブ (job) | struct | 操作を実行したジョブの詳細。 |
ノートブック | struct | 操作が実行されたノートブックの詳細。 |
clusterId | string | 操作が実行されたクラスターの ID。 |
readVersion | long | 書き込み操作を実行するために読み取ったテーブルのバージョン。 |
isolationLevel | string | この操作に使用された分離レベル。 |
isBlindAppend | Boolean | この操作によってデータが追加されたかどうか。 |
operationMetrics | map | 操作のメトリック (変更された行数やファイル数など)。 |
userMetadata | string | ユーザー定義のコミット メタデータ (指定されている場合) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Note
- 次の方法を使用して Delta テーブルに書き込む場合、他の列のいくつかは使用できません。
- 将来追加される列は、常に最後の列の後に追加されます。
操作メトリックのキー
history
操作は、operationMetrics
列マップ内の操作メトリックのコレクションを返します。
次の表は、マップのキー定義を操作別に示しています。
操作 | メトリックの名前 | 説明 |
---|---|---|
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO | ||
numFiles | 書き込まれたファイルの数。 | |
numOutputBytes | 書き込まれたコンテンツのサイズ (バイト単位)。 | |
numOutputRows | 書き込まれた行の数。 | |
STREAMING UPDATE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numOutputRows | 書き込まれた行の数。 | |
numOutputBytes | 書き込みのサイズ (バイト単位)。 | |
DELETE | ||
numAddedFiles | 追加されたファイルの数。 テーブルのパーティションが削除された場合には提供されません。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numDeletedRows | 削除された行の数。 テーブルのパーティションが削除された場合には提供されません。 | |
numCopiedRows | ファイルの削除処理中にコピーされた行の数。 | |
executionTimeMs | 操作全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのに要した時間。 | |
rewriteTimeMs | 一致したファイルの書き換えに要した時間。 | |
TRUNCATE | ||
numRemovedFiles | 削除されたファイルの数。 | |
executionTimeMs | 操作全体の実行にかかった時間。 | |
MERGE | ||
numSourceRows | ソース データフレーム内の行の数。 | |
numTargetRowsInserted | ターゲット テーブルに挿入された行の数。 | |
numTargetRowsUpdated | ターゲット テーブル内で更新された行の数。 | |
numTargetRowsDeleted | ターゲット テーブル内で削除された行の数。 | |
numTargetRowsCopied | コピーされたターゲット行の数。 | |
numOutputRows | 書き出された行数の合計。 | |
numTargetFilesAdded | シンク (ターゲット) に追加されたファイルの数。 | |
numTargetFilesRemoved | シンク (ターゲット) から削除されたファイルの数。 | |
executionTimeMs | 操作全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのに要した時間。 | |
rewriteTimeMs | 一致したファイルの書き換えに要した時間。 | |
UPDATE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 削除されたファイルの数。 | |
numUpdatedRows | 更新された行の数。 | |
numCopiedRows | ファイルの更新処理中にコピーされた行の数。 | |
executionTimeMs | 操作全体の実行にかかった時間。 | |
scanTimeMs | ファイルの一致をスキャンするのに要した時間。 | |
rewriteTimeMs | 一致したファイルの書き換えに要した時間。 | |
FSCK | numRemovedFiles | 削除されたファイルの数。 |
CONVERT | numConvertedFiles | 変換された Parquet ファイルの数。 |
OPTIMIZE | ||
numAddedFiles | 追加されたファイルの数。 | |
numRemovedFiles | 最適化されたファイルの数。 | |
numAddedBytes | テーブルが最適化された後に追加されたバイト数。 | |
numRemovedBytes | 削除されたバイト数。 | |
minFileSize | テーブルが最適化された後の最小ファイル サイズ。 | |
p25FileSize | テーブルが最適化された後の 25 パーセンタイル ファイルのサイズ。 | |
p50FileSize | テーブルが最適化された後のファイル サイズの中央値。 | |
p75FileSize | テーブルが最適化された後の 75 パーセンタイル ファイルのサイズ。 | |
maxFileSize | テーブルが最適化された後の最大ファイル サイズ。 | |
CLONE | ||
sourceTableSize | 複製されたバージョンのソース テーブルのサイズ (バイト単位)。 | |
sourceNumOfFiles | 複製されたバージョンのソース テーブル内のファイルの数。 | |
numRemovedFiles | 前の Delta テーブルが置き換えられた場合に、ターゲット テーブルから削除されたファイルの数。 | |
removedFilesSize | 前の Delta テーブルが置き換えられた場合に、ターゲット テーブルから削除されたファイルの合計サイズ (バイト単位)。 | |
numCopiedFiles | 新しい場所にコピーされたファイルの数。 シャロー複製の場合は 0。 | |
copiedFilesSize | 新しい場所にコピーされたファイルの合計サイズ (バイト単位)。 シャロー複製の場合は 0。 | |
RESTORE | ||
tableSizeAfterRestore | 復元後のテーブル サイズ (バイト単位)。 | |
numOfFilesAfterRestore | 復元後のテーブル内のファイルの数。 | |
numRemovedFiles | 復元操作によって削除されたファイルの数。 | |
numRestoredFiles | 復元の結果として追加されたファイルの数。 | |
removedFilesSize | 復元によって削除されたファイルのサイズ (バイト単位)。 | |
restoredFilesSize | 復元によって追加されたファイルのサイズ (バイト単位)。 | |
VACUUM | ||
numDeletedFiles | 削除されたファイルの数。 | |
numVacuumedDirectories | バキューム処理されたディレクトリの数。 | |
numFilesToDelete | 削除するファイルの数。 |
Delta Lake のタイム トラベルとは
Delta Lake のタイム トラベルは、タイムスタンプまたはテーブルのバージョン (トランザクション ログに記録されているもの) に基づいて、テーブルの以前のバージョンに対してクエリを実行することをサポートしています。 タイム トラベルは、次のようなアプリケーションに使用できます。
- 分析、レポート、または出力 (機械学習モデルの出力など) を再作成する。 これは、特に規制対象の業界でデバッグや監査を行う際に役立ちます。
- 複雑なテンポラル クエリを記述する。
- データの間違いを修正する。
- 急速に変化するテーブルに対応する一連のクエリに対してスナップショット分離を提供する。
重要
タイム トラベルでアクセスできるテーブル バージョンは、トランザクション ログ ファイルの保持しきい値と、VACUUM
操作の頻度と指定された保持期間の組み合わせによって決まります。 既定値を使用して VACUUM
を毎日実行する場合は、7 日間のデータをタイム トラベルで使用できます。
Delta のタイム トラベル構文
タイム トラベルを使用して Delta テーブルにクエリを実行するには、テーブル名の指定の後に句を追加します。
timestamp_expression
には次のいずれかを指定できます。'2018-10-18T22:15:12.013Z'
、つまり、タイムスタンプにキャストできる文字列ですcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
、つまり、日付文字列ですcurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- タイムスタンプにキャストされる (できる) その他の式
version
は、DESCRIBE HISTORY table_spec
の出力から取得できる long 型の値です。
timestamp_expression
も version
もサブクエリにすることはできません。
日付またはタイムスタンプ文字列のみが使用できます。 たとえば、"2019-01-01"
や "2019-01-01T00:00:00.000Z"
す。 構文の例については、次のコードを参照してください。
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
また、@
構文を使用して、タイムスタンプまたはバージョンをテーブル名の一部として指定することもできます。 タイムスタンプは yyyyMMddHHmmssSSS
形式である必要があります。 バージョンの前に v
を付加することで、@
の後にバージョンを指定できます。 構文の例については、次のコードを参照してください。
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
トランザクション ログ チェックポイントとは
Delta Lake を使用すると、テーブルのバージョンが _delta_log
ディレクトリ内の JSON ファイルとして記録され、テーブル データと共に格納されます。 チェックポイント クエリを最適化するために、Delta Lake によってテーブルのバージョンが Parquet チェックポイント ファイルに集計されるため、テーブル履歴の JSON バージョンをすべて読み取る必要がなくなります。 Azure Databricks を使用すると、データ サイズとワークロードのチェックポイント処理の頻度が最適化されます。 ユーザーはチェックポイントを直接操作する必要はありません。 チェックポイント処理の頻度は、予告なく変更される場合があります。
タイム トラベル クエリのデータ保持を構成する
以前のテーブル バージョンに対してクエリを実行するには、そのバージョンのログとデータのファイルの "両方" を保持している必要があります。
データ ファイルは、テーブルに対して VACUUM
を実行すると削除されます。 Delta Lake では、テーブルのバージョンをチェックポイント処理した後、ログ ファイルの削除が自動的に管理されます。
ほとんどの Delta テーブルに対して VACUUM
が定期的に実行されるため、ポイントインタイム クエリでは VACUUM
の保持しきい値 (既定では 7 日) を考慮する必要があります。
Delta テーブルのデータ保持しきい値を大きくするには、次のテーブル プロパティを構成する必要があります。
delta.logRetentionDuration = "interval <interval>"
: テーブルの履歴を保持する期間を制御します。 既定では、interval 30 days
です。delta.deletedFileRetentionDuration = "interval <interval>"
: 現在のテーブル バージョンで参照されなくなったデータ ファイルを削除するためにVACUUM
で使用するしきい値を決定します。 既定では、interval 7 days
です。
テーブルの作成時に "Delta" プロパティを指定する、または ALTER TABLE
ステートメントを使用して設定することもできます。 「Delta テーブル プロパティのリファレンス」を参照してください。
注意
これらの両方のプロパティを設定して、頻繁な VACUUM
操作が行われるテーブルのテーブル履歴が、長期間保持されるようにする必要があります。 たとえば、30 日間の履歴データにアクセスするには、delta.deletedFileRetentionDuration = "interval 30 days"
を設定します (これは delta.logRetentionDuration
の既定の設定と一致します)。
データ保持のしきい値を大きくすると、より多くのデータ ファイルが維持されるため、ストレージ コストが増加する可能性があります。
Delta テーブルを以前の状態に復元する
RESTORE
コマンドを使用して、Delta テーブルを以前の状態に復元できます。 Delta テーブルは、以前の状態に復元できるようにするために、テーブルの履歴バージョンを内部的に保持しています。
RESTORE
コマンドのオプションとして、以前の状態に対応するバージョンまたは以前の状態が作成されたときのタイムスタンプがサポートされています。
重要
- 既に復元されたテーブルを復元できます。
- 複製されたテーブルを復元できます。
- 復元するテーブルに対する
MODIFY
アクセス許可が必要です。 - 手動または
vacuum
によってデータ ファイルが削除されている場合、以前のバージョンにテーブルを復元することはできません。 それでも、spark.sql.files.ignoreMissingFiles
がtrue
に設定されている場合は、このバージョンに部分的に復元できます。 - 以前の状態に復元するためのタイムスタンプ形式は
yyyy-MM-dd HH:mm:ss
です。 日付 (yyyy-MM-dd
) 文字列のみを指定することもできます。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
構文の詳細については、「RESTORE」を参照してください。
重要
復元は、データ変更操作と認識されます。 RESTORE
コマンドによって追加された Delta Lake ログ エントリには、true に設定された dataChange が含まれます。 Delta Lake テーブルの更新を処理する 構造化ストリーミング ジョブなどのダウンストリーム アプリケーションがある場合、復元操作によって追加されたデータ変更ログ エントリは、新しいデータ更新と見なされ、それらの処理によってデータが重複する可能性があります。
次に例を示します。
テーブルのバージョン | 操作 | 差分ログの更新 | データ変更ログの更新のレコード |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (最適化圧縮はテーブル内のデータを変更しないため、レコードはありません) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
前の例では、RESTORE
コマンドの結果、Delta テーブル バージョン 0 と 1 を読み取るときに既に表示されていた更新プログラムが表示されます。 ストリーミング クエリでこのテーブルが読み取られていた場合、これらのファイルは新しく追加されたデータと見なされ、再び処理されます。
復元のメトリック
RESTORE
操作が完了すると、次のメトリックが単一行データフレームとして報告されます。
table_size_after_restore
: 復元後のテーブルのサイズ。num_of_files_after_restore
: 復元後のテーブル内のファイルの数。num_removed_files
: テーブルから削除された (論理的に削除された) ファイルの数。num_restored_files
: ロールバックによって復元されたファイルの数。removed_files_size
: テーブルから削除されたファイルの合計サイズ (バイト単位)。restored_files_size
: 復元されたファイルの合計サイズ (バイト単位)。
Delta Lake のタイム トラベルの使用例
ユーザー
111
のテーブルの誤った削除を修正する:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
テーブルの誤った更新を修正する:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
先週に追加された新しい顧客の数を照会する:
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Spark セッションで最後のコミットのバージョンを見つけるには、どうすればよいですか?
すべてのスレッドとすべてのテーブルの全体で、現在の SparkSession
によって書き込まれた最後のコミットのバージョン番号を取得するには、SQL 構成 spark.databricks.delta.lastCommitVersionInSession
のクエリを実行します。
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
SparkSession
によるコミットが行われていない場合は、キーのクエリを実行すると空の値が返されます。
注意
複数のスレッド間で同じ SparkSession
を共有する場合、それは複数のスレッド間で変数を共有するのと似ています。構成値が同時に更新されると、競合状態が発生する可能性があります。