Uppgradera pipelines till SDK v2
I SDK v2 konsolideras "pipelines" till jobb.
Ett jobb har en typ. De flesta jobb är kommandojobb som kör en command
, till exempel python main.py
. Det som körs i ett jobb är oberoende för alla programmeringsspråk, så du kan köra bash
skript, anropa python
tolkar, köra en massa curl
kommandon eller något annat.
A pipeline
är en annan typ av jobb, som definierar underordnade jobb som kan ha indata-/utdatarelationer som bildar en riktad acyklisk graf (DAG).
Om du vill uppgradera måste du ändra koden för att definiera och skicka pipelines till SDK v2. Det du kör i det underordnade jobbet behöver inte uppgraderas till SDK v2. Vi rekommenderar dock att du tar bort kod som är specifik för Azure Machine Learning från dina modellträningsskript. Den här separationen möjliggör en enklare övergång mellan lokalt och moln och anses vara bästa praxis för mogna MLOps. I praktiken innebär detta att rader med kod tas bort azureml.*
. Modellloggning och spårningskod bör ersättas med MLflow. Mer information finns i hur du använder MLflow i v2.
Den här artikeln ger en jämförelse av scenarion i SDK v1 och SDK v2. I följande exempel skapar vi tre steg (träna, poängsätta och utvärdera) till ett dummy-pipelinejobb. Detta visar hur du skapar pipelinejobb med SDK v1 och SDK v2 och hur du använder data och överför data mellan stegen.
Kör en pipeline
SDK v1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
SDK v2. Fullständig exempellänk
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
Mappning av viktiga funktioner i SDK v1 och SDK v2
Funktioner i SDK v1 | Grov mappning i SDK v2 |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | Output |
as_mount | Indata |
StepSequence | Databeroende |
Steg- och jobb-/komponenttypmappning
steg i SDK v1 | jobbtyp i SDK v2 | komponenttyp i SDK v2 |
---|---|---|
adla_step |
Ingen | Ingen |
automl_step |
automl jobb |
automl komponent |
azurebatch_step |
Ingen | Ingen |
command_step |
command jobb |
command komponent |
data_transfer_step |
Ingen | Inga |
databricks_step |
Inga | Ingen |
estimator_step |
command jobb |
command komponent |
hyper_drive_step |
sweep jobb |
Ingen |
kusto_step |
Inga | Inga |
module_step |
Ingen | command komponent |
mpi_step |
command jobb |
command komponent |
parallel_run_step |
Parallel jobb |
Parallel komponent |
python_script_step |
command jobb |
command komponent |
r_script_step |
command jobb |
command komponent |
synapse_spark_step |
spark jobb |
spark komponent |
Publicerade pipelines
När du har en pipeline igång kan du publicera en pipeline så att den körs med olika indata. Detta kallades publicerade pipelines. Batch-slutpunkten föreslår ett liknande men kraftfullare sätt att hantera flera tillgångar som körs under ett beständiga API. Därför har funktionen Publicerade pipelines flyttats till distributioner av pipelinekomponenter i batchslutpunkter.
Batch-slutpunkter frikopplar gränssnittet (slutpunkten) från den faktiska implementeringen (distributionen) och låter användaren bestämma vilken distribution som ska användas för standardimplementeringen av slutpunkten. Distributioner av pipelinekomponenter i batchslutpunkter gör det möjligt för användare att distribuera pipelinekomponenter i stället för pipelines, vilket bättre använder återanvändbara tillgångar för de organisationer som vill effektivisera sin MLOps-praxis.
Följande tabell visar en jämförelse av vart och ett av begreppen:
Koncept | SDK v1 | SDK v2 |
---|---|---|
Pipelinens REST-slutpunkt för anrop | Pipelineslutpunkt | Batch-slutpunkt |
Pipelinens specifika version under slutpunkten | Publicerad pipeline | Distribution av pipelinekomponenter |
Pipelinens argument om anrop | Pipeline-parameter | Jobbindata |
Jobb som genererats från en publicerad pipeline | Pipelinejobb | Batchjobb |
Relaterade dokument
Mer information finns i dokumentationen här: