Dela via


Uppgradera parallell körningssteg till SDK v2

I SDK v2 konsolideras "Parallell körningssteg" till jobbbegrepp som parallel job. Parallella jobb behåller samma mål för att ge användarna möjlighet att påskynda jobbkörningen genom att distribuera upprepade uppgifter på kraftfulla beräkningskluster med flera noder. Utöver parallella körningssteg ger parallella v2-jobb extra fördelar:

  • Ett flexibelt gränssnitt som gör att användaren kan definiera flera anpassade indata och utdata för ditt parallella jobb. Du kan ansluta dem med andra steg för att använda eller hantera deras innehåll i ditt postskript
  • Förenkla indataschemat, som ersätter Dataset som indata med hjälp av v2-konceptet data asset . Du kan enkelt använda dina lokala filer eller blobkatalog-URI som indata till parallella jobb.
  • Mer kraftfulla funktioner är underutvecklade endast i parallella v2-jobb. Du kan till exempel återuppta det misslyckade/avbrutna parallella jobbet för att fortsätta bearbeta de misslyckade eller obearbetade minibatcherna genom att återanvända det lyckade resultatet för att spara duplicerad ansträngning.

Om du vill uppgradera ditt aktuella parallella sdk v1-körningssteg till v2 måste du

Obs! Användarinmatningsskriptet är kompatibelt mellan v1 parallellt körningssteg och v2-parallellt jobb. Så du kan fortsätta att använda samma entry_script.py när du uppgraderar ditt parallella körningsjobb.

Den här artikeln ger en jämförelse av scenarion i SDK v1 och SDK v2. I följande exempel skapar vi ett parallellt jobb för att förutsäga indata i ett pipelinejobb. Du ser hur du skapar ett parallellt jobb och hur du använder det i ett pipelinejobb för både SDK v1 och SDK v2.

Förutsättningar

Skapa parallella steg

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Använda parallella steg i pipeline

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mappning av viktiga funktioner i SDK v1 och SDK v2

Funktioner i SDK v1 Grov mappning i SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Output
as_mount Indata

Parallella jobbkonfigurationer och inställningsmappning

SDK v1 SDK v2 beskrivning
ParallelRunConfig.environment parallel_run_function.task.environment Miljö som träningsjobbet körs i.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Användarskript som ska köras parallellt på flera noder.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Antalet misslyckade minibatch som kan ignoreras i det här parallella jobbet. Om antalet misslyckade minibatch är högre än det här tröskelvärdet markeras det parallella jobbet som misslyckat.

"-1" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batch under parallella jobb.
ParallelRunConfig.output_action parallel_run_function.append_row_to Aggregera alla returer från varje körning av mini-batch och mata ut den till den här filen. Kan referera till någon av utdata från parallella jobb med uttrycket ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Valfritt antal instanser eller noder som används av beräkningsmålet. Standardvärdet är 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Den maximala parallellitet som varje beräkningsinstans har.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definiera storleken på varje mini-batch för att dela indata.

Om input_data är en mapp eller uppsättning filer definierar det här numret antalet filer för varje mini-batch. Till exempel 10, 100.

Om input_data är tabelldata från mltabledefinierar det här talet den proximatiska fysiska storleken för varje mini-batch. Standardenheten är Byte och värdet kan acceptera strängen som 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code En lokal sökväg eller fjärrsökväg som pekar på källkoden.
ParallelRunConfig.description parallel_run_function.description En vänlig beskrivning av parallella
ParallelRunConfig.logging_level parallel_run_function.logging_level En sträng med namnet på loggningsnivån, som definieras i loggning. Möjliga värden är "WARNING", "INFO" och "DEBUG". (valfritt, standardvärdet är "INFO".) Det här värdet kan anges via PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Tidsgränsen i sekunder för körning av anpassad run()-funktion. Om körningstiden är högre än det här tröskelvärdet avbryts mini-batchen och markeras som en misslyckad mini-batch för att utlösa återförsök.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Antalet återförsök när mini-batchen misslyckas eller tidsgränsen överskrids. Om alla återförsök misslyckas markeras mini-batchen som att den inte kunde räknas av mini_batch_error_threshold beräkning.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Kombinerat med append_row_to inställning.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Antalet misslyckade minibatch som kan ignoreras i det här parallella jobbet. Om antalet misslyckade minibatch är högre än det här tröskelvärdet markeras det parallella jobbet som misslyckat.

"-1" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batch under parallella jobb.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
Liknar "allowed_failed_count", men den här inställningen använder procentandelen misslyckade minibatch i stället för antalet minibatchfel.

Intervallet för den här inställningen är [0, 100]. "100" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batchar under parallella jobb.
ParallelRunConfig.partition_keys Under utveckling.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables En ordlista med miljövariablers namn och värden. Dessa miljövariabler anges i den process där användarskript körs.
ParallelRunStep.name parallel_run_function.name Namnet på det parallella jobb eller komponent som skapats.
ParallelRunStep.inputs parallel_run_function.inputs En diktering av indata som används av den här parallellen.
-- parallel_run_function.input_data Deklarera de data som ska delas och bearbetas parallellt
ParallelRunStep.output parallel_run_function.outputs Utdata från det här parallella jobbet.
ParallelRunStep.side_inputs parallel_run_function.inputs Definieras tillsammans med inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Argumenten för den parallella aktiviteten.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Ange om parallellen ska returnera samma utdata med samma indata.

Nästa steg

Mer information finns i dokumentationen här: