Partilhar via


Como usar o Apache Spark (com tecnologia do Azure Synapse Analytics) em seu pipeline de aprendizado de máquina (preterido)

APLICA-SE A: Python SDK azureml v1

Aviso

A integração do Azure Synapse Analytics com o Azure Machine Learning, disponível no Python SDK v1, foi preterida. Os usuários ainda podem usar o espaço de trabalho Synapse, registrado no Azure Machine Learning, como um serviço vinculado. No entanto, já não pode ser registada uma nova área de trabalho do Synapse no Azure Machine Learning como um serviço associado. Recomendamos o uso de computação Spark sem servidor e pools Synapse Spark anexados, disponíveis na CLI v2 e Python SDK v2. Para mais informações, visite https://aka.ms/aml-spark.

Neste artigo, você aprenderá a usar pools do Apache Spark com tecnologia do Azure Synapse Analytics como o destino de computação para uma etapa de preparação de dados em um pipeline do Azure Machine Learning. Você aprende como um único pipeline pode usar recursos de computação adequados para a etapa específica - por exemplo, preparação de dados ou treinamento. Você também aprenderá como os dados são preparados para a etapa do Spark e como eles passam para a próxima etapa.

Pré-requisitos

Você cria e administra seus pools do Apache Spark em um espaço de trabalho do Azure Synapse Analytics. Para integrar um pool do Apache Spark a um espaço de trabalho do Azure Machine Learning, você deve vincular ao espaço de trabalho do Azure Synapse Analytics. Depois de vincular seu espaço de trabalho do Azure Machine Learning e seus espaços de trabalho do Azure Synapse Analytics, você pode anexar um pool do Apache Spark com

  • Azure Machine Learning studio

  • Python SDK, como explicado mais adiante

  • Modelo do Azure Resource Manager (ARM). Para obter mais informações, visite Exemplo de modelo ARM

    • Você pode usar a linha de comando para seguir o modelo ARM, adicionar o serviço vinculado e anexar o pool do Apache Spark com este exemplo de código:
    az deployment group create --name --resource-group <rg_name> --template-file "azuredeploy.json" --parameters @"azuredeploy.parameters.json"
    

Importante

Para vincular com êxito ao espaço de trabalho Synapse, você deve receber a função de Proprietário do espaço de trabalho Synapse. Verifique o acesso no portal do Azure.

O serviço vinculado receberá uma identidade gerenciada atribuída ao sistema (SAI) no momento da criação. Você deve atribuir a este serviço de link SAI a função "Synapse Apache Spark administrator" do Synapse Studio, para que ele possa enviar o trabalho do Spark (consulte Como gerenciar atribuições de função Synapse RBAC no Synapse Studio).

Você também deve dar ao usuário do espaço de trabalho do Azure Machine Learning a função "Colaborador", no portal do Azure de gerenciamento de recursos.

Este código mostra como recuperar serviços vinculados em seu espaço de trabalho:

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration

ws = Workspace.from_config()

for service in LinkedService.list(ws) : 
    print(f"Service: {service}")

# Retrieve a known linked service
linked_service = LinkedService.get(ws, 'synapselink1')

Primeiro, Workspace.from_config() acessa seu espaço de trabalho do Azure Machine Learning com a configuração no config.json arquivo. (Para mais informações, visite Crie um arquivo de configuração do espaço de trabalho). Em seguida, o código imprime todos os serviços vinculados disponíveis no espaço de trabalho. Finalmente, LinkedService.get() recupera um serviço vinculado chamado 'synapselink1'.

Anexe seu pool de faísca Apache como um destino de computação para o Azure Machine Learning

Para usar seu pool de faíscas Apache para alimentar uma etapa em seu pipeline de aprendizado de máquina, você deve anexá-lo como um ComputeTarget para a etapa de pipeline, conforme mostrado neste exemplo de código:

from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_service = linked_service,
        type="SynapseSpark",
        pool_name="spark01") # This name comes from your Synapse workspace

synapse_compute=ComputeTarget.attach(
        workspace=ws,
        name='link1-spark01',
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

O código primeiro configura o SynapseCompute. O linked_service argumento é o LinkedService objeto que você criou ou recuperou na etapa anterior. O type argumento deve ser SynapseSpark. O pool_name argumento em SynapseCompute.attach_configuration() deve corresponder ao de um pool existente em seu espaço de trabalho do Azure Synapse Analytics. Para obter mais informações sobre a criação de um pool de faíscas Apache no espaço de trabalho do Azure Synapse Analytics, visite Guia de início rápido: criar um pool Apache Spark sem servidor usando o Synapse Studio. O attach_config tipo é ComputeTargetAttachConfiguration.

Após a criação da configuração, crie um aprendizado ComputeTarget de máquina passando os valores e ComputeTargetAttachConfiguration e Workspace o nome pelo qual você gostaria de se referir à computação dentro do espaço de trabalho de aprendizado de máquina. A chamada para ComputeTarget.attach() é assíncrona, portanto, o exemplo é bloqueado até que a chamada seja concluída.

Crie um SynapseSparkStep que use o pool vinculado do Apache Spark

O exemplo de trabalho do Spark do bloco de anotações no pool de faíscas do Apache define um pipeline de aprendizado de máquina simples. Primeiro, o bloco de anotações define uma etapa de preparação de dados, alimentada synapse_compute pela definida na etapa anterior. Em seguida, o notebook define uma etapa de treinamento alimentada por um alvo de computação mais apropriado para treinamento. O caderno de exemplo usa o banco de dados de sobrevivência do Titanic para mostrar a entrada e saída de dados. Na verdade, ele não limpa os dados ou faz um modelo preditivo. Como esse exemplo realmente não envolve treinamento, a etapa de treinamento usa um recurso de computação barato e baseado em CPU.

Os dados fluem para um pipeline de aprendizado de máquina por meio DatasetConsumptionConfig de objetos, que podem conter dados tabulares ou conjuntos de arquivos. Os dados geralmente vêm de arquivos no armazenamento de blob em um armazenamento de dados de espaço de trabalho. Este exemplo de código mostra o código típico que cria entrada para um pipeline de aprendizado de máquina:

from azureml.core import Dataset

datastore = ws.get_default_datastore()
file_name = 'Titanic.csv'

titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(datastore, file_name)])
step1_input1 = titanic_tabular_dataset.as_named_input("tabular_input")

# Example only: it wouldn't make sense to duplicate input data, especially one as tabular and the other as files
titanic_file_dataset = Dataset.File.from_files(path=[(datastore, file_name)])
step1_input2 = titanic_file_dataset.as_named_input("file_input").as_hdfs()

O exemplo de código pressupõe que o arquivo Titanic.csv está no armazenamento de blob. O código mostra como ler o arquivo como um TabularDataset e como um FileDatasetarquivo . Este código é apenas para fins de demonstração, porque se tornaria confuso duplicar entradas ou interpretar uma única fonte de dados como um recurso contendo tabela e estritamente como um arquivo.

Importante

Para usar um FileDataset como entrada, você precisa de uma azureml-core versão de pelo menos 1.20.0. Você pode especificar isso com a classe, conforme discutido Environment mais adiante. Quando uma etapa for concluída, você poderá armazenar os dados de saída, conforme mostrado neste exemplo de código:

from azureml.data import HDFSOutputDatasetConfig
step1_output = HDFSOutputDatasetConfig(destination=(datastore,"test")).register_on_complete(name="registered_dataset")

Neste exemplo de código, o datastore armazenaria os dados em um arquivo chamado test. Os dados estariam disponíveis no espaço de trabalho de aprendizado de máquina como um Dataset, com o nome registered_dataset.

Além dos dados, uma etapa de pipeline pode ter dependências Python por etapa. Além disso, objetos individuais SynapseSparkStep podem especificar sua configuração precisa do Azure Synapse Apache Spark. Para mostrar isso, o exemplo de código a seguir especifica que a versão do azureml-core pacote deve ser pelo menos 1.20.0. Como mencionado anteriormente, este requisito para o azureml-core pacote é necessário para usar um FileDataset como entrada.

from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

step_1 = SynapseSparkStep(name = 'synapse-spark',
                          file = 'dataprep.py',
                          source_directory="./code", 
                          inputs=[step1_input1, step1_input2],
                          outputs=[step1_output],
                          arguments = ["--tabular_input", step1_input1, 
                                       "--file_input", step1_input2,
                                       "--output_dir", step1_output],
                          compute_target = 'link1-spark01',
                          driver_memory = "7g",
                          driver_cores = 4,
                          executor_memory = "7g",
                          executor_cores = 2,
                          num_executors = 1,
                          environment = env)

Este código especifica uma única etapa no pipeline do Azure Machine Learning. O environment valor desse código define uma versão específica azureml-core , e o código pode adicionar outras dependências conda ou pip conforme necessário.

O SynapseSparkStep zips e carrega o ./code subdiretório do computador local. Esse diretório é recriado no servidor de computação e a etapa executa o dataprep.py script a partir desse diretório. O inputs e outputs dessa etapa são o step1_input1, step1_input2e step1_output objetos discutidos anteriormente. A maneira mais fácil de acessar esses valores dentro do dataprep.py script é associá-los ao nome arguments.

O próximo conjunto de argumentos para o construtor controla o SynapseSparkStep Apache Spark. O compute_target é o 'link1-spark01' que anexamos como um destino de computação anteriormente. Os outros parâmetros especificam a memória e os núcleos que gostaríamos de usar.

O bloco de anotações de exemplo usa este código para dataprep.py:

import os
import sys
import azureml.core
from pyspark.sql import SparkSession
from azureml.core import Run, Dataset

print(azureml.core.VERSION)
print(os.environ)

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tabular_input")
parser.add_argument("--file_input")
parser.add_argument("--output_dir")
args = parser.parse_args()

# use dataset sdk to read tabular dataset
run_context = Run.get_context()
dataset = Dataset.get_by_id(run_context.experiment.workspace,id=args.tabular_input)
sdf = dataset.to_spark_dataframe()
sdf.show()

# use hdfs path to read file dataset
spark= SparkSession.builder.getOrCreate()
sdf = spark.read.option("header", "true").csv(args.file_input)
sdf.show()

sdf.coalesce(1).write\
.option("header", "true")\
.mode("append")\
.csv(args.output_dir)

Este script de "preparação de dados" não faz nenhuma transformação de dados real, mas mostra como recuperar dados, convertê-los em um dataframe do Spark e como fazer alguma manipulação básica do Apache Spark. Para localizar a saída no estúdio do Azure Machine Learning, abra o trabalho filho, escolha a guia Saídas + logs e abra o logs/azureml/driver/stdout arquivo, conforme mostrado nesta captura de tela:

Captura de tela do estúdio mostrando a guia stdout do trabalho infantil

Use o SynapseSparkStep em um pipeline

O próximo exemplo usa a saída do SynapseSparkStep criado na seção anterior. Outras etapas no pipeline podem ter seus próprios ambientes exclusivos e podem ser executadas em diferentes recursos de computação apropriados para a tarefa em questão. O bloco de anotações de exemplo executa a "etapa de treinamento" em um pequeno cluster de CPU:

from azureml.core.compute import AmlCompute

cpu_cluster_name = "cpucluster"

if cpu_cluster_name in ws.compute_targets:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
else:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', max_nodes=1)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    print('Allocating new CPU compute cluster')

cpu_cluster.wait_for_completion(show_output=True)

step2_input = step1_output.as_input("step2_input").as_download()

step_2 = PythonScriptStep(script_name="train.py",
                          arguments=[step2_input],
                          inputs=[step2_input],
                          compute_target=cpu_cluster_name,
                          source_directory="./code",
                          allow_reuse=False)

Esse código cria o novo recurso de computação, se necessário. Em seguida, ele converte o step1_output resultado em entrada para a etapa de treinamento. A as_download() opção significa que os dados são movidos para o recurso de computação, resultando em acesso mais rápido. Se os dados forem tão grandes que não caberiam no disco rígido de computação local, você deverá usar a as_mount() opção para transmitir os dados com o FUSE sistema de arquivos. A compute_target segunda etapa é 'cpucluster', não o 'link1-spark01' recurso usado na etapa de preparação de dados. Esta etapa usa um script simples train.py em vez do dataprep.py script usado na etapa anterior. O bloco de anotações de exemplo tem detalhes do train.py script.

Depois de definir todas as etapas, você pode criar e executar seu pipeline.

from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=True)

Esse código cria um pipeline que consiste na etapa de preparação de dados em pools do Apache Spark, alimentada pelo Azure Synapse Analytics (step_1) e na etapa de treinamento (step_2). O Azure examina as dependências de dados entre as etapas para calcular o gráfico de execução. Neste caso, há apenas uma dependência direta. Aqui, step2_input necessariamente requer step1_output.

A pipeline.submit chamada cria, se necessário, um Experimento chamado synapse-pipeline, e inicia de forma assíncrona um Trabalho dentro dele. As etapas individuais dentro do pipeline são executadas como trabalhos filhos desse trabalho principal, e a página Experimentos do Studio pode monitorar e revisar essas etapas.

Próximos passos