Partilhar via


Disputa interativa de dados com o Apache Spark no Azure Machine Learning

A disputa de dados torna-se um dos aspetos mais importantes dos projetos de aprendizado de máquina. A integração da integração do Azure Machine Learning com o Azure Synapse Analytics fornece acesso a um pool do Apache Spark - apoiado pelo Azure Synapse - para disputa de dados interativa que usa os Blocos de Anotações do Azure Machine Learning.

Neste artigo, você aprenderá a lidar com a disputa de dados usando

  • Computação do Spark sem servidor
  • Piscina Synapse Spark anexada

Pré-requisitos

  • Uma assinatura do Azure; se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
  • Uma área de trabalho do Azure Machine Learning. Visite Criar recursos de espaço de trabalho para obter mais informações.
  • Uma conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2. Visite Criar uma conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 para obter mais informações.
  • (Opcional): Um Cofre de Chaves do Azure. Visite Criar um Cofre de Chaves do Azure para obter mais informações.
  • (Opcional): Uma entidade de serviço. Visite Criar uma entidade de serviço para obter mais informações.
  • (Opcional): um pool Synapse Spark anexado no espaço de trabalho do Azure Machine Learning.

Antes de iniciar suas tarefas de disputa de dados, saiba mais sobre o processo de armazenamento de segredos

  • Chave de acesso da conta de armazenamento de Blob do Azure
  • Token de Assinatura de Acesso Compartilhado (SAS)
  • Informações principais do serviço Azure Data Lake Storage (ADLS) Gen 2

no Cofre da Chave do Azure. Você também precisa saber como lidar com atribuições de função nas contas de armazenamento do Azure. As seções a seguir neste documento descrevem esses conceitos. Em seguida, exploramos os detalhes da disputa interativa de dados, usando os pools do Spark nos Blocos de Anotações do Azure Machine Learning.

Gorjeta

Para saber mais sobre a configuração de atribuição de função de conta de armazenamento do Azure ou se você acessar dados em suas contas de armazenamento usando passagem de identidade de usuário, visite Adicionar atribuições de função em contas de armazenamento do Azure para obter mais informações.

Disputa interativa de dados com o Apache Spark

Para disputa de dados interativa com o Apache Spark nos Blocos de Anotações do Azure Machine Learning, o Azure Machine Learning oferece computação Spark sem servidor e pool Synapse Spark anexado. A computação do Spark sem servidor não requer a criação de recursos no espaço de trabalho do Azure Synapse. Em vez disso, uma computação Spark sem servidor totalmente gerenciada fica disponível diretamente nos Blocos de Anotações do Azure Machine Learning. O uso de uma computação do Spark sem servidor é a maneira mais fácil de acessar um cluster do Spark no Azure Machine Learning.

Computação do Serverless Spark nos Blocos de Anotações do Azure Machine Learning

Uma computação do Spark sem servidor está disponível nos Blocos de Anotações do Azure Machine Learning por padrão. Para acessá-lo em um bloco de anotações, selecione Serverless Spark Compute em Azure Machine Learning Serverless Spark no menu de seleção Compute .

A interface do usuário do Notebooks também fornece opções para a configuração de sessão do Spark para a computação do Spark sem servidor. Para configurar uma sessão do Spark:

  1. Selecione Configurar sessão na parte superior da tela.
  2. Selecione Versão do Apache Spark no menu suspenso.

    Importante

    Azure Synapse Runtime for Apache Spark: Anúncios

    • Azure Synapse Runtime para Apache Spark 3.2:
      • Data de anúncio EOLA: 8 de julho de 2023
      • Data de fim do suporte: 8 de julho de 2024. Após essa data, o tempo de execução será desativado.
    • Apache Spark 3.3:
      • Data de anúncio EOLA: 12 de julho de 2024
      • Data de fim do suporte: 31 de março de 2025. Após essa data, o tempo de execução será desativado.
    • Para suporte contínuo e desempenho ideal, aconselhamos a migração para o Apache Spark 3.4
  3. Selecione Tipo de instância no menu suspenso. Estes tipos são atualmente suportados:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Insira um valor de tempo limite da Sessão Spark, em minutos.
  5. Selecione se deseja ou não alocar executores dinamicamente
  6. Selecione o número de Executores para a sessão do Spark.
  7. Selecione Tamanho do executor no menu suspenso.
  8. Selecione Tamanho do driver no menu suspenso.
  9. Para usar um arquivo Conda para configurar uma sessão do Spark, marque a caixa de seleção Carregar arquivo conda. Em seguida, selecione Procurar e escolha o arquivo Conda com a configuração de sessão do Spark desejada.
  10. Adicione propriedades de definições de configuração, valores de entrada nas caixas de texto Propriedade e Valor e selecione Adicionar.
  11. Selecione Aplicar.
  12. No pop-up Configurar nova sessão?, selecione Parar sessão.

As alterações de configuração da sessão persistem e ficam disponíveis para outra sessão de bloco de anotações que é iniciada usando a computação do Spark sem servidor.

Gorjeta

Se você usar pacotes Conda no nível da sessão, poderá melhorar o tempo de início a frio da sessão do Spark se definir a variável spark.hadoop.aml.enable_cache de configuração como true. Um início frio de sessão com pacotes Conda de nível de sessão normalmente leva de 10 a 15 minutos quando a sessão começa pela primeira vez. No entanto, o início frio da sessão subsequente com a variável de configuração definida como true normalmente leva de três a cinco minutos.

Importar e processar dados do Azure Data Lake Storage (ADLS) Gen 2

Você pode acessar e processar dados armazenados em contas de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 com abfss:// URIs de dados. Para fazer isso, você deve seguir um dos dois mecanismos de acesso a dados:

  • Passagem de identidade do usuário
  • Acesso a dados baseado em entidade de serviço

Gorjeta

A disputa de dados com uma computação Spark sem servidor e a passagem de identidade do usuário para acessar dados em uma conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 exigem o menor número de etapas de configuração.

Para iniciar a disputa interativa de dados com a passagem de identidade do usuário:

  • Verifique se a identidade do usuário tem atribuições de função de Colaborador e Colaborador de Dados de Blob de Armazenamento na conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2.

  • Para usar a computação do Spark sem servidor, selecione Serverless Spark Compute em Azure Machine Learning Serverless Spark no menu de seleção Compute .

  • Para usar um pool Synapse Spark anexado, selecione um pool Synapse Spark anexado em Synapse Spark pools no menu Seleção de computação.

  • Este exemplo de código de disputa de dados do Titanic mostra o uso de um URI de dados no formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> com pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Este exemplo de código Python usa pyspark.pandas. Somente o tempo de execução do Spark versão 3.2 ou posterior suporta isso.

Para processar dados por meio de acesso por meio de uma entidade de serviço:

  1. Verifique se a entidade de serviço tem atribuições de função de Colaborador e Colaborador de Dados de Blob de Armazenamento na conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2.

  2. Crie segredos do Cofre da Chave do Azure para a ID do locatário da entidade de serviço, a ID do cliente e os valores do segredo do cliente.

  3. No menu de seleção Computação, selecione Computação sem servidor em Azure Machine Learning Serverless Spark. Você também pode selecionar um pool Synapse Spark anexado em Synapse Spark pools no menu de seleção Compute.

  4. Defina os valores ID do locatário da entidade de serviço, ID do cliente e segredo do cliente na configuração e execute o exemplo de código a seguir.

    • A get_secret() chamada no código depende do nome do Cofre da Chave do Azure e dos nomes dos segredos do Cofre da Chave do Azure criados para a ID do locatário da entidade de serviço, a ID do cliente e o segredo do cliente. Defina estes nomes/valores de propriedade correspondentes na configuração:

      • Propriedade ID do cliente: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Propriedade secreta do cliente: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Propriedade ID do inquilino: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Valor da ID do locatário: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Usando os dados do Titanic, importe e faça o wrangle usando o URI de dados no abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> formato, conforme mostrado no exemplo de código.

Importar e processar dados do armazenamento de Blob do Azure

Você pode acessar os dados de armazenamento de Blob do Azure com a chave de acesso da conta de armazenamento ou um token de assinatura de acesso compartilhado (SAS). Você deve armazenar essas credenciais no Cofre da Chave do Azure como um segredo e defini-las como propriedades na configuração da sessão.

Para iniciar a disputa interativa de dados:

  1. No painel esquerdo do estúdio de Aprendizado de Máquina do Azure, selecione Blocos de Anotações.

  2. No menu de seleção Computação, selecione Computação sem servidor em Azure Machine Learning Serverless Spark. Você também pode selecionar um pool Synapse Spark anexado em Synapse Spark pools no menu de seleção Compute.

  3. Para configurar a chave de acesso da conta de armazenamento ou um token de assinatura de acesso compartilhado (SAS) para acesso a dados nos Blocos de Anotações do Azure Machine Learning:

    • Para a chave de acesso, defina a fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net propriedade, conforme mostrado neste trecho de código:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Para o token SAS, defina a fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net propriedade, conforme mostrado neste trecho de código:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Nota

      As get_secret() chamadas nos trechos de código anteriores exigem o nome do Cofre da Chave do Azure e os nomes dos segredos criados para a chave de acesso da conta de armazenamento de Blob do Azure ou token SAS.

  4. Execute o código de disputa de dados no mesmo bloco de anotações. Formate o URI de dados como wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, semelhante ao que este trecho de código mostra:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Este exemplo de código Python usa pyspark.pandas. Somente o tempo de execução do Spark versão 3.2 ou posterior suporta isso.

Importar e processar dados do Azure Machine Learning Datastore

Para acessar dados do Repositório de Dados do Azure Machine Learning, defina um caminho para os dados no armazenamento de dados com o formato azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>URI. Para extrair dados de um armazenamento de dados do Azure Machine Learning em uma sessão do Notebooks de forma interativa:

  1. Selecione Serverless Spark compute em Azure Machine Learning Serverless Spark no menu de seleção Compute ou selecione um pool Synapse Spark anexado em Synapse Spark pools no menu de seleção Compute .

  2. Este exemplo de código mostra como ler e manipular dados do Titanic de um armazenamento de dados do Azure Machine Learning, usando azureml:// o URI do armazenamento de dados, pyspark.pandase pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Este exemplo de código Python usa pyspark.pandas. Somente o tempo de execução do Spark versão 3.2 ou posterior suporta isso.

Os armazenamentos de dados do Azure Machine Learning podem acessar dados usando credenciais de conta de armazenamento do Azure

  • chave de acesso
  • Token de SAS
  • principal de serviço

ou usam acesso a dados sem credenciais. Dependendo do tipo de armazenamento de dados e do tipo de conta de armazenamento subjacente do Azure, selecione um mecanismo de autenticação apropriado para garantir o acesso aos dados. Esta tabela resume os mecanismos de autenticação para acessar dados nos armazenamentos de dados do Azure Machine Learning:

Storage account type Acesso a dados sem credenciais Mecanismo de acesso aos dados Atribuições de funções
Blob do Azure Não Chave de acesso ou token SAS Não são necessárias atribuições de função
Blob do Azure Sim Passagem de identidade do usuário* A identidade do usuário deve ter atribuições de função apropriadas na conta de armazenamento de Blob do Azure
Armazenamento Azure Data Lake (ADLS) Gen 2 Não Service principal (Principal de serviço) A entidade de serviço deve ter atribuições de função apropriadas na conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2
Armazenamento Azure Data Lake (ADLS) Gen 2 Sim Passagem de identidade do usuário A identidade do usuário deve ter atribuições de função apropriadas na conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2

* A passagem de identidade do usuário funciona para armazenamentos de dados sem credenciais que apontam para contas de armazenamento de Blob do Azure, somente se a exclusão suave não estiver habilitada.

Acessando dados no compartilhamento de arquivos padrão

O compartilhamento de arquivos padrão é montado na computação do Spark sem servidor e nos pools Synapse Spark anexados.

Captura de ecrã a mostrar a utilização de uma partilha de ficheiros.

No estúdio do Azure Machine Learning, os arquivos no compartilhamento de arquivos padrão são mostrados na árvore de diretórios na guia Arquivos . O código do bloco de anotações pode acessar diretamente os arquivos armazenados neste compartilhamento de arquivos com o file:// protocolo, juntamente com o caminho absoluto do arquivo, sem mais configurações. Este trecho de código mostra como acessar um arquivo armazenado no compartilhamento de arquivos padrão:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Nota

Este exemplo de código Python usa pyspark.pandas. Somente o tempo de execução do Spark versão 3.2 ou posterior suporta isso.

Próximos passos