Atualizar etapa de execução paralela para SDK v2
No SDK v2, a "etapa de execução paralela" é consolidada no conceito de trabalho como parallel job
. O trabalho paralelo mantém o mesmo objetivo de capacitar os usuários a acelerar a execução do trabalho, distribuindo tarefas repetidas em clusters de cálculo de vários nós avançados. Além da etapa de execução paralela, o trabalho paralelo v2 oferece benefícios adicionais:
- Interface flexível, que permite que o usuário defina várias entradas e saídas personalizadas para o trabalho paralelo. Você pode conectá-las a outras etapas para consumir ou gerenciar o conteúdo no script de entrada
- Simplifique o esquema de entrada, que substitui
Dataset
como entrada, usando o conceito dodata asset
v2. Você pode usar facilmente seus arquivos locais ou o URI do diretório de blob como entradas para o trabalho paralelo. - Recursos mais avançados estão em desenvolvimento apenas no trabalho paralelo v2. Por exemplo, retome o trabalho paralelo com falha/cancelado para continuar a processar os minilotes com falha ou não processados reutilizando o resultado bem-sucedido para economizar esforço duplicado.
Para atualizar a etapa de execução paralela do SDK v1 atual para v2, você precisará fazer o seguinte
- Use
parallel_run_function
para criar um trabalho paralelo substituindoParallelRunConfig
eParallelRunStep
na v1. - Fazer upgrade do seu pipeline v1 para a v2. Em seguida, invoque seu trabalho paralelo v2 como uma etapa no seu pipeline v2. Consulte como fazer upgrade do pipeline da v1 para a v2 para obter detalhes sobre o upgrade de pipeline.
Observação: o script de entrada do usuário é compatível entre a etapa de execução paralela v1 e o trabalho paralelo v2. Portanto, você pode continuar usando o mesmo entry_script.py ao fazer upgrade do seu trabalho de execução paralela.
Este artigo fornece uma comparação de cenários no SDK v1 e no SDK v2. Nos exemplos a seguir, criaremos um trabalho paralelo para prever dados de entrada em um trabalho de pipelines. Você verá como criar um trabalho paralelo e como usá-lo em um trabalho de pipeline para SDK v1 e SDK v2.
Pré-requisitos
- Preparar seu ambiente do SDK v2: Instalar o SDK v2 do Azure Machine Learning para Python
- Entenda a base do pipeline do SDK v2: Como criar um pipeline do Azure Machine Learning com o SDK v2 do Python
Criar etapa paralela
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Usar etapa paralela no pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mapeamento da funcionalidade de chave no SDK v1 e no SDK v2
Funcionalidade no SDK v1 | Mapeamento aproximado no SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Saída |
conjunto de dados as_mount | Entrada |
Mapeamento de configurações e definições de trabalho paralelo
SDK v1 | SDK v2 | Descrição |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Ambiente em que o trabalho de treinamento será executado. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Script do usuário que será executado em paralelo em vários nós. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | O número de minilotes com falha que podem ser ignorados neste trabalho paralelo. Se a contagem do minilote com falha for maior que esse limite, o trabalho paralelo será marcado como com falha. "-1" é o número padrão, o que significa ignorar todos os minilotes com falha durante o trabalho paralelo. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Agregar todos os retornos de cada execução do minilote e gerá-lo para este arquivo. Pode fazer referência a uma das saídas do trabalho paralelo usando a expressão ${{outputs.<output_name>}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Número opcional de instâncias ou nós usados pelo destino de computação. O valor padrão é 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | O paralelismo máximo de cada instância de computação. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Defina o tamanho de cada minilote para dividir a entrada. Se o input_data for uma pasta ou um conjunto de arquivos, esse número definirá a contagem de arquivos para cada minilote. Por exemplo: 10, 100. Se o input_data consiste em dados tabulares do mltable , esse número define o tamanho físico aproximado de cada minilote. A unidade padrão é Byte e o valor pode aceitar a cadeia de caracteres como 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Um caminho local ou remoto apontando para o código-fonte. |
ParallelRunConfig.description | parallel_run_function.description | Uma descrição amigável do paralelo |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Uma cadeia de caracteres do nome do nível de registros em log, que é definido em 'logging'. Os valores possíveis são 'WARNING', 'INFO' e 'DEBUG'. (opcional, o valor padrão é 'INFO'.) Esse valor pode ser definido por meio do PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | O tempo limite em segundos para executar a função run() personalizada. Se o tempo de execução for maior que esse limite, o minilote será anulado e marcado como um minilote com falha para disparar nova tentativa. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | O número de repetições quando o minilote estiver com falha ou atingir o tempo limite. Se todas as novas tentativas falharem, o minilote será marcado como falha ao ser contado pelo cálculo mini_batch_error_threshold. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Combinado com a configuração append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | O número de minilotes com falha que podem ser ignorados neste trabalho paralelo. Se a contagem do minilote com falha for maior que esse limite, o trabalho paralelo será marcado como com falha. "-1" é o número padrão, o que significa ignorar todos os minilotes com falha durante o trabalho paralelo. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
Semelhante a "allowed_failed_count", mas essa configuração usa o percentual de minilotes com falha, em vez da contagem de falhas do minilote. O intervalo dessa configuração é [0, 100]. "100" é o número padrão, o que significa ignorar todos os minilotes com falha durante o trabalho paralelo. |
ParallelRunConfig.partition_keys | Em Desenvolvimento. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Um dicionário de valores e nomes de variáveis de ambiente. Essas variáveis de ambiente são definidas no processo em que o script do usuário está sendo executado. |
ParallelRunStep.name | parallel_run_function.name | Nome do trabalho paralelo ou componente criado. |
ParallelRunStep.inputs | parallel_run_function.inputs | Um dict de entradas usado por esse paralelo. |
-- | parallel_run_function.input_data | Declarar os dados a serem divididos e processados com paralelo |
ParallelRunStep.output | parallel_run_function.outputs | As saídas desse trabalho paralelo. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Definido em conjunto com inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Os argumentos da tarefa paralela. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Especifique se o paralelo retornará a mesma saída considerando a mesma entrada. |
Próximas etapas
Para obter mais informações, confira esta documentação: