Atualizar pipelines para SDK v2
No SDK v2, os "pipelines" são consolidados em trabalhos.
Um trabalho tem um tipo. A maioria dos trabalhos são trabalhos de comando que executam um command
, como python main.py
. O que é executado em um trabalho é agnóstico a qualquer linguagem de programação, então você pode executar bash
scripts, invocar python
intérpretes, executar um monte de curl
comandos ou qualquer outra coisa.
A pipeline
é outro tipo de trabalho, que define trabalhos filhos que podem ter relações de entrada/saída, formando um gráfico acíclico dirigido (DAG).
Para atualizar, você precisará alterar seu código para definir e enviar os pipelines para o SDK v2. O que você executa no trabalho filho não precisa ser atualizado para o SDK v2. No entanto, é recomendável remover qualquer código específico do Azure Machine Learning de seus scripts de treinamento de modelo. Essa separação permite uma transição mais fácil entre local e nuvem e é considerada uma prática recomendada para MLOps maduros. Na prática, isso significa remover azureml.*
linhas de código. O registro em log do modelo e o código de rastreamento devem ser substituídos pelo MLflow. Para obter mais informações, consulte como usar o MLflow na v2.
Este artigo fornece uma comparação de cenário(s) no SDK v1 e SDK v2. Nos exemplos a seguir, construiremos três etapas (treinar, pontuar e avaliar) em um trabalho de pipeline fictício. Isso demonstra como criar trabalhos de pipeline usando SDK v1 e SDK v2 e como consumir dados e transferir dados entre as etapas.
Executar um pipeline
SDK v1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
SDK v2. Link de exemplo completo
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
Mapeamento das principais funcionalidades no SDK v1 e SDK v2
Funcionalidade no SDK v1 | Mapeamento aproximado no SDK v2 |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | Saída |
as_mount do conjunto de dados | Entrada |
Sequência de passos | Dependência de dados |
Mapeamento de etapas e tipos de trabalho/componente
etapa no SDK v1 | tipo de trabalho no SDK v2 | tipo de componente no SDK v2 |
---|---|---|
adla_step |
Nenhuma | Nenhuma |
automl_step |
automl emprego |
automl |
azurebatch_step |
Nenhuma | Nenhuma |
command_step |
command emprego |
command componente |
data_transfer_step |
Nenhuma | None |
databricks_step |
None | Nenhuma |
estimator_step |
command emprego |
command componente |
hyper_drive_step |
sweep emprego |
Nenhuma |
kusto_step |
None | None |
module_step |
Nenhuma | command componente |
mpi_step |
command emprego |
command componente |
parallel_run_step |
Parallel emprego |
Parallel |
python_script_step |
command emprego |
command componente |
r_script_step |
command emprego |
command componente |
synapse_spark_step |
spark emprego |
spark componente |
Pipelines publicados
Depois de ter um pipeline instalado e em execução, você pode publicar um pipeline para que ele seja executado com entradas diferentes. Isso era conhecido como Pipelines Publicados. O Batch Endpoint propõe uma maneira semelhante, porém mais poderosa, de lidar com vários ativos executados sob uma API durável, e é por isso que a funcionalidade Pipelines publicados foi movida para implantações de componentes Pipeline em pontos de extremidade em lote.
Os pontos de extremidade em lote separam a interface (ponto de extremidade) da implementação real (implantação) e permitem que o usuário decida qual implantação serve a implementação padrão do ponto de extremidade. As implantações de componentes de pipeline em pontos de extremidade em lote permitem que os usuários implantem componentes de pipeline em vez de pipelines, que fazem um melhor uso de ativos reutilizáveis para as organizações que procuram simplificar sua prática de MLOps.
A tabela a seguir mostra uma comparação de cada um dos conceitos:
Conceito | SDK v1 | SDK v2 |
---|---|---|
Ponto de extremidade REST do pipeline para invocação | Ponto de extremidade do pipeline | Ponto final em lote |
Versão específica do pipeline sob o endpoint | Pipeline publicado | Implantação de componentes de pipeline |
Argumentos da Pipeline sobre a invocação | Parâmetro do pipeline | Entradas de trabalho |
Trabalho gerado a partir de um pipeline publicado | Trabalho de pipeline | Tarefa de lote |
Consulte Atualizar pontos de extremidade de pipeline para SDK v2 para obter orientações específicas sobre como migrar para pontos de extremidade em lote.
Documentos relacionados
Para obter mais informações, consulte a documentação aqui: