Udostępnij za pośrednictwem


Utwórz potok katalogu Unity poprzez sklonowanie potoku metastore Hive

W tym artykule opisano żądanie clone a pipeline interfejsu API REST usługi Databricks oraz jak go użyć do skopiowania istniejącego potoku publikowanego w metasklepie Hive do nowego potoku publikowanego w katalogu Unity. Po wywołaniu żądania clone a pipeline, ono:

  • Kopiuje kod źródłowy i konfigurację z istniejącego potoku do nowego, stosując wszelkie określone nadpisania konfiguracji.
  • Aktualizuje definicje i odwołania zmaterializowanych widoków oraz tabel przesyłania strumieniowego, wprowadzając wymagane zmiany, tak aby tymi obiektami mógł zarządzać Unity Catalog.
  • Uruchamia aktualizację potoku w celu migracji istniejących danych i metadanych, takich jak punkty kontrolne, dla wszystkich tabel przesyłania strumieniowego w potoku. Dzięki temu te tabele przesyłania strumieniowego mogą wznowić przetwarzanie w tym samym punkcie co oryginalny potok.

Po zakończeniu operacji klonowania zarówno oryginalne, jak i nowe potoki mogą być uruchamiane niezależnie.

Ten artykuł zawiera przykłady bezpośredniego wywoływania żądania interfejsu API oraz za pomocą skryptu Pythona z notatnika Databricks.

Przed rozpoczęciem

Przed sklonowaniem potoku wymagane są następujące elementy:

  • Aby sklonować pipeline magazynu metadanych Hive, tabele i widoki zdefiniowane w pipeline muszą publikować tabele do docelowego schematu. Aby dowiedzieć się, jak dodać schemat docelowy do potoku, zobacz Configure a pipeline to publish to Hive metastore.

  • Odwołania do zarządzanych tabel lub widoków Hive metastore w potoku klonowania muszą być w pełni kwalifikowane z wykazem (hive_metastore), schematem i nazwą tabeli. Na przykład w poniższym kodzie tworzącym zestaw danych customers argument nazwy tabeli musi zostać zaktualizowany na hive_metastore.sales.customers:

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • Nie edytuj kodu źródłowego w źródłowym potoku magazynu metadanych Hive, gdy trwa operacja klonowania, w tym notebooki skonfigurowane w ramach potoku i wszystkie moduły przechowywane w folderach Git lub plikach obszaru roboczego.

  • Źródłowy potok magazynu metadanych Hive nie może być uruchomiony przy rozpoczynaniu operacji klonowania. Jeśli aktualizacja jest uruchomiona, zatrzymaj ją lub zaczekaj na jej ukończenie.

Oto inne ważne zagadnienia przed sklonowaniem pipeline’u:

  • Jeśli tabele w potoku magazynu metadanych Hive określ lokalizację magazynu przy użyciu argumentu path w języku Python lub LOCATION w języku SQL, przekaż konfigurację "pipelines.migration.ignoreExplicitPath": "true" do żądania klonowania. Ustawienie tej konfiguracji jest zawarte w poniższych instrukcjach.
  • Jeśli potok magazynu metadanych Hive zawiera źródło Auto Loader, które określa wartość opcji cloudFiles.schemaLocation, a potok magazynu metadanych Hive pozostanie operacyjny po utworzeniu klonu Unity Catalog, należy ustawić opcję mergeSchema na true zarówno w potoku magazynu metadanych Hive, jak i w sklonowanym potoku Unity Catalog. Dodanie tej opcji do potoku magazynu metadanych Hive przed sklonowaniem spowoduje skopiowanie opcji do nowego potoku.

Klonowanie pipeline'u za pomocą interfejsu API REST usługi Databricks

W poniższym przykładzie użyto polecenia curl w celu wywołania żądania clone a pipeline w interfejsie API REST usługi Databricks:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

Zastąpić:

  • <personal-access-token> z osobistym tokenem dostępu Databricks .
  • <databricks-instance> z nazwą wystąpienia obszaru roboczego usługi Azure Databricks , na przykład adb-1234567890123456.7.azuredatabricks.net
  • <pipeline-id> z unikalnym identyfikatorem potoku danych metastore Hive do sklonowania. Identyfikator pipeline można znaleźć w interfejsie użytkownika DLT.

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

Zastąpić:

  • <target-catalog-name> z nazwą katalogu w katalogu Unity, do którego powinien zostać opublikowany nowy potok danych. Musi to być istniejący wykaz.
  • <target-schema-name> z nazwą schematu w katalogu Unity, do którego nowy pipeline powinien zostać opublikowany, jeśli jest inny niż bieżąca nazwa schematu. Ten parametr jest opcjonalny i jeśli nie zostanie określony, używana jest istniejąca nazwa schematu.
  • <new-pipeline-name> z opcjonalną nazwą nowego pipeline'u. Jeśli nie zostanie określona nazwa, nowy potok otrzymuje nazwę przy użyciu nazwy potoku źródłowego z dodatkiem [UC].

clone_mode określa tryb do użycia dla operacji klonowania. MIGRATE_TO_UC jest jedyną obsługiwaną opcją.

Użyj pola configuration, aby określić konfiguracje w nowym potoku. Wartości ustawione tutaj zastępują konfiguracje w oryginalnym potoku.

Odpowiedź z żądania interfejsu API REST clone to identyfikator nowego potoku katalogu Unity.

Sklonuj pipeline z notebooka Databricks

Poniższy przykład wywołuje żądanie create a pipeline ze skryptu języka Python. Możesz użyć notesu Databricks, aby uruchomić ten skrypt.

  1. Utwórz nowy notes dla skryptu. Zobacz Tworzenie notesu.
  2. Skopiuj następujący skrypt języka Python do pierwszej komórki notesu.
  3. Zaktualizuj wartości symboli zastępczych w skrypcie, zastępując następujące wartości:
    • <databricks-instance> z nazwą wystąpienia obszaru roboczego Azure Databricks , na przykład adb-1234567890123456.7.azuredatabricks.net
    • <pipeline-id> z unikatowym identyfikatorem potoku magazynu metadanych Hive do sklonowania. Identyfikator linii przetwarzania można znaleźć w interfejsie użytkownika DLT.
    • <target-catalog-name> z nazwą katalogu w Unity Catalog, do którego nowy potok powinien być opublikowany. Musi to być istniejący wykaz.
    • <target-schema-name> z nazwą schematu w Unity Catalog, do którego należy opublikować nowy potok, jeśli różni się od bieżącej nazwy schematu. Ten parametr jest opcjonalny i jeśli nie zostanie określony, używana jest istniejąca nazwa schematu.
    • <new-pipeline-name> z opcjonalną nazwą nowego potoku. Jeśli nie zostanie określony, nowy potok ma nazwę na podstawie nazwy potoku źródłowego z dołączonym [UC].
  4. Uruchom skrypt. Zobacz Uruchamianie notesów usługi Databricks.
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
    data = requests.get(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
        headers={"Authorization": f"Bearer {get_token()}"},
    )

    assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
    payload = {
      "catalog": TARGET_CATALOG,
      "clone_mode": CLONE_MODE,
    }
    if TARGET_SCHEMA != "":
      payload["target"] = TARGET_SCHEMA
    if CLONED_PIPELINE_NAME != "":
      payload["name"] = CLONED_PIPELINE_NAME
    if OVERRIDE_CONFIGS:
      payload["configuration"] = OVERRIDE_CONFIGS

    data = requests.post(
        f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
        headers={"Authorization": f"Bearer {get_token()}"},
        json=payload,
    )
    response = data.json()
    return response

check_source_pipeline_exists()
request_pipeline_clone()

Ograniczenia

Poniżej przedstawiono ograniczenia żądania API clone a pipeline DLT:

  • Obsługiwane jest tylko klonowanie z potoku skonfigurowanego do używania metastore Hive do potoku Unity Catalog.
  • Klon można utworzyć tylko w tym samym obszarze roboczym usługi Azure Databricks co potok, z którego klonujesz.
  • Klonowany przepływ danych może zawierać tylko następujące źródła przesyłania strumieniowego:
  • Jeśli potok magazynu metadanych Hive, który klonujesz, używa trybu powiadamiania o plikach w Auto Loaderze, Databricks zaleca, aby po sklonowaniu nie uruchamiać tego potoku. Dzieje się tak dlatego, że uruchomienie potoku metadanych Hive skutkuje pominięciem niektórych zdarzeń powiadomień dotyczących plików z klonu Unity Catalog. Jeśli źródłowy potok metadanych Hive zostanie uruchomiony po zakończeniu operacji klonowania, możesz uzupełnić brakujące pliki przy użyciu Auto Loader z opcją cloudFiles.backfillInterval. Aby dowiedzieć się więcej na temat trybu powiadomień pliku w module automatycznego ładowania, zobacz Co to jest tryb powiadomień pliku automatycznego ładowania?. Aby dowiedzieć się więcej na temat uzupełniania plików za pomocą Auto Loader, zobacz Wyzwalanie regularnego uzupełniania przy użyciu cloudFiles.backfillInterval i Typowe opcje Auto Loader.
  • Zadania konserwacji potoku są automatycznie wstrzymywane dla obu potoków podczas klonowania.
  • Poniższe zasady dotyczą zapytań o podróże w czasie dla tabel w sklonowanym pipelinie Unity Catalog.
    • Jeśli wersja tabeli została pierwotnie zapisana w obiekcie zarządzanym metadanych Hive, zapytania dotyczące podróży w czasie przy użyciu klauzuli timestamp_expression są niezdefiniowane podczas wykonywania zapytań względem skopiowanego obiektu Unity Catalog.
    • Jednakże, jeśli wersja tabeli została zapisana w sklonowanym obiekcie Katalogu Unity, zapytania punktu w czasie przy użyciu klauzuli timestamp_expression działają poprawnie.
    • Zapytania podróży w czasie przy użyciu klauzuli version działają poprawnie podczas wykonywania zapytań względem sklonowanego obiektu Unity Catalog, nawet jeśli wersja została pierwotnie zapisana w obiekcie zarządzanym przez magazyn metadanych Hive.
  • Aby zapoznać się z innymi ograniczeniami dotyczącymi używania DLT z Unity Catalog, zobacz Ograniczenia potoku w Unity Catalog.
  • Aby zapoznać się z ograniczeniami katalogu Unity, zobacz Ograniczenia katalogu Unity.