Delta Live Tables パイプラインで更新を実行する
この記事では、Delta Live Tables パイプラインの更新とは何か、およびその実行方法について説明します。
パイプラインを作成し、実行する準備ができたら、"更新" を開始します。 パイプラインの更新では、次のことが実行されます。
- 正しい構成でクラスターを起動します。
- 定義されているすべてのテーブルとビューを検出し、無効な列名、不足している依存関係、構文エラーなどの分析エラーを確認します。
- 使用可能な最新のデータを含むテーブルおよびビューを作成または更新します。
有効な更新プログラムを使用するとテーブルの作成や更新を待たずに、パイプラインのソース コードで問題を確認できます。 この機能は、パイプラインを開発またはテストするときに役立ちます。パイプライン内の誤ったテーブル名や列名などのエラーをすばやく見つけて修正できるためです。
パイプラインを作成する方法については、「 Delta Live Tables パイプラインを構成するを参照してください。
Databricks ジョブまたはその他のツールを使用して、パイプラインの更新を調整できます。 「 ワークフローでデルタ ライブ テーブル パイプラインを実行するを参照してください。
パイプラインの更新を開始する
Azure Databricks には、パイプラインの更新を開始するための次のようないくつかのオプションがあります。
- Delta Live Tables UI には、次のオプションがあります。
- パイプラインの詳細ページで ボタンをクリックします。
- パイプラインの一覧で、[アクション]列にある をクリックします。
- ノートブックで更新を開始するには、構成されたパイプラインにノートブックをアタッチし、 [開始] をクリックします。 「ノートブックでの Delta Live Tables パイプラインの開発とデバッグ」を参照してください。
- API または CLI を使用して、プログラムでパイプラインをトリガーできます。 Pipeline API を参照してください。
- Delta Live Tables UI またはジョブ UI を使用してパイプラインをジョブとしてスケジュールできます。 「パイプラインのスケジュールを設定する」を参照してください。
Note
これらのメソッドのいずれかを使用して手動でトリガーされるパイプライン更新の既定の動作は、すべて更新することです。
Delta Live Tables でテーブルおよびビューを更新する方法
重要
ストリーミング テーブルまたは具体化されたビューを完全に更新すると、テーブルまたはビューが切り捨てられ、再計算されて、入力データ ソースの現在の状態が反映されます。 ストリーミング テーブルの場合、チェックポイントもリセットされます。 データ 保持ポリシー、手動削除、Kafka などの短い保持期間を持つソースなど、データ ソースからレコードが削除されている場合、完全更新後のテーブルまたはビューの状態が以前の状態と異なる場合があります。 さらに、完全な更新を完了するための時間とリソースは、ソース データのサイズに関連付けられます。
Databricks では、必要な場合、および入力データ ソースにテーブルまたはビューの状態を再作成するデータが含まれている場合にのみ、完全更新を実行することをお勧めします。 テーブルまたはビューで完全な更新が実行されないようにするには、table プロパティの pipelines.reset.allowed
を false
に設定します。 「デルタ ライブ テーブル テーブルのプロパティを参照してください。 追加フロー を使用して、完全な更新を必要とせずに既存のストリーミング テーブルにデータを追加することもできます。
更新されたテーブルとビュー、およびそれらのテーブルとビューの更新方法は、更新の種類によって異なります。
- すべて更新: すべてのテーブルは、その入力データ ソースの現在の状態を反映するように更新されます。 ストリーミング テーブルの場合、新しい行がテーブルに追加されます。
- すべて完全更新: すべてのテーブルは、その入力データ ソースの現在の状態を反映するように更新されます。 ストリーミング テーブルの場合、Delta Live Tables は、各テーブルからのすべてのデータを消去し、ストリーミング ソースからのすべてのデータを読み込もうとします。
- 更新の選択:
refresh selection
の動作はrefresh all
と同じですが、選択したテーブルのみを更新できます。 選択したテーブルは、その入力データ ソースの現在の状態を反映するように更新されます。 ストリーミング テーブルの場合、新しい行がテーブルに追加されます。 - 完全な更新の選択:
full refresh selection
の動作はfull refresh all
と同じですが、選択したテーブルのみの完全な更新を実行できます。 選択したテーブルは、その入力データ ソースの現在の状態を反映するように更新されます。 ストリーミング テーブルの場合、Delta Live Tables は各テーブルからすべてのデータをクリアし、ストリーミング ソースからすべてのデータを読み込もうとします。
既存の具体化されたビューの場合、更新は、具体化されたビューの SQL REFRESH
と同じ動作になります。 新しい具体化されたビューの場合、動作は SQL CREATE
操作と同じです。
選択したテーブルのパイプライン更新を開始する
必要に応じて、パイプライン内の選択したテーブルのデータのみを再処理できます。 たとえば、開発中は、1 つのテーブルのみを変更し、テスト時間を短縮したり、パイプラインの更新が失敗したり、 失敗したテーブルのみを更新したりします。
Note
選択的更新は、トリガーされたパイプラインでのみ使用できます。
選択したテーブルのみを更新する更新プログラムを開始するには、[パイプラインの詳細] ページで次の手順を実行します。
[更新するテーブルの選択] をクリックして更新します。 [更新するテーブルの選択] ダイアログボックスが表示されます。
[更新するテーブルの選択] ボタンが表示されない場合は、Pipeline の詳細ページに最新の更新プログラムが表示され、更新が完了したことを確認します。 たとえば、更新に失敗したために DAG が最新の更新プログラムに対して表示されない場合、 更新のテーブルの選択 ボタンは表示されません。
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。
[選択の更新] をクリックします。
Note
選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。
選択したテーブルに対して既に取り込まれているデータを再処理するには、Refresh selection ボタンの横にあるをクリックし、[完全に更新] をクリック。
失敗したテーブルのパイプライン更新を開始する
パイプライン グラフ内の 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみに関する更新を開始できます。
Note
除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。
失敗したテーブルを更新するには、[パイプラインの詳細] ページで [失敗したテーブルの更新] をクリックします。
選択済みの失敗したテーブルのみを更新するには
[失敗したテーブルの更新] ボタンの横にある をクリックし、[更新するテーブルを選択] をクリックします。 [更新するテーブルの選択] ダイアログボックスが表示されます。
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新プログラムからテーブルを削除するには、もう一度テーブルをクリックします。
[選択の更新] をクリックします。
Note
選択の更新 ボタンには、選択したテーブルの数がかっこで囲まれて表示されます。
選択したテーブルに対して既に取り込まれているデータを再処理するには、Refresh selection ボタンの横にあるをクリックし、[完全に更新] をクリック。
テーブルの更新を待たずにパイプラインにエラーがないかどうかを確認する
重要
Delta Live Tables の Validate
更新機能は、パブリック プレビュー段階です。
完全な更新を実行せずにパイプラインのソース コードが有効であるかどうかを確認するには、Validate を使用します。 Validate
更新では、パイプラインで定義されたデータセットとフローの定義が解決されますが、データセットの具体化または発行は行われません。 検証中に見つかったエラー (間違ったテーブル名や列名など) は、UI で報告されます。
Validate
更新を実行するには、Start の横にあるパイプラインの詳細ページでをクリックし、Validate をクリックします。
Validate
の更新が完了すると、イベント ログにはValidate
更新にのみ関連するイベントが表示され、DAG にはメトリックは表示されません。 エラーが検出された場合、詳細をイベント ログで確認できます。
最新の Validate
更新に関する結果のみが表示されます。 Validate
更新が、最後に実行された更新である場合は、[更新履歴] でそれを選択すると、結果を確認できます。 その結果は、Validate
更新の後に別の更新が実行されると UI で確認できなくなります。
パイプライン境界を選択する方法
Delta Live Tables のパイプラインは、1 つのテーブル、依存リレーションシップを持つ多数のテーブル、リレーションシップがない多数のテーブル、依存リレーションシップを持つテーブルの複数の独立フローに対して更新プログラムを処理できます。 このセクションでは、パイプラインの分割方法を決定するために役立つ考慮事項について説明します。
より大きな Delta Live Tables パイプラインには、いくつかの利点があります。 コーディネートは次のとおりです。
- クラスター リソースをより効率的に使用します。
- ワークスペースの中のパイプラインの数を減らします。
- ワークフロー オーケストレーションの複雑さを減らします。
処理パイプラインの分割方法に関する一般的な推奨事項としては、次のようなものがあります。
- チームの境界で機能を分割します。 たとえば、データ チームはデータを変換するためのパイプラインを維持し、データ アナリストは変換されたデータを分析するパイプラインを維持します。
- アプリケーション固有の境界で機能を分割して結合を減らし、共通機能の再利用を容易にします。
開発モードと実稼働モード
開発モードと実稼働モードを切り替えることによって、パイプラインの実行を最適化できます。 これら 2 つのモードを切り替えるには、Pipelines UI の ボタンを使用します。 既定では、パイプラインは開発モードで実行されます。
パイプラインを開発モードで実行する場合、Delta Live Tables システムによって次の処理が実行されます。
- クラスターを再利用して、再起動のオーバーヘッドを回避します。 既定では、開発モードが有効になっている場合、クラスターは 2 時間実行されます。 これは、デルタ ライブ テーブル パイプラインの
pipelines.clusterShutdown.delay
の設定で変更できます。 - エラーを即座に検出して修正できるように、パイプラインの再試行を無効にします。
運用モードの場合、Delta Live Tables システムによって次の処理が実行されます。
- メモリ リークや古い資格情報など、特定の回復可能なエラーが発生した場合にクラスターを再起動します。
- クラスターの起動に失敗するなど、特定のエラーが発生した場合に実行を再試行します。
Note
開発モードと実稼働モードの切り替えで制御されるのは、クラスターとパイプラインの実行動作のみです。 発行テーブル用のカタログ内のストレージの場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。
パイプラインのスケジュールを設定する
トリガーされるパイプラインを手動で開始することも、Azure Databricks ジョブを使用してスケジュールに基づいてパイプラインを実行することもできます。 Delta Live Tables UI で直接 1 つのパイプライン タスクを含むジョブを作成し、スケジュール設定できます。または、ジョブ UI のマルチタスク ワークフローにパイプライン タスクを追加できます。 「ジョブの Delta Live Tables パイプライン タスク」を参照してください。
Delta Live Tables UI で単一タスク ジョブとそのジョブのスケジュールを作成するには、次のようにします。
- [スケジュール] > [スケジュールの追加] をクリックします。 パイプラインが 1 つ以上のスケジュールされたジョブに含まれている場合、 Schedule ボタンが更新され、既存のスケジュールの数が表示されます (たとえば、 Schedule (5))。
- [ジョブ名] フィールドに、ジョブの名前を入力します。
- [スケジュール] を [スケジュール済み] に設定します。
- 期間、開始時刻、タイム ゾーンを指定します。
- パイプラインの開始、成功、または失敗に関するアラートを受信するように、1 つ以上のメール アドレスを構成します。
- [作成] をクリックします。