Creación de una canalización de Unity Catalog mediante la clonación de una canalización de metastore de Hive
Importante
La solicitud de clone a pipeline
de la API REST de Delta Live Tables se encuentra en versión preliminar pública.
En este artículo se describe la solicitud de clone a pipeline
en la API Rest de Databricks y cómo puedes usarla para copiar una canalización existente que se publica en el Metastore de Hive a una nueva canalización que se publica en el Catálogo de Unity. Al llamar a la solicitud de clone a pipeline
, ocurre lo siguiente:
- Copia el código fuente y la configuración de la canalización existente a una nueva canalización, aplicando las modificaciones de configuración que haya especificado.
- Actualiza las definiciones de vistas materializadas y de tablas de transmisión, así como las referencias con los cambios necesarios para que Unity Catalog administre esos objetos.
- Inicia una actualización de canalización para migrar los datos y metadatos existentes, como los puntos de control, para las tablas de streaming de la canalización. Esto permite que esas tablas de streaming reanuden el procesamiento en el mismo punto que la canalización original.
Una vez completada la operación de clonación, tanto las canalizaciones originales como las nuevas se pueden ejecutar de forma independiente.
En este artículo se incluyen ejemplos de llamada a la solicitud de API directamente y a través de un script de Python desde un cuaderno de Databricks.
Antes de empezar
Se requieren lo siguiente antes de clonar una canalización:
Para clonar una canalización de metastore de Hive, las tablas y vistas definidas en la canalización deben publicar tablas en un esquema de destino. Consulta Procedimientos para publicar conjuntos de datos de Delta Live Tables en el metastore de Hive heredado a fin de obtener información sobre cómo agregar un esquema de destino a una canalización.
Las referencias a las tablas o vistas administradas del metastore de Hive, en la canalización que se va a clonar, deben estar completamente calificadas con el catálogo (
hive_metastore
), el esquema y el nombre de la tabla. Por ejemplo, en el código siguiente que crea un conjunto de datos decustomers
, el argumento de nombre de tabla debe actualizarse ahive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
No edite el código fuente de la canalización de metastore de Hive de origen mientras una operación de clonación está en curso, incluidos los cuadernos configurados como parte de la canalización y los módulos almacenados en carpetas de Git o archivos del área de trabajo.
La canalización de metastore de Hive de origen no debe ejecutarse al iniciar la operación de clonación. Si se está ejecutando una actualización, deténgala o espere a que se complete.
A continuación se muestran otras consideraciones importantes antes de clonar una canalización:
- Si las tablas de la canalización de metastore de Hive especifican una ubicación de almacenamiento mediante el argumento
path
en Python oLOCATION
en SQL, pase la configuración de"pipelines.migration.ignoreExplicitPath": "true"
a la solicitud de clonación. La configuración de esta configuración se incluye en las instrucciones siguientes. - Si la canalización de metastore de Hive incluye un origen de Cargador automático que especifica un valor para la opción
cloudFiles.schemaLocation
y la canalización de metastore de Hive permanece operativa después de crear el clon de Unity Catalog, debes establecer la opciónmergeSchema
entrue
tanto en la canalización de metastore de Hive como en la canalización de Unity Catalog clonada. Agregar esta opción a la canalización del metastore de Hive antes de clonar hará que la opción se copie a la nueva canalización.
Clonación de una canalización con la API REST de Databricks
En el ejemplo siguiente se usa el comando curl
para llamar a la solicitud de clone a pipeline
en la API rest de Databricks:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Reemplazar:
<personal-access-token>
por un token de acceso personal de Databricks.<databricks-instance>
con el nombre de instancia del área de trabajo de Azure Databricks , por ejemploadb-1234567890123456.7.azuredatabricks.net
<pipeline-id>
por el identificador único de la canalización de metastore de Hive que se va a clonar. Puedes encontrar el id. de canalización en la interfaz de usuario de Delta Live Tables.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Reemplazar:
<target-catalog-name>
por el nombre de un catálogo en Unity Catalog, en el que se debe publicar la nueva canalización. Debe ser un catálogo existente.<target-schema-name>
por el nombre de un esquema en el Unity Catalog en el que la nueva canalización se debe publicar si es diferente al del esquema actual. Este parámetro es opcional y, si no se especifica, se usa el nombre de esquema existente.<new-pipeline-name>
con un nombre opcional para la nueva canalización. Si no se especifica, la nueva canalización se denomina mediante el nombre de canalización de origen con[UC]
anexado.
clone_mode
especifica el modo que se va a usar para la operación de clonación. MIGRATE_TO_UC
es la única opción admitida.
Use el campo configuration
para especificar configuraciones en la nueva canalización. Los valores establecidos aquí invalidan las configuraciones de la canalización original.
La respuesta de la solicitud de API REST de clone
es el id. de canalización de la nueva canalización de Unity Catalog.
Clonación de una canalización desde un cuaderno de Databricks
En el ejemplo siguiente se llama a la solicitud create a pipeline
desde un script de Python. Puede usar un cuaderno de Databricks para ejecutar este script:
- Cree un cuaderno de notas para el script. Consulta Creación de un cuaderno.
- Copie el siguiente script de Python en la primera celda del cuaderno.
- Actualiza los valores de marcador de posición en el script reemplazando:
<databricks-instance>
con el nombre de instancia de área de trabajo de Azure Databricks; por ejemplo,adb-1234567890123456.7.azuredatabricks.net
.<pipeline-id>
por el identificador único de la canalización de metastore de Hive que se va a clonar. Puedes encontrar el id. de canalización en la interfaz de usuario de Delta Live Tables.<target-catalog-name>
por el nombre de un catálogo en Unity Catalog, en el que se debe publicar la nueva canalización. Debe ser un catálogo existente.<target-schema-name>
por el nombre de un esquema en el Unity Catalog en el que la nueva canalización se debe publicar si es diferente al del esquema actual. Este parámetro es opcional y, si no se especifica, se usa el nombre de esquema existente.<new-pipeline-name>
con un nombre opcional para la nueva canalización. Si no se especifica, la nueva canalización se denomina mediante el nombre de canalización de origen con[UC]
anexado.
- Ejecute el script. Consulta Ejecución de cuadernos de Databricks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode in this preview
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Limitaciones
A continuación se muestran las limitaciones de la solicitud de API delta Live Tables clone a pipeline
:
- Solo se admite la clonación desde una canalización configurada para usar el metastore de Hive en una canalización de Unity Catalog.
- Puedes crear un clon solo en la misma área de trabajo de Azure Databricks desde la que se va a clonar la canalización.
- La tubería que estás clonando solo puede incluir las siguientes fuentes de streaming:
- Fuentes de Delta
- Cargador automático, incluidos los orígenes de datos que admite el Cargador automático. Consulte Carga de archivos desde el almacenamiento de objetos en la nube.
- Apache Kafka con streaming estructurado. Sin embargo, el origen de Kafka no se puede configurar para usar la opción
kafka.group.id
. Consulta Procesamiento de flujos con Apache Kafka y Azure Databricks. - Amazon Kinesis con Transmisión Estructurada. Pero no se puede configurar el origen de Kinesis para establecer
consumerMode
enefo
.
- Si la canalización de metastore de Hive que va a clonar usa el modo de notificación de archivos del Cargador Automático, Databricks recomienda no ejecutar la canalización de metastore de Hive después de la clonación. Esto se debe a que la ejecución de la canalización de metastore de Hive conlleva la eliminación de algunos eventos de notificación de archivos del clon de Unity Catalog. Si la canalización de metastore de Hive de origen se ejecuta después completarse la operación de clonación, puedes reponer los archivos que faltan mediante el Cargador automático con la opción
cloudFiles.backfillInterval
. Para obtener información sobre el modo de notificación de archivos del cargador automático, consulte ¿Qué es el modo de notificación de archivos del cargador automático?. Para obtener información sobre cómo rellenar archivos con Auto Loader, consulte Inicie reposiciones regulares utilizando cloudFiles.backfillInterval y Opciones comunes de Auto Loader. - Las tareas de mantenimiento de canalización se pausan automáticamente para ambas canalizaciones mientras la clonación está en curso.
- Lo siguiente se aplica a las consultas de viaje en el tiempo en las tablas de la canalización clonada de Unity Catalog:
- Si una versión de tabla se escribió originalmente en un objeto administrado de metastore de Hive, las consultas de viaje en el tiempo que usan una cláusula
timestamp_expression
no están definidas al consultar el objeto de Unity Catalog clonado. - Sin embargo, si la versión de la tabla se escribió en el objeto del Catálogo de Unity clonado, las consultas de retroceso en el tiempo que usan una cláusula
timestamp_expression
funcionan correctamente. - Las consultas temporales que usan una cláusula
version
funcionan correctamente al consultar un objeto del Catálogo de Unity clonado, incluso cuando la versión se escribió originalmente en el objeto gestionado por el metastore de Hive.
- Si una versión de tabla se escribió originalmente en un objeto administrado de metastore de Hive, las consultas de viaje en el tiempo que usan una cláusula
- Para ver otras limitaciones al usar Delta Live Tables con Unity Catalog, consulta Limitaciones de canalización de Unity Catalog.
- Para conocer las limitaciones de Unity Catalog, consulta Limitaciones de Unity Catalog.