다음을 통해 공유


Hive 메타스토어 파이프라인을 복제하여 Unity 카탈로그 파이프라인 만들기

이 문서에서는 Databricks REST API의 clone a pipeline 요청과 이를 사용하여 Hive 메타스토어에 게시하는 기존 파이프라인을 Unity 카탈로그에 게시하는 새 파이프라인에 복사하는 방법을 설명합니다. clone a pipeline 요청을 호출하면 다음을 수행합니다.

  • 기존 파이프라인에서 새 파이프라인으로 소스 코드 및 구성을 복사하여 지정한 구성 재정의를 적용합니다.
  • 구체화된 뷰와 스트리밍 테이블 정의 및 참조를 Unity 카탈로그에서 관리될 수 있도록 필요한 변경 사항으로 업데이트합니다.
  • 파이프라인의 모든 스트리밍 테이블에 대한 기존 데이터 및 메타데이터(예: 검사점)를 마이그레이션하기 위한 파이프라인 업데이트를 시작합니다. 이렇게 하면 해당 스트리밍 테이블이 원래 파이프라인과 동일한 지점에서 처리를 다시 시작할 수 있습니다.

복제 작업이 완료되면 원래 파이프라인과 새 파이프라인을 모두 독립적으로 실행할 수 있습니다.

이 문서에는 Databricks Notebook에서 Python 스크립트를 통해 직접 API 요청을 호출하는 예제가 포함되어 있습니다.

시작하기 전에

파이프라인을 복제하기 전에 다음이 필요합니다.

  • Hive 메타스토어 파이프라인을 복제하려면 파이프라인에 정의된 테이블 및 뷰가 대상 스키마에 테이블을 게시해야 합니다. 파이프라인에 대상 스키마를 추가하는 방법을 알아보려면 Hive 메타스토어게시하도록 파이프라인 구성을 참조하세요.

  • 복제할 파이프라인의 Hive 메타스토어 관리 테이블 또는 뷰에 대한 참조는 카탈로그(hive_metastore), 스키마 및 테이블 이름으로 정규화되어야 합니다. 예를 들어 customers 데이터 세트를 만드는 다음 코드에서 테이블 이름 인수를 hive_metastore.sales.customers업데이트해야 합니다.

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • 복제 작업이 진행되는 동안, 파이프라인의 일부로 구성된 노트북과 Git 폴더 또는 작업 영역 파일에 저장된 모듈을 포함하여 원본 Hive 메타스토어 파이프라인의 소스 코드를 편집하지 마세요.

  • 복제 작업을 시작할 때 원본 Hive 메타스토어 파이프라인이 실행되지 않아야 합니다. 업데이트가 실행 중인 경우 업데이트를 중지하거나 완료되기를 기다립니다.

파이프라인을 복제하기 전에 고려해야 할 다른 중요한 고려 사항은 다음과 같습니다.

  • Hive 메타스토어 파이프라인의 테이블이 Python의 path 인수를 사용하여 스토리지 위치를 지정하거나, SQL의 LOCATION 인수를 사용할 경우, "pipelines.migration.ignoreExplicitPath": "true" 구성을 복제 요청에 전달합니다. 이 구성 설정은 아래 지침에 포함되어 있습니다.
  • Hive 메타스토어 파이프라인에 cloudFiles.schemaLocation 옵션의 값을 지정하는 자동 로더 원본이 포함되어 있고, Unity 카탈로그 클론을 만든 후에도 Hive 메타스토어 파이프라인이 계속 작동하는 경우 Hive 메타스토어 파이프라인과 복제된 Unity 카탈로그 파이프라인 모두에서 mergeSchematrue 옵션을 설정해야 합니다. 복제하기 전에 Hive 메타스토어 파이프라인에 이 옵션을 추가하면 옵션이 새 파이프라인에 복사됩니다.

Databricks REST API를 사용하여 파이프라인 복제

다음 예제에서는 curl 명령을 사용하여 Databricks REST API에서 clone a pipeline 요청을 호출합니다.

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

교체:

  • Databricks <personal-access-token>를 사용하여 .
  • Azure Databricks <databricks-instance>, 예를 들어 (adb-1234567890123456.7.azuredatabricks.net).
  • Hive 메타스토어 파이프라인을 복제할 때 사용할 고유 식별자인 <pipeline-id>. Delta Live Tables UI에서 파이프라인 ID를 찾을 수 있습니다.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

교체:

  • <target-catalog-name>은 새 파이프라인을 게시할 Unity 카탈로그에서 사용될 카탈로그 이름입니다. 기존 카탈로그여야 합니다.
  • 새 파이프라인이 현재 스키마 이름과 다른 경우 게시해야 하는 Unity 카탈로그의 스키마 이름으로 <target-schema-name>. 이 매개 변수는 선택 사항이며 지정하지 않으면 기존 스키마 이름이 사용됩니다.
  • <new-pipeline-name>에 새 파이프라인을 위한 선택적 이름을 지정하십시오. 지정하지 않으면 [UC] 추가된 원본 파이프라인 이름을 사용하여 새 파이프라인의 이름을 지정합니다.

clone_mode 복제 작업에 사용할 모드를 지정합니다. MIGRATE_TO_UC 유일하게 지원되는 옵션입니다.

configuration 필드를 사용하여 새 파이프라인에 구성을 지정합니다. 여기에 설정된 값은 원래 파이프라인의 구성을 재정의합니다.

clone REST API 요청의 응답은 새 Unity 카탈로그 파이프라인의 파이프라인 ID입니다.

Databricks 노트북에서 파이프라인 복제하기

다음 예제에서는 Python 스크립트에서 create a pipeline 요청을 호출합니다. Databricks Notebook을 사용하여 이 스크립트를 실행할 수 있습니다.

  1. 스크립트에 대한 새 Notebook을 만듭니다. 을(를) 참조하여전자 필기장을 만드세요.
  2. 다음 Python 스크립트를 Notebook의 첫 번째 셀에 복사합니다.
  3. 다음을 대체하여 스크립트의 자리 표시자 값을 업데이트합니다.
    • Azure Databricks <databricks-instance>, 예를 들어 (adb-1234567890123456.7.azuredatabricks.net).
    • Hive 메타스토어 파이프라인을 복제할 때 사용할 고유 식별자인 <pipeline-id>. Delta Live Tables UI에서 파이프라인 ID를 찾을 수 있습니다.
    • <target-catalog-name>은 새 파이프라인을 게시할 Unity 카탈로그에서 사용될 카탈로그 이름입니다. 기존 카탈로그여야 합니다.
    • 새 파이프라인이 현재 스키마 이름과 다른 경우 게시해야 하는 Unity 카탈로그의 스키마 이름으로 <target-schema-name>. 이 매개 변수는 선택 사항이며 지정하지 않으면 기존 스키마 이름이 사용됩니다.
    • <new-pipeline-name>에 새 파이프라인을 위한 선택적 이름을 지정하십시오. 지정하지 않으면 [UC] 추가된 원본 파이프라인 이름을 사용하여 새 파이프라인의 이름을 지정합니다.
  4. 스크립트를 실행합니다. Databricks Notebook 실행참조하세요.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

제한

다음은 Delta Live Tables clone a pipeline API 요청의 제한 사항입니다.

  • Hive 메타스토어를 사용하도록 구성된 파이프라인에서 Unity 카탈로그 파이프라인으로의 복제만 지원됩니다.
  • 복제하는 파이프라인과 동일한 Azure Databricks 작업 영역에서만 복제본을 만들 수 있습니다.
  • 복제하려는 파이프라인에는 다음 스트리밍 원본만 포함될 수 있습니다.
  • 복제 중인 Hive 메타스토어 파이프라인이 자동 로더 파일 알림 모드를 사용하는 경우 Databricks는 복제 후 Hive 메타스토어 파이프라인을 실행하지 않는 것이 좋습니다. Hive 메타스토어 파이프라인을 실행하면 Unity 카탈로그 클론에서 일부 파일 알림 이벤트가 삭제되었기 때문입니다. 복제 작업이 완료된 후 원본 Hive 메타스토어 파이프라인이 실행되는 경우 cloudFiles.backfillInterval 옵션과 함께 자동 로더를 사용하여 누락된 파일을 백필할 수 있습니다. 자동 로더 파일 알림 모드에 대해 더 알고 싶다면 자동 로더 파일 알림 모드란?을 참조하세요. 자동 로더를 사용하여 파일을 백필하는 방법에 대한 자세한 내용은 cloudFiles.backfillInterval을 사용하여 정기적으로 백필 트리거하기일반적인 자동 로더 옵션를 참조하세요.
  • 복제가 진행되는 동안 두 파이프라인에 대해 파이프라인 유지 관리 작업이 자동으로 일시 중지됩니다.
  • 다음은 복제된 Unity 카탈로그 파이프라인의 테이블에 대한 시간 이동 쿼리에 적용됩니다.
    • 테이블 버전이 원래 Hive 메타스토어 관리 개체에 기록된 경우 복제된 Unity Catalog 개체를 쿼리할 때 timestamp_expression 절을 사용하는 시간 이동 쿼리가 정의되지 않습니다.
    • 그러나 테이블 버전이 복제된 Unity 카탈로그 개체에 기록된 경우 timestamp_expression 절을 사용하는 시간 이동 쿼리가 올바르게 작동합니다.
    • 버전이 원래 Hive 메타스토어 관리 개체에 기록된 경우에도 복제된 Unity 카탈로그 개체를 쿼리할 때 version 절을 사용하는 시간 이동 쿼리가 올바르게 작동합니다.
  • Unity 카탈로그와 함께 Delta Live Tables를 사용하는 경우 다른 제한 사항은 Unity 카탈로그 파이프라인 제한참조하세요.
  • Unity 카탈로그 제한 사항은 Unity 카탈로그 제한 사항을 참조하세요.