Dela via


Uppgradera pipelines till SDK v2

I SDK v2 konsolideras "pipelines" till jobb.

Ett jobb har en typ. De flesta jobb är kommandojobb som kör en command, till exempel python main.py. Det som körs i ett jobb är oberoende för alla programmeringsspråk, så du kan köra bash skript, anropa python tolkar, köra en massa curl kommandon eller något annat.

A pipeline är en annan typ av jobb, som definierar underordnade jobb som kan ha indata-/utdatarelationer som bildar en riktad acyklisk graf (DAG).

Om du vill uppgradera måste du ändra koden för att definiera och skicka pipelines till SDK v2. Det du kör i det underordnade jobbet behöver inte uppgraderas till SDK v2. Vi rekommenderar dock att du tar bort kod som är specifik för Azure Machine Learning från dina modellträningsskript. Den här separationen möjliggör en enklare övergång mellan lokalt och moln och anses vara bästa praxis för mogna MLOps. I praktiken innebär detta att rader med kod tas bort azureml.* . Modellloggning och spårningskod bör ersättas med MLflow. Mer information finns i hur du använder MLflow i v2.

Den här artikeln ger en jämförelse av scenarion i SDK v1 och SDK v2. I följande exempel skapar vi tre steg (träna, poängsätta och utvärdera) till ett dummy-pipelinejobb. Detta visar hur du skapar pipelinejobb med SDK v1 och SDK v2 och hur du använder data och överför data mellan stegen.

Kör en pipeline

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Fullständig exempellänk

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Mappning av viktiga funktioner i SDK v1 och SDK v2

Funktioner i SDK v1 Grov mappning i SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Output
as_mount Indata
StepSequence Databeroende

Steg- och jobb-/komponenttypmappning

steg i SDK v1 jobbtyp i SDK v2 komponenttyp i SDK v2
adla_step Ingen Ingen
automl_step automl jobb automl komponent
azurebatch_step Ingen Ingen
command_step command jobb command komponent
data_transfer_step Ingen Inga
databricks_step Inga Ingen
estimator_step command jobb command komponent
hyper_drive_step sweep jobb Ingen
kusto_step Inga Inga
module_step Ingen command komponent
mpi_step command jobb command komponent
parallel_run_step Parallel jobb Parallel komponent
python_script_step command jobb command komponent
r_script_step command jobb command komponent
synapse_spark_step spark jobb spark komponent

Publicerade pipelines

När du har en pipeline igång kan du publicera en pipeline så att den körs med olika indata. Detta kallades publicerade pipelines. Batch-slutpunkten föreslår ett liknande men kraftfullare sätt att hantera flera tillgångar som körs under ett beständiga API. Därför har funktionen Publicerade pipelines flyttats till distributioner av pipelinekomponenter i batchslutpunkter.

Batch-slutpunkter frikopplar gränssnittet (slutpunkten) från den faktiska implementeringen (distributionen) och låter användaren bestämma vilken distribution som ska användas för standardimplementeringen av slutpunkten. Distributioner av pipelinekomponenter i batchslutpunkter gör det möjligt för användare att distribuera pipelinekomponenter i stället för pipelines, vilket bättre använder återanvändbara tillgångar för de organisationer som vill effektivisera sin MLOps-praxis.

Följande tabell visar en jämförelse av vart och ett av begreppen:

Koncept SDK v1 SDK v2
Pipelinens REST-slutpunkt för anrop Pipelineslutpunkt Batch-slutpunkt
Pipelinens specifika version under slutpunkten Publicerad pipeline Distribution av pipelinekomponenter
Pipelinens argument om anrop Pipeline-parameter Jobbindata
Jobb som genererats från en publicerad pipeline Pipelinejobb Batchjobb

Mer information om hur du migrerar till batchslutpunkter finns i Uppgradera pipelineslutpunkter till SDK v2 .

Mer information finns i dokumentationen här: