Compartir a través de


Cómo usar Apache Spark (con tecnología de Azure Synapse Analytics) en la canalización de aprendizaje automático (en desuso)

SE APLICA A: Azure ML del SDK de Python v1

Advertencia

La integración de Azure Synapse Analytics con Azure Machine Learning, disponible en el SDK de Python v1, está en desuso. Los usuarios todavía pueden usar el área de trabajo de Synapse, registrada con Azure Machine Learning como servicio vinculado. Sin embargo, ya no se puede registrar una nueva área de trabajo de Synapse con Azure Machine Learning como servicio vinculado. Se recomienda usar el proceso de Spark sin servidor y los grupos de Synapse Spark conectados, disponibles en la CLI v2 y el SDK de Python v2. Para más información, visite https://aka.ms/aml-spark.

En este artículo, aprenderá a usar grupos de Apache Spark, con tecnología de Azure Synapse Analytics como destino de proceso en un paso de preparación de datos en una canalización de Azure Machine Learning. Aprenderá cómo una sola canalización puede usar recursos de proceso adecuados para el paso concreto, por ejemplo, la preparación de datos o el entrenamiento. También aprenderá a preparar los datos para el paso de Spark y cómo pasa al siguiente paso.

Requisitos previos

Los grupos de Apache Spark se crean y administran en las áreas de trabajo de Azure Synapse Analytics. Para integrar un grupo de Apache Spark en un área de trabajo de Azure Machine Learning, debe crear un vínculo al área de trabajo de Azure Synapse Analytics. Una vez que vincule el área de trabajo de Azure Machine Learning y las áreas de trabajo de Azure Synapse Analytics, puede asociar un grupo de Apache Spark con

  • Estudio de Azure Machine Learning

  • SDK de Python, como se explica más adelante

  • Plantilla de Azure Resource Manager (ARM). Para obtener más información, visite Ejemplo de plantilla de ARM

    • Puede usar la línea de comandos para seguir la plantilla de ARM, agregar el servicio vinculado y conectar el grupo de Apache Spark con este ejemplo de código:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Importante

Para vincular correctamente el área de trabajo de Synapse, debe tener el rol Propietario en ella. Compruebe el acceso en Azure Portal.

El servicio vinculado recibirá una identidad asignada por el sistema (SAI) en el momento de la creación. A esta SAI del servicio vinculado se le debe asignar el rol "Administrador de Apache Spark de Synapse" desde Synapse Studio, para que pueda enviar el trabajo de Spark (consulte Administración de asignaciones de roles de RBAC de Synapse en Synapse Studio).

También debe proporcionar al usuario del área de trabajo de Azure Machine Learning el rol "colaborador" desde el portal de administración de recursos de Azure.

Este código muestra cómo recuperar servicios vinculados en el área de trabajo:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

En primer lugar, Workspace.from_config() accede al área de trabajo de Azure Machine Learning mediante la configuración del archivo config.json. (Para obtener más información, visite Crear un archivo de configuración del área de trabajo). A continuación, el código imprime todos los servicios vinculados disponibles en el área de trabajo. Finalmente, LinkedService.get() recupera un servicio vinculado denominado 'synapselink1'.

Conexión del grupo de Apache Spark como destino de proceso de Azure Machine Learning

Para usar el grupo de Apache Spark para realizar un paso de la canalización de aprendizaje automático, debe conectarlo como objeto ComputeTarget en el paso de canalización, tal y como se muestra en este ejemplo de código:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

El código primero configura el SynapseCompute. El argumento linked_service es el objeto LinkedService que creó o recuperó en el paso anterior. El argumento type debe ser SynapseSpark. El argumento pool_name de SynapseCompute.attach_configuration() debe coincidir con el de un grupo existente en el área de trabajo de Azure Synapse Analytics. Para más información sobre la creación de un grupo de Apache Spark en el área de trabajo de Azure Synapse Analytics, visite Inicio rápido: creación de un grupo de Apache Spark sin servidor mediante Synapse Studio. El tipo attach_config es ComputeTargetAttachConfiguration.

Después de crear la configuración, cree un ComputeTarget de aprendizaje automático pasando el Workspace, los valores de ComputeTargetAttachConfiguration y el nombre por el que desea hacer referencia al proceso dentro del área de trabajo de Machine Learning. La llamada a ComputeTarget.attach() es asincrónica, por lo que el ejemplo se bloquea hasta que se completa la llamada.

Creación de un elemento SynapseSparkStep que usa el grupo de Apache Spark vinculado

El trabajo de Spark del grupo de Apache Spark del cuaderno de ejemplo define una canalización de aprendizaje automático simple. En primer lugar, el cuaderno define un paso de preparación de datos, con tecnología del synapse_compute definido en el paso anterior. A continuación, el cuaderno define un paso de entrenamiento con tecnología de un destino de proceso más adecuado para el entrenamiento. El cuaderno de ejemplo usa la base de datos de supervivencia del Titanic para mostrar la entrada y salida de los datos. Realmente no limpia los datos ni crea un modelo predictivo. Dado que este ejemplo no implica realmente el entrenamiento, el paso de entrenamiento usa un recurso de proceso económico basado en CPU.

Los datos fluyen a una canalización de aprendizaje automático a través de objetos DatasetConsumptionConfig, que pueden contener conjuntos de archivos o datos tabulares. Los datos a menudo proceden de archivos de Blob Storage en un almacén de datos del área de trabajo. Este ejemplo de código muestra el código típico que crea la entrada para una canalización de aprendizaje automático:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

En el ejemplo de código se supone que el archivo Titanic.csv está en el almacenamiento de blobs. El código muestra cómo leer ambos archivos como un TabularDataset y un FileDataset. Este código solo sirve como demostración, ya que sería confuso duplicar las entradas o interpretar un único origen de datos de dos maneras: como un recurso que contiene la tabla y simplemente como un archivo.

Importante

Para usar un FileDataset como entrada, necesita una versión azureml-core de al menos 1.20.0. Puede especificar esto con la clase Environment, como se describe más adelante. Cuando se complete un paso, puede almacenar los datos de salida, como se muestra en este ejemplo de código:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

En este ejemplo de código, el datastore almacenaría los datos en un archivo denominado test. Los datos estarían disponibles en el área de trabajo de Machine Learning como un Dataset con el nombre registered_dataset.

Además de los datos, un paso de canalización puede tener dependencias de Python por paso. Además, los objetos SynapseSparkStep individuales también pueden especificar su configuración precisa de Azure Synapse Apache Spark. Para mostrar esto, el siguiente ejemplo de código especifica que la versión del paquete de azureml-core debe ser al menos 1.20.0. Como se mencionó anteriormente, este requisito del paquete de azureml-core es necesario para usar FileDataset como entrada.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Este código especifica un solo paso en la canalización de Azure Machine Learning. El valor environment de este código establece una versión de azureml-core específica y el código puede agregar otras dependencias de conda o pip según sea necesario.

El SynapseSparkStep comprime y carga el subdirectorio ./code desde el equipo local. Ese directorio se vuelve a crear en el servidor de proceso y el paso ejecuta el script dataprep.py desde ese directorio. Los inputs y outputs de ese paso son los objetos step1_input1, step1_input2 y step1_output descritos anteriormente. La forma más sencilla de acceder a esos valores dentro del script dataprep.py es asociarlos con el parámetro arguments con nombre.

El siguiente conjunto de argumentos para los constructores SynapseSparkStep controla Apache Spark. El parámetro compute_target es el objeto 'link1-spark01' que asociamos anteriormente como destino de proceso. Los otros parámetros especifican la memoria y los núcleos que nos gustaría usar.

El cuaderno de ejemplo usa este código para dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Este script de "preparación de datos" no realiza ninguna transformación de datos real, pero muestra cómo recuperarlos, convertirlos en una trama de datos de Spark y manipularlos de forma básica en Apache Spark. Para buscar la salida en el estudio de Azure Machine Learning, abra el trabajo secundario, elija la pestaña Salidas y registros y abra el archivo logs/azureml/driver/stdout, como se muestra en esta captura de pantalla:

Captura de pantalla de Studio que muestra la pestaña stdout de un trabajo secundario

Uso de SynapseSparkStep en una canalización

En el siguiente ejemplo se usa la salida de SynapseSparkStep creada en la sección anterior. Otros pasos de la canalización pueden tener sus propios entornos únicos y pueden ejecutarse en distintos recursos de proceso adecuados para la tarea en cuestión. El cuaderno de ejemplo ejecuta el "paso de entrenamiento" en un clúster de CPU pequeño:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Este código crea el nuevo recurso de proceso si es necesario. A continuación, convierte el resultado de step1_output en la entrada del paso de entrenamiento. La opción as_download() significa que los datos se mueven al recurso de proceso, lo que da lugar a un acceso más rápido. Si los datos eran tan grandes que no encajaban en el disco duro de proceso local, tendría que usar la opción de as_mount() para transmitir los datos con el sistema de archivos FUSE. El parámetro compute_target de este segundo paso es 'cpucluster', no el recurso 'link1-spark01' que usó en el paso de preparación de datos. En este paso se usa un programa sencillo, el script train.py, en lugar del script dataprep.py que usó en el paso anterior. El cuaderno de ejemplo tiene detalles del script train.py.

Después de definir todos los pasos, puede crear y ejecutar la canalización.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Este código crea una canalización que consta del paso de preparación de datos en grupos de Apache Spark con tecnología de Azure Synapse Analytics (step_1) y el paso de entrenamiento (step_2). Azure examina las dependencias de datos entre los pasos para calcular el gráfico de ejecución. En este caso, solo hay una dependencia sencilla. Aquí, step2_input necesariamente requiere step1_output.

La llamada pipeline.submit crea, si es necesario, un experimento denominado synapse-pipeline e inicia asincrónicamente un trabajo dentro de él. Los pasos individuales dentro de la canalización se ejecutan como trabajos secundarios de este trabajo principal y la página Experimentos de Studio puede supervisar y revisar esos pasos.

Pasos siguientes