Uppgradera parallell körningssteg till SDK v2
I SDK v2 konsolideras "Parallell körningssteg" till jobbbegrepp som parallel job
. Parallella jobb behåller samma mål för att ge användarna möjlighet att påskynda jobbkörningen genom att distribuera upprepade uppgifter på kraftfulla beräkningskluster med flera noder. Utöver parallella körningssteg ger parallella v2-jobb extra fördelar:
- Ett flexibelt gränssnitt som gör att användaren kan definiera flera anpassade indata och utdata för ditt parallella jobb. Du kan ansluta dem med andra steg för att använda eller hantera deras innehåll i ditt postskript
- Förenkla indataschemat, som ersätter
Dataset
som indata med hjälp av v2-konceptetdata asset
. Du kan enkelt använda dina lokala filer eller blobkatalog-URI som indata till parallella jobb. - Mer kraftfulla funktioner är underutvecklade endast i parallella v2-jobb. Du kan till exempel återuppta det misslyckade/avbrutna parallella jobbet för att fortsätta bearbeta de misslyckade eller obearbetade minibatcherna genom att återanvända det lyckade resultatet för att spara duplicerad ansträngning.
Om du vill uppgradera ditt aktuella parallella sdk v1-körningssteg till v2 måste du
- Använd
parallel_run_function
för att skapa parallella jobb genom attParallelRunConfig
ersätta ochParallelRunStep
i v1. - Uppgradera v1-pipelinen till v2. Anropa sedan ditt parallella v2-jobb som ett steg i v2-pipelinen. Mer information om pipelineuppgradering finns i uppgradera pipeline från v1 till v2 .
Obs! Användarinmatningsskriptet är kompatibelt mellan v1 parallellt körningssteg och v2-parallellt jobb. Så du kan fortsätta att använda samma entry_script.py när du uppgraderar ditt parallella körningsjobb.
Den här artikeln ger en jämförelse av scenarion i SDK v1 och SDK v2. I följande exempel skapar vi ett parallellt jobb för att förutsäga indata i ett pipelinejobb. Du ser hur du skapar ett parallellt jobb och hur du använder det i ett pipelinejobb för både SDK v1 och SDK v2.
Förutsättningar
- Förbereda din SDK v2-miljö: Installera Azure Machine Learning SDK v2 för Python
- Förstå grunden för SDK v2-pipelinen: Så här skapar du Azure Machine Learning-pipeline med Python SDK v2
Skapa parallella steg
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Använda parallella steg i pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mappning av viktiga funktioner i SDK v1 och SDK v2
Funktioner i SDK v1 | Grov mappning i SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Output |
as_mount | Indata |
Parallella jobbkonfigurationer och inställningsmappning
SDK v1 | SDK v2 | beskrivning |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Miljö som träningsjobbet körs i. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Användarskript som ska köras parallellt på flera noder. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Antalet misslyckade minibatch som kan ignoreras i det här parallella jobbet. Om antalet misslyckade minibatch är högre än det här tröskelvärdet markeras det parallella jobbet som misslyckat. "-1" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batch under parallella jobb. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggregera alla returer från varje körning av mini-batch och mata ut den till den här filen. Kan referera till någon av utdata från parallella jobb med uttrycket ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Valfritt antal instanser eller noder som används av beräkningsmålet. Standardvärdet är 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Den maximala parallellitet som varje beräkningsinstans har. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definiera storleken på varje mini-batch för att dela indata. Om input_data är en mapp eller uppsättning filer definierar det här numret antalet filer för varje mini-batch. Till exempel 10, 100. Om input_data är tabelldata från mltable definierar det här talet den proximatiska fysiska storleken för varje mini-batch. Standardenheten är Byte och värdet kan acceptera strängen som 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | En lokal sökväg eller fjärrsökväg som pekar på källkoden. |
ParallelRunConfig.description | parallel_run_function.description | En vänlig beskrivning av parallella |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | En sträng med namnet på loggningsnivån, som definieras i loggning. Möjliga värden är "WARNING", "INFO" och "DEBUG". (valfritt, standardvärdet är "INFO".) Det här värdet kan anges via PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Tidsgränsen i sekunder för körning av anpassad run()-funktion. Om körningstiden är högre än det här tröskelvärdet avbryts mini-batchen och markeras som en misslyckad mini-batch för att utlösa återförsök. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Antalet återförsök när mini-batchen misslyckas eller tidsgränsen överskrids. Om alla återförsök misslyckas markeras mini-batchen som att den inte kunde räknas av mini_batch_error_threshold beräkning. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Kombinerat med append_row_to inställning. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Antalet misslyckade minibatch som kan ignoreras i det här parallella jobbet. Om antalet misslyckade minibatch är högre än det här tröskelvärdet markeras det parallella jobbet som misslyckat. "-1" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batch under parallella jobb. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
Liknar "allowed_failed_count", men den här inställningen använder procentandelen misslyckade minibatch i stället för antalet minibatchfel. Intervallet för den här inställningen är [0, 100]. "100" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batchar under parallella jobb. |
ParallelRunConfig.partition_keys | Under utveckling. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | En ordlista med miljövariablers namn och värden. Dessa miljövariabler anges i den process där användarskript körs. |
ParallelRunStep.name | parallel_run_function.name | Namnet på det parallella jobb eller komponent som skapats. |
ParallelRunStep.inputs | parallel_run_function.inputs | En diktering av indata som används av den här parallellen. |
-- | parallel_run_function.input_data | Deklarera de data som ska delas och bearbetas parallellt |
ParallelRunStep.output | parallel_run_function.outputs | Utdata från det här parallella jobbet. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Definieras tillsammans med inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argumenten för den parallella aktiviteten. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Ange om parallellen ska returnera samma utdata med samma indata. |
Nästa steg
Mer information finns i dokumentationen här: