Delta Live Tables パイプラインで更新を実行する
この記事では、パイプラインの更新について説明し、更新プログラムをトリガーする方法について詳しく説明します。
パイプラインの更新とは
パイプラインを作成し、実行する準備ができたら、"更新" を開始します。 パイプラインの更新では、次のことが実行されます。
- 正しい構成でクラスターを起動します。
- 定義されているすべてのテーブルとビューを検出し、無効な列名、不足している依存関係、構文エラーなどの分析エラーを確認します。
- 使用可能な最新のデータを含むテーブルおよびビューを作成または更新します。
有効な更新プログラムを使用するとテーブルの作成や更新を待たずに、パイプラインのソース コードで問題を確認できます。 この機能は、パイプライン内の誤ったテーブル名や列名などのエラーをすばやく見つけて修正できるため、パイプラインを開発またはテストするときに役立ちます。
パイプラインの更新はどのようにトリガーされますか?
パイプラインの更新を開始するには、次のいずれかのオプションを使用します。
トリガーの更新 | 細部 |
---|---|
手動 | パイプライン UI、パイプラインの一覧、またはパイプラインにアタッチされたノートブックから、パイプラインの更新を手動でトリガーできます。 「 |
スケジュール済み | ジョブを使用して、パイプラインの更新をスケジュールできます。 「ジョブの Delta Live Tables パイプライン タスク」を参照してください。 |
プログラム的な | サード パーティ製のツール、API、および CLI を使用して、プログラムによって更新プログラムをトリガーできます。 ワークフロー と |
パイプラインの更新を手動でトリガーする
パイプラインの更新を手動でトリガーするには、次のいずれかのオプションを使用します。
- パイプラインの詳細ページで
ボタンをクリックします。
- パイプラインの一覧で、
Right Arrow Icon[アクション]列にある をクリックします。
Note
手動でトリガーされるパイプライン更新の既定の動作は、パイプラインで定義されているすべてのデータセットを更新することです。
パイプライン更新セマンティクス
次の表では、既定の更新と完全更新のための具体化されたビューとストリーミング テーブルの動作について説明します。
更新の種類 | 具体化されたビュー セマンティクス | ストリーミング テーブルのセマンティクス |
---|---|---|
更新 (既定値) | 定義クエリの現在の結果を反映するように結果を更新します。 | ストリーミング テーブルとフローで定義されたロジックを使用して、新しいレコードを処理します。 |
完全更新 | 定義クエリの現在の結果を反映するように結果を更新します。 | ストリーミング テーブルからデータをクリアし、フローから状態情報 (チェックポイント) をクリアし、データ ソースからすべてのレコードを再処理します。 |
既定では、パイプライン内のすべての具体化されたビューとストリーミング テーブルは、更新ごとに更新されます。 必要に応じて、次の機能を使用して、更新プログラムからテーブルを省略できます。
- 更新テーブルの選択: 更新を実行する前に、この UI を使用して具体化されたビューとストリーミング テーブルを追加または削除します。 「選択したテーブルのパイプライン更新を開始する」を参照してください。
- 失敗したテーブルの更新: 失敗した具体化されたビューとストリーミング テーブル (ダウンストリームの依存関係を含む) の更新を開始します。 失敗したテーブルについては、「
パイプラインの更新を開始する」を参照してください。
これらの機能はどちらも、既定の更新セマンティクスまたは完全更新をサポートしています。 必要に応じて、[更新 テーブルの選択] ダイアログを使用して、失敗したテーブルの更新を実行するときに追加のテーブルを除外できます。
完全更新を使用する必要がありますか?
Databricks では、必要な場合にのみ完全更新を実行することをお勧めします。 完全更新では、データセットを定義するロジックを使用して、指定したデータ ソースのすべてのレコードが常に再処理されます。 完全更新を完了するための時間とリソースは、ソース データのサイズに関連付けられます。
具体化されたビューは、既定の更新と完全更新のどちらを使用した場合でも、同じ結果を返します。 ストリーミング テーブルで完全更新を使用すると、すべての状態処理とチェックポイント情報がリセットされ、入力データが使用できなくなった場合にレコードが削除される可能性があります。
Databricks では、入力データ ソースに、テーブルまたはビューの目的の状態を再作成するために必要なデータが含まれている場合にのみ、完全な更新が推奨されます。 入力ソース データが使用できなくなった次のシナリオと、完全な更新を実行した結果を考えてみましょう。
データ ソース | 入力データが存在しない理由 | 完全更新の結果 |
---|---|---|
カフカ | 短い保存期間のしきい値 | Kafka ソースに存在しなくなったレコードは、ターゲット テーブルから削除されます。 |
オブジェクト ストレージ内のファイル | ライフサイクル ポリシー | ソース ディレクトリに存在しなくなったデータ ファイルは、ターゲット テーブルから削除されます。 |
テーブル内のレコード | コンプライアンスのために削除済み | ソース テーブルに存在するレコードのみが処理されます。 |
テーブルまたはビューで完全な更新が実行されないようにするには、テーブルのプロパティ pipelines.reset.allowed
を false
に設定します。 「Delta Live Tables テーブルのプロパティ」を参照してください。 追加フローを使って、完全な更新の必要なしに、データを既存のストリーミング テーブルに追加することもできます。
選択したテーブルのパイプライン更新を開始する
必要に応じて、パイプライン内の選択したテーブルのデータのみを再処理できます。 たとえば、開発中は、1 つのテーブルのみを変更し、テスト時間を短縮したり、パイプラインの更新が失敗したり、 失敗したテーブルのみを更新したりします。
Note
選択的更新は、トリガーされたパイプラインでのみ使用できます。
選択したテーブルのみを更新する更新プログラムを開始するには、[パイプラインの詳細] ページで次の手順を実行します。
[更新するテーブルの選択] をクリックして更新します。 [更新するテーブルの選択] ダイアログボックスが表示されます。
[更新するテーブルの選択] ボタンが表示されない場合は、Pipeline の詳細ページに最新の更新プログラムが表示され、更新が完了したことを確認します。 たとえば、更新に失敗したために DAG が最新の更新プログラムに対して表示されない場合、 更新のテーブルの選択 ボタンは表示されません。
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。
[選択の更新] をクリックします。
Note
選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。
選択したテーブルに対して既に取り込まれているデータを再処理するには、Blue Down CaretRefresh selection ボタンの横にあるをクリックし、[完全に更新] をクリック。
失敗したテーブルのパイプライン更新を開始する
パイプライン グラフ内の 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみに関する更新を開始できます。
Note
除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。
失敗したテーブルを更新するには、[パイプラインの詳細] ページで [失敗したテーブルの更新] をクリックします。
選択済みの失敗したテーブルのみを更新するには
Button Down[失敗したテーブルの更新] ボタンの横にある をクリックし、[更新するテーブルを選択] をクリックします。 [更新するテーブルの選択] ダイアログボックスが表示されます。
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。
[選択の更新] をクリックします。
Note
選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。
選択したテーブルに対して既に取り込まれているデータを再処理するには、Blue Down CaretRefresh selection ボタンの横にあるをクリックし、[完全に更新] をクリック。
テーブルの更新を待たずにパイプラインにエラーがないかどうかを確認する
重要
Delta Live Tables の Validate
更新機能は、パブリック プレビュー段階です。
完全な更新を実行せずにパイプラインのソース コードが有効であるかどうかを確認するには、Validate を使用します。 Validate
更新では、パイプラインで定義されたデータセットとフローの定義が解決されますが、データセットの具体化または発行は行われません。 検証中に見つかったエラー (間違ったテーブル名や列名など) は、UI で報告されます。
Validate
更新を実行するには、Blue Down CaretStart の横にあるパイプラインの詳細ページでをクリックし、Validate をクリックします。
Validate
の更新が完了すると、イベント ログにはValidate
更新にのみ関連するイベントが表示され、DAG にはメトリックは表示されません。 エラーが検出された場合、詳細をイベント ログで確認できます。
最新の Validate
更新に関する結果のみが表示されます。 Validate
更新が、最後に実行された更新である場合は、[更新履歴] でそれを選択すると、結果を確認できます。 その結果は、Validate
更新の後に別の更新が実行されると UI で確認できなくなります。
開発モードと実稼働モード
開発モードと実稼働モードを切り替えることによって、パイプラインの実行を最適化できます。 これら 2 つのモードを切り替えるには、Pipelines UI の ボタンを使用します。 既定では、パイプラインは開発モードで実行されます。
パイプラインを開発モードで実行する場合、Delta Live Tables システムによって次の処理が実行されます。
- クラスターを再利用して、再起動のオーバーヘッドを回避します。 既定では、開発モードが有効になっている場合、クラスターは 2 時間実行されます。 これは、デルタ ライブ テーブル パイプラインの
pipelines.clusterShutdown.delay
の設定で変更できます。 - エラーを即座に検出して修正できるように、パイプラインの再試行を無効にします。
運用モードの場合、Delta Live Tables システムによって次の処理が実行されます。
- メモリ リークや古い資格情報など、特定の回復可能なエラーが発生した場合にクラスターを再起動します。
- クラスターの起動に失敗するなど、特定のエラーが発生した場合に実行を再試行します。
Note
開発モードと実稼働モードの切り替えで制御されるのは、クラスターとパイプラインの実行動作のみです。 発行テーブル用のカタログ内のストレージの場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。