Hive メタストア パイプラインを複製して Unity Catalog パイプラインを作成する
この記事では、Databricks REST API の clone a pipeline
要求と、それを使用して Hive メタストアに発行する既存のパイプラインを、Unity カタログに発行する新しいパイプラインにコピーする方法について説明します。 clone a pipeline
要求を呼び出すと、次が行なわれます。
- ソース コードと構成を既存のパイプラインから新しいパイプラインにコピーし、指定された構成のオーバーライドを適用します。
- 具体化されたビューおよびストリーミング テーブルの定義と参照を、これらのオブジェクトが Unity Catalog によって管理されるのに必要となる変更で更新します。
- パイプラインの更新を開始して、パイプライン内のすべてのストリーミング テーブルに対し、既存のデータとメタデータ (チェックポイントなど) を移行します。 これで、これらのストリーミング テーブルは、元のパイプラインと同じポイントで処理を再開できるようになります。
クローン操作の完了後は、元のパイプラインと新しいパイプラインの両方が独立して実行できます。
この記事では、API 要求の 呼び出しを直接的に、また、 Databricks ノートブックから Python スクリプトを介して行う例を示します。
開始する前に
パイプラインを複製する前に、次が必要です。
Hive メタストア パイプラインをクローンするには、パイプラインで定義されているテーブルとビューが、ターゲット スキーマにテーブルを公開する必要があります。 パイプラインにターゲット スキーマを追加する方法については、「Hive メタストアに発行するようにパイプラインを構成する」を参照してください。
クローンするパイプライン内の Hive メタストアで管理されたテーブルまたはビューへの参照は、カタログ (
hive_metastore
)、スキーマ、およびテーブル名で完全修飾されている必要があります。 たとえば、customers
データセットを作成する次のコードでは、テーブル名の引数をhive_metastore.sales.customers
に更新する必要があります。@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
パイプラインの一部として構成されたノートブックや、Git フォルダーまたはワークスペース ファイルに格納されているいずれかのモジュールを含む、複製元 Hive メタストア パイプラインのソース コードは、クローン操作の進行中に編集しないでください。
クローン操作を開始するときに、ソース Hive メタストア パイプラインを実行しないでください。 更新が実行されている場合は、それを停止するか完了するまで待機します。
パイプラインを複製する前に考慮すべき、その他の重要な事項を次に示します。
- Hive メタストア パイプライン内のテーブルで、Python の
path
引数または SQL のLOCATION
引数を使用してストレージの場所を指定している場合は、クローン要求に"pipelines.migration.ignoreExplicitPath": "true"
構成を渡してください。 この構成の設定は、以下の手順に含まれています。 - Hive メタストア パイプラインに
cloudFiles.schemaLocation
オプションの値を指定するオート ローダー ソースが含まれており、Unity Catalog クローンの作成後も Hive メタストア パイプラインが引き続き動作する場合は、Hive メタストア パイプラインとクローンされた Unity Catalog パイプラインの両方で、mergeSchema
オプションをtrue
に設定する必要があります。 複製される前に、このオプションを Hive メタストア パイプラインに追加することで、新しいパイプラインにオプションがコピーされます。
Databricks REST API を使用してパイプラインをクローンする
次の例では、curl
コマンドを使用して、Databricks REST API で clone a pipeline
要求を呼び出します。
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
次のように置き換えます。
<personal-access-token>
を、Databricks 個人用アクセス トークンに。- Azure Databricks
<databricks-instance>
に を設定します。例えば、adb-1234567890123456.7.azuredatabricks.net
です。 <pipeline-id>
を、クローンする Hive メタストア パイプラインの一意識別子に。 パイプライン ID は、Delta Live Tables UI で確認できます。
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
次のように置き換えます。
<target-catalog-name>
には、新しいパイプラインの発行先である Unity Catalog 内のカタログ名を使用します。 これは既存のカタログである必要があります。<target-schema-name>
を、現在のスキーマ名と異なる場合に新しいパイプラインの発行先となる Unity Catalog 内のスキーマ名に。 このパラメーターはオプションで、指定しない場合は既存のスキーマ名が使用されます。- 新しいパイプラインの名前として
<new-pipeline-name>
を使用することもできます。 指定しない場合、新しいパイプラインには、ソース パイプライン名に[UC]
を追加した名前が付けられ ます。
clone_mode
は、クローン操作で使用するモードを指定します。 サポートされている唯一のオプションは、MIGRATE_TO_UC
です。
新しいパイプラインの構成を指定するには、configuration
フィールドを使用します。 ここで設定した値は、元のパイプラインの構成をオーバーライドします。
clone
REST API 要求からの応答は、新しい Unity Catalog パイプラインのパイプライン ID です。
Databricks ノートブックからパイプラインをクローンする
次の例では、Python スクリプトから create a pipeline
要求を呼び出します。 このスクリプトは、Databricks ノートブックを使用して実行できます。
- スクリプト用に新しいノートブックを作成します。 「ノートブックを作成する」を参照してください。
- 次の Python スクリプトを、ノートブックの最初のセルにコピーします。
- スクリプト内のプレースホルダーの値を、次の値に置き換えて更新します。
- Azure Databricks
<databricks-instance>
に を設定します。例えば、adb-1234567890123456.7.azuredatabricks.net
です。 <pipeline-id>
を、クローンする Hive メタストア パイプラインの一意識別子に。 パイプライン ID は、Delta Live Tables UI で確認できます。<target-catalog-name>
には、新しいパイプラインの発行先である Unity Catalog 内のカタログ名を使用します。 これは既存のカタログである必要があります。<target-schema-name>
を、現在のスキーマ名と異なる場合に新しいパイプラインの発行先となる Unity Catalog 内のスキーマ名に。 このパラメーターはオプションで、指定しない場合は既存のスキーマ名が使用されます。- 新しいパイプラインの名前として
<new-pipeline-name>
を使用することもできます。 指定しない場合、新しいパイプラインには、ソース パイプライン名に[UC]
を追加した名前が付けられ ます。
- Azure Databricks
- スクリプトを実行します。 「Databricks ノートブックを実行する」を参照してください。
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
制限事項
以下は、Delta Live Tables の clone a pipeline
API 要求における制限事項です。
- Hive メタストアを使用するように構成されたパイプラインから Unity Catalog パイプラインへの複製のみがサポートされます。
- クローンは、複製元のパイプラインと同じ Azure Databricks ワークスペース内にのみ作成できます。
- 複製するパイプラインには、次のストリーミング ソースのみを含めることができます。
- デルタソース
- オート ローダー (オート ローダーでサポートされている、すべてのデータ ソースを含む)。 「クラウド オブジェクト ストレージからファイルを読み込む」を参照してください。
- 構造化ストリーミングを使用する Apache Kafka。 ただし、Kafka ソースでは、
kafka.group.id
オプションを使用する構成すはできません。 「Apache Kafka と Azure Databricks を使用したストリーム処理」を参照してください。 - 構造化ストリーミングを使用する Amazon Kinesis。 ただし、Kinesis ソースでは、
consumerMode
をefo
に設定する構成はできません。
- 複製する Hive メタストア パイプラインでオート ローダー ファイル通知モードが使用されている場合、Databricks では、複製後に Hive メタストア パイプラインを実行しないことを推奨しています。 これは、Hive メタストア パイプラインを実行することで、Unity Catalog クローンから一部のファイル通知イベントがドロップされるためです。 クローン操作が完了した後にソースの Hive メタストア パイプラインが実行されているのであれば、オート ローダーで
cloudFiles.backfillInterval
オプションを使用して、不足しているファイルをバックフィルできます。 オート ローダー ファイル通知モードの詳細については、「オート ローダー ファイル通知モードとは」を参照してください。 オート ローダーを使用したファイルのバックフィルの詳細については、「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」、および「オート ローダーの一般的なオプション」を参照してください。 - 複製の進行中、パイプラインのメンテナンス タスクは、両方のパイプラインについて自動的に一時停止されます。
- クローンされた Unity Catalog パイプライン内のテーブルに対するタイム トラベル クエリには、次のことが適用されます。
- テーブル バージョンが、元の Hive メタストアで管理されたオブジェクトに書き込まれている場合、複製された Unity Catalog オブジェクトに対してクエリを実行する際に、
timestamp_expression
句を使用したタイム トラベル クエリは未定義になります。 - ただし、複製された Unity Catalog オブジェクトにテーブル バージョンが書き込まれていれば、
timestamp_expression
句を使用したタイム トラベル クエリは正しく機能します。 - バージョンが、元の Hive メタストアで管理されたオブジェクトに書き込まれている場合でも、
version
句を使用したタイム トラベル クエリは、複製された Unity Catalog オブジェクトに対するクエリの実行で正しく機能します。
- テーブル バージョンが、元の Hive メタストアで管理されたオブジェクトに書き込まれている場合、複製された Unity Catalog オブジェクトに対してクエリを実行する際に、
- Unity Catalog で Delta Live Tables を使用する場合のその他の制限事項については、「 Unity Catalog パイプラインの制限事項」を参照してください。
- Unity カタログの制限事項については、「Unity Catalog の制限事項」を参照してください。