Partilhar via


Usar trabalhos paralelos em pipelines

APLICA-SE A:Azure CLI ml extension v2 (current)Python SDK azure-ai-ml v2 (current)

Este artigo explica como usar a CLI v2 e o Python SDK v2 para executar trabalhos paralelos nos pipelines do Azure Machine Learning. Trabalhos paralelos aceleram a execução de tarefas distribuindo tarefas repetidas em poderosos clusters de computação de vários nós.

Os engenheiros de aprendizado de máquina sempre têm requisitos de escala em suas tarefas de treinamento ou inferência. Por exemplo, quando um cientista de dados fornece um único script para treinar um modelo de previsão de vendas, os engenheiros de aprendizado de máquina precisam aplicar essa tarefa de treinamento a cada armazenamento de dados individual. Os desafios desse processo de expansão incluem longos tempos de execução que causam atrasos e problemas inesperados que exigem intervenção manual para manter a tarefa em execução.

O trabalho principal da paralelização do Azure Machine Learning é dividir uma única tarefa serial em minilotes e despachar esses minilotes para vários cálculos para execução em paralelo. Os trabalhos paralelos reduzem significativamente o tempo de execução de ponta a ponta e também lidam com erros automaticamente. Considere usar o trabalho paralelo do Azure Machine Learning para treinar muitos modelos sobre seus dados particionados ou para acelerar suas tarefas de inferência em lote em grande escala.

Por exemplo, em um cenário em que você está executando um modelo de deteção de objeto em um grande conjunto de imagens, os trabalhos paralelos do Aprendizado de Máquina do Azure permitem distribuir facilmente suas imagens para executar código personalizado em paralelo em um cluster de computação específico. A paralelização pode reduzir significativamente o custo de tempo. Os trabalhos paralelos do Azure Machine Learning também podem simplificar e automatizar seu processo para torná-lo mais eficiente.

Pré-requisitos

  • Tenha uma conta e um espaço de trabalho do Azure Machine Learning.
  • Entenda os pipelines do Azure Machine Learning.

Criar e executar um pipeline com uma etapa de trabalho paralela

Um trabalho paralelo do Azure Machine Learning pode ser usado apenas como uma etapa em um trabalho de pipeline.

Os exemplos a seguir vêm de Executar um trabalho de pipeline usando trabalho paralelo em pipeline no repositório de exemplos do Aprendizado de Máquina do Azure.

Prepare-se para a paralelização

Esta etapa de trabalho paralelo requer preparação. Você precisa de um script de entrada que implemente as funções predefinidas. Você também precisa definir atributos em sua definição de trabalho paralelo que:

  • Defina e vincule seus dados de entrada.
  • Defina o método de divisão de dados.
  • Configure seus recursos de computação.
  • Chame o script de entrada.

As seções a seguir descrevem como preparar o trabalho paralelo.

Declarar a configuração de entradas e divisão de dados

Um trabalho paralelo requer uma entrada importante para ser dividida e processada em paralelo. O principal formato de dados de entrada pode ser dados tabulares ou uma lista de arquivos.

Diferentes formatos de dados têm diferentes tipos de entrada, modos de entrada e métodos de divisão de dados. A tabela a seguir descreve as opções:

Formato dos dados Input type Modo de entrada Método de divisão de dados
Lista de ficheiros mltable ou uri_folder ro_mount ou download Por tamanho (número de ficheiros) ou por partição
Dados tabulares mltable direct Por tamanho (tamanho físico estimado) ou por partição

Nota

Se você usar tabular mltable como seus principais dados de entrada, você precisa:

  • Instale a mltable biblioteca em seu ambiente, como na linha 9 deste arquivo conda.
  • Tenha um arquivo de especificação MLTable sob seu caminho especificado com a transformations: - read_delimited: seção preenchida. Para obter exemplos, consulte Criar e gerenciar ativos de dados.

Você pode declarar seus principais dados de entrada com o input_data atributo no trabalho paralelo YAML ou Python e vincular os dados com o definido input do seu trabalho paralelo usando ${{inputs.<input name>}}. Em seguida, você define o atributo de divisão de dados para sua entrada principal, dependendo do seu método de divisão de dados.

Método de divisão de dados Attribute name Tipo de atributo Exemplo de trabalho
Por tamanho mini_batch_size string Previsão de lote Iris
Por partição partition_keys Lista de cadeias de caracteres Previsão de vendas de suco de laranja

Configurar os recursos de computação para paralelização

Depois de definir o atributo de divisão de dados, configure os recursos de computação para sua paralelização definindo os instance_count atributos e max_concurrency_per_instance .

Attribute name Type Description Default value
instance_count integer O número de nós a serem usados para o trabalho. 1
max_concurrency_per_instance integer O número de processadores em cada nó. Para um cálculo de GPU: 1. Para um cálculo de CPU: número de núcleos.

Esses atributos funcionam em conjunto com o cluster de computação especificado, conforme mostrado no diagrama a seguir:

Diagrama mostrando como os dados distribuídos funcionam em trabalho paralelo.

Chamar o script de entrada

O script de entrada é um único arquivo Python que implementa as seguintes três funções predefinidas com código personalizado.

Nome da função Necessário Description Entrada Devolver
Init() Y Preparação comum antes de começar a executar mini-lotes. Por exemplo, use essa função para carregar o modelo em um objeto global. -- --
Run(mini_batch) Y Implementa a lógica de execução principal para minilotes. mini_batch é pandas dataframe se os dados de entrada forem dados tabulares, ou file path list se os dados de entrada forem um diretório. Dataframe, lista ou tupla.
Shutdown() N Função opcional para fazer limpezas personalizadas antes de retornar o cálculo para o pool. -- --

Importante

Para evitar exceções ao analisar argumentos em Init() ou Run(mini_batch) funções, use parse_known_args em vez de parse_args. Consulte o exemplo iris_score para um script de entrada com analisador de argumentos.

Importante

A Run(mini_batch) função requer um retorno de um dataframe, lista ou item de tupla. O trabalho paralelo usa a contagem desse retorno para medir os itens de sucesso desse minilote. A contagem de minilotes deve ser igual à contagem da lista de retornos se todos os itens tiverem sido processados.

O trabalho paralelo executa as funções em cada processador, conforme mostrado no diagrama a seguir.

Diagrama mostrando como o script de entrada funciona em trabalho paralelo.

Veja os seguintes exemplos de script de entrada:

Para chamar o script de entrada, defina os dois atributos a seguir na definição de trabalho paralelo:

Attribute name Type Description
code string Caminho local para o diretório de código-fonte a ser carregado e usado para o trabalho.
entry_script string O arquivo Python que contém a implementação de funções paralelas predefinidas.

Exemplo de etapa de trabalho paralelo

A etapa de trabalho paralelo a seguir declara o tipo de entrada, o modo e o método de divisão de dados, vincula a entrada, configura a computação e chama o script de entrada.

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml

Considere as configurações de automação

O trabalho paralelo do Azure Machine Learning expõe muitas configurações opcionais que podem controlar automaticamente o trabalho sem intervenção manual. A tabela a seguir descreve essas configurações.

Chave Tipo Description Valores permitidos Default value Definir em atributo ou argumento de programa
mini_batch_error_threshold integer Número de minilotes com falha a serem ignorados neste trabalho paralelo. Se a contagem de minilotes com falha for maior que esse limite, o trabalho paralelo será marcado como falha.

O minilote é marcado como reprovado se:
- A contagem de retorno de é menor do que a contagem de entrada de run() minilote.
- As exceções são capturadas no código personalizado run() .
[-1, int.max] -1, o que significa ignorar todos os minilotes com falha Atributo mini_batch_error_threshold
mini_batch_max_retries integer Número de novas tentativas quando o minilote falha ou atinge o tempo limite. Se todas as novas tentativas falharem, o minilote será marcado como falha de acordo com o mini_batch_error_threshold cálculo. [0, int.max] 2 Atributo retry_settings.max_retries
mini_batch_timeout integer Tempo limite em segundos para executar a função personalizada run() . Se o tempo de execução for maior que esse limite, o minilote será abortado e marcado como falha ao acionar a nova tentativa. (0, 259200] 60 Atributo retry_settings.timeout
item_error_threshold integer O limite de itens com falha. Os itens com falha são contados pela diferença numérica entre entradas e retornos de cada minilote. Se a soma dos itens com falha for maior que esse limite, o trabalho paralelo será marcado como falha. [-1, int.max] -1, o que significa ignorar todas as falhas durante o trabalho paralelo Argumento do programa
--error_threshold
allowed_failed_percent integer Semelhante ao mini_batch_error_threshold, mas usa a porcentagem de minilotes com falha em vez da contagem. [0, 100] 100 Argumento do programa
--allowed_failed_percent
overhead_timeout integer Tempo limite em segundos para inicialização de cada minilote. Por exemplo, carregue dados de minilote e passe-os para a run() função. (0, 259200] 600 Argumento do programa
--task_overhead_timeout
progress_update_timeout integer Tempo limite em segundos para monitorar o progresso da execução de minilotes. Se nenhuma atualização de progresso for recebida nessa configuração de tempo limite, o trabalho paralelo será marcado como falha. (0, 259200] Calculado dinamicamente por outras configurações Argumento do programa
--progress_update_timeout
first_task_creation_timeout integer Tempo limite em segundos para monitorar o tempo entre o início do trabalho e a execução do primeiro minilote. (0, 259200] 600 Argumento do programa
--first_task_creation_timeout
logging_level string O nível de logs a serem despejados nos arquivos de log do usuário. INFO, WARNING ou DEBUG INFO Atributo logging_level
append_row_to string Agregar todos os retornos de cada execução do minilote e enviá-los para este arquivo. Pode referir-se a uma das saídas do trabalho paralelo usando a expressão ${{outputs.<output_name>}} Atributo task.append_row_to
copy_logs_to_parent string Opção booleana se deseja copiar o progresso, a visão geral e os logs do trabalho para o trabalho de pipeline pai. True ou False False Argumento do programa
--copy_logs_to_parent
resource_monitor_interval integer Intervalo de tempo em segundos para despejar o uso de recursos do nó (por exemplo, cpu ou memória) para a pasta de log sob o caminho logs/sys/perf .

Nota: Logs frequentes de recursos de despejo reduzem ligeiramente a velocidade de execução. Defina esse valor como 0 para interromper o uso de recursos de despejo.
[0, int.max] 600 Argumento do programa
--resource_monitor_interval

O código de exemplo a seguir atualiza essas configurações:

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interva 20
    append_row_to: ${{outputs.job_output_file}}

Criar o pipeline com a etapa de trabalho paralelo

O exemplo a seguir mostra o trabalho de pipeline completo com a etapa de trabalho paralelo embutida:

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
  tag: tagvalue
  owner: sdkteam

settings:
  default_compute: azureml:cpu-cluster

jobs:
  batch_prediction:
    type: parallel
    compute: azureml:cpu-cluster
    inputs:
      input_data: 
        type: mltable
        path: ./neural-iris-mltable
        mode: direct
      score_model: 
        type: uri_folder
        path: ./iris-model
        mode: download
    outputs:
      job_output_file:
        type: uri_file
        mode: rw_mount

    input_data: ${{inputs.input_data}}
    mini_batch_size: "10kb"
    resources:
        instance_count: 2
    max_concurrency_per_instance: 2

    logging_level: "DEBUG"
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 2
      timeout: 60

    task:
      type: run_function
      code: "./script"
      entry_script: iris_prediction.py
      environment:
        name: "prs-env"
        version: 1
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
        conda_file: ./environment/environment_parallel.yml
      program_arguments: >-
        --model ${{inputs.score_model}}
        --error_threshold 5
        --allowed_failed_percent 30
        --task_overhead_timeout 1200
        --progress_update_timeout 600
        --first_task_creation_timeout 600
        --copy_logs_to_parent True
        --resource_monitor_interva 20
      append_row_to: ${{outputs.job_output_file}}

Enviar o trabalho de pipeline

Envie seu trabalho de pipeline com etapa paralela usando o az ml job create comando CLI:

az ml job create --file pipeline.yml

Verifique a etapa paralela na interface do usuário do estúdio

Depois de enviar um trabalho de pipeline, o widget SDK ou CLI fornece um link de URL da Web para o gráfico de pipeline na interface do usuário do estúdio de Aprendizado de Máquina do Azure.

Para exibir os resultados do trabalho paralelo, clique duas vezes na etapa paralela no gráfico de pipeline, selecione a guia Configurações no painel de detalhes, expanda Configurações de execução e expanda a seção Paralelo.

Captura de ecrã do estúdio do Azure Machine Learning a mostrar as definições de trabalho paralelo.

Para depurar falha de trabalho paralelo, selecione a guia Saídas + logs , expanda a pasta logs e verifique job_result.txt para entender por que o trabalho paralelo falhou. Para obter informações sobre a estrutura de log de trabalhos paralelos, consulte readme.txt na mesma pasta.

Captura de ecrã do estúdio do Azure Machine Learning no separador de trabalhos a mostrar os resultados do trabalho paralelo.