Compartilhar via


Atualizar pipelines para o SDK v2

No SDK v2, "pipelines" são consolidados em trabalhos.

Um trabalho tem um tipo. A maioria dos trabalhos são trabalhos de comando que executam um command, como python main.py. O que é executado em um trabalho é independente de qualquer linguagem de programação, ou seja, você possa executar scripts bash, invocar interpretadores python, executar vários comandos curl ou qualquer outra coisa.

Outro tipo de trabalho é pipeline, que define os trabalhos filho que podem ter relações de entrada/saída, formando um DAG (grafo direcionado acíclico).

Para fazer upgrade, você precisará alterar o código para definir e enviar os pipelines para o SDK v2. O que você executa dentro do trabalho filho não precisa de upgrade para o SDK v2. No entanto, recomendamos remover qualquer código específico do Azure Machine Learning dos scripts de treinamento de modelo. Essa separação permite uma transição mais fácil entre o local e a nuvem e é considerada a melhor prática para um MLOps maduro. Na prática, isso significa remover as linhas de código azureml.*. O log e o código de acompanhamento do modelo devem ser substituídos pelo MLflow. Para obter mais informações, confira Como usar o MLflow na v2.

Este artigo fornece uma comparação de cenários no SDK v1 e no SDK v2. Nos exemplos a seguir, criaremos três etapas (treinar, pontuar e avaliar) em um trabalho de pipeline fictício. Isso demonstra como criar trabalhos de pipeline usando o SDK v1 e o SDK v2 e como consumir dados e transferi-los entre as etapas.

Executar um pipeline

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Link do exemplo completo

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Mapeamento da funcionalidade de chave no SDK v1 e no SDK v2

Funcionalidade no SDK v1 Mapeamento aproximado no SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Saída
conjunto de dados as_mount Entrada
Sequência de etapas Dependência de dados

Mapeamento de tipo de etapa e de tipo de trabalho/componente

etapa no SDK v1 tipo de trabalho no SDK v2 tipo de componente no SDK v2
adla_step Nenhum Nenhum
automl_step trabalho automl componente automl
azurebatch_step Nenhum Nenhum
command_step command trabalho command componente
data_transfer_step Nenhum Nenhum
databricks_step Nenhum Nenhum
estimator_step command trabalho command componente
hyper_drive_step sweep trabalho Nenhum
kusto_step Nenhum Nenhum
module_step Nenhum command componente
mpi_step command trabalho command componente
parallel_run_step trabalho Parallel componente Parallel
python_script_step command trabalho command componente
r_script_step command trabalho command componente
synapse_spark_step spark trabalho spark componente

Pipelines publicados

Quando um pipeline estiver em funcionamento, você poderá publicá-lo para que ele seja executado com diferentes entradas. Anteriormente, isso era chamado de Pipelines Publicados. O Ponto de Extremidade em Lote propõe uma maneira ainda mais poderosa de lidar com vários ativos em execução em uma API durável, razão pela qual a funcionalidade Pipelines publicados foi movida para Implantações de componentes de pipeline em pontos de extremidade em lote.

Os pontos de extremidade em lote separam a interface (ponto de extremidade) da implementação real (implantação) e permitem que o usuário decida qual implantação atende à implementação padrão do ponto de extremidade. As Implantações de componentes de pipeline em pontos de extremidade em lote permitem que os usuários implantem componentes de pipeline em vez de pipelines, o que faz um melhor uso de ativos reutilizáveis para essas organizações que buscam simplificar sua prática de MLOps.

A tabela a seguir mostra uma comparação entre cada um dos conceitos:

Conceito SDK v1 SDK v2
Ponto de extremidade REST do pipeline para invocação Ponto de extremidade de pipeline Ponto de extremidade em lote
Versão específica do pipeline no ponto de extremidade Pipeline publicado Implantação do componente de pipeline
Argumentos do pipeline sobre invocação Parâmetro do pipeline Entradas de trabalho
Trabalho gerado a partir de um pipeline publicado Trabalho de pipeline Trabalho em lotes

Consulte Atualizar pontos de extremidade de pipeline para o SDK v2 para obter diretrizes específicas sobre como migrar para pontos de extremidade em lote.

Para obter mais informações, confira esta documentação: