Partilhar via


Atualizar pipelines para SDK v2

No SDK v2, os "pipelines" são consolidados em trabalhos.

Um trabalho tem um tipo. A maioria dos trabalhos são trabalhos de comando que executam um command, como python main.py. O que é executado em um trabalho é agnóstico a qualquer linguagem de programação, então você pode executar bash scripts, invocar python intérpretes, executar um monte de curl comandos ou qualquer outra coisa.

A pipeline é outro tipo de trabalho, que define trabalhos filhos que podem ter relações de entrada/saída, formando um gráfico acíclico dirigido (DAG).

Para atualizar, você precisará alterar seu código para definir e enviar os pipelines para o SDK v2. O que você executa no trabalho filho não precisa ser atualizado para o SDK v2. No entanto, é recomendável remover qualquer código específico do Azure Machine Learning de seus scripts de treinamento de modelo. Essa separação permite uma transição mais fácil entre local e nuvem e é considerada uma prática recomendada para MLOps maduros. Na prática, isso significa remover azureml.* linhas de código. O registro em log do modelo e o código de rastreamento devem ser substituídos pelo MLflow. Para obter mais informações, consulte como usar o MLflow na v2.

Este artigo fornece uma comparação de cenário(s) no SDK v1 e SDK v2. Nos exemplos a seguir, construiremos três etapas (treinar, pontuar e avaliar) em um trabalho de pipeline fictício. Isso demonstra como criar trabalhos de pipeline usando SDK v1 e SDK v2 e como consumir dados e transferir dados entre as etapas.

Executar um pipeline

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Link de exemplo completo

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Mapeamento das principais funcionalidades no SDK v1 e SDK v2

Funcionalidade no SDK v1 Mapeamento aproximado no SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Saída
as_mount do conjunto de dados Entrada
Sequência de passos Dependência de dados

Mapeamento de etapas e tipos de trabalho/componente

etapa no SDK v1 tipo de trabalho no SDK v2 tipo de componente no SDK v2
adla_step Nenhuma Nenhuma
automl_step automl emprego automl
azurebatch_step Nenhuma Nenhuma
command_step command emprego command componente
data_transfer_step Nenhuma None
databricks_step None Nenhuma
estimator_step command emprego command componente
hyper_drive_step sweep emprego Nenhuma
kusto_step None None
module_step Nenhuma command componente
mpi_step command emprego command componente
parallel_run_step Parallel emprego Parallel
python_script_step command emprego command componente
r_script_step command emprego command componente
synapse_spark_step spark emprego spark componente

Pipelines publicados

Depois de ter um pipeline instalado e em execução, você pode publicar um pipeline para que ele seja executado com entradas diferentes. Isso era conhecido como Pipelines Publicados. O Batch Endpoint propõe uma maneira semelhante, porém mais poderosa, de lidar com vários ativos executados sob uma API durável, e é por isso que a funcionalidade Pipelines publicados foi movida para implantações de componentes Pipeline em pontos de extremidade em lote.

Os pontos de extremidade em lote separam a interface (ponto de extremidade) da implementação real (implantação) e permitem que o usuário decida qual implantação serve a implementação padrão do ponto de extremidade. As implantações de componentes de pipeline em pontos de extremidade em lote permitem que os usuários implantem componentes de pipeline em vez de pipelines, que fazem um melhor uso de ativos reutilizáveis para as organizações que procuram simplificar sua prática de MLOps.

A tabela a seguir mostra uma comparação de cada um dos conceitos:

Conceito SDK v1 SDK v2
Ponto de extremidade REST do pipeline para invocação Ponto de extremidade do pipeline Ponto final em lote
Versão específica do pipeline sob o endpoint Pipeline publicado Implantação de componentes de pipeline
Argumentos da Pipeline sobre a invocação Parâmetro do pipeline Entradas de trabalho
Trabalho gerado a partir de um pipeline publicado Trabalho de pipeline Tarefa de lote

Consulte Atualizar pontos de extremidade de pipeline para SDK v2 para obter orientações específicas sobre como migrar para pontos de extremidade em lote.

Para obter mais informações, consulte a documentação aqui: