Estruturação de Dados Interativa com Apache Spark no Azure Machine Learning
A estruturação de dados torna-se um dos aspectos mais importantes dos projetos de machine learning. A integração da integração do Azure Machine Learning com o Azure Synapse Analytics fornece acesso a um pool do Apache Spark - apoiado pelo Azure Synapse - para a estruturação de dados interativa que usa notebooks do Azure Machine Learning.
Neste artigo, você aprenderá a lidar com a estruturação de dados usando
- Computação do Spark sem servidor
- Pool do Spark do Synapse anexado
Pré-requisitos
- Uma assinatura do Azure. Caso não tenha uma, crie uma conta gratuita antes de começar.
- Um Workspace do Azure Machine Learning. Visite Criar recursos de workspace para obter mais informações.
- Uma conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2. Visite Criar uma conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 para obter mais informações.
- (Opcional): um Azure Key Vault. Visite Criar um Azure Key Vault para obter mais informações.
- (Opcional): uma entidade de serviço. Visite Criar uma entidade de serviço para obter mais informações.
- (Opcional): Um pool do Spark do Synapse anexado no workspace do Azure Machine Learning.
Antes de iniciar suas tarefas de estruturação de dados, saiba mais sobre o processo de armazenamento de segredos
- Chave de acesso da conta de armazenamento de blobs do Azure
- Um token SAS (Assinatura de Acesso Compartilhado)
- Informações da entidade de serviço do ADLS (Azure Data Lake Storage) Gen 2
no Azure Key Vault. Você também precisa saber como lidar com atribuições de função nas contas de armazenamento do Azure. As seções a seguir neste documento descrevem esses conceitos. Depois, veremos os detalhes da estruturação de dados interativa usando os pools do Spark nos Notebooks do Azure Machine Learning.
Dica
Para saber mais sobre a configuração de atribuição de função da conta de armazenamento do Azure ou se você acessa dados em suas contas de armazenamento usando a passagem de identidade de usuário, confira Adicionar atribuições de função em contas de armazenamento do Azure para saber mais.
Estruturação de dados interativa com o Apache Spark
Para estruturar dados interativos com o Apache Spark nos Notebooks do Azure Machine Learning, o Azure Machine Learning oferece computação spark sem servidor e pool do Spark do Synapse anexado. A computação do Spark sem servidor não requer a criação de recursos no espaço de trabalho do Azure Synapse. Em vez disso, uma computação do Spark sem servidor totalmente gerenciada se torna disponível diretamente nos Notebooks do Azure Machine Learning. O uso de uma computação Spark sem servidor é a maneira mais fácil de acessar um cluster Spark no Azure Machine Learning.
Computação do Spark sem servidor nos Notebooks do Azure Machine Learning
Uma computação do Spark sem servidor está disponível por padrão nos Notebooks de Machine Learning do Azure. Para acessá-la em um notebook, selecioneComputação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção de Computação.
A interface de usuário do Notebooks também fornece opções de configuração de sessão do Spark para a computação do Spark sem servidor. Para configurar uma sessão do Spark:
- Selecione Configurar sessão na parte superior da tela.
- Selecione uma versão do Apache Spark no menu suspenso.
Importante
Runtime do Azure Synapse para o Apache Spark: Comunicados
- Runtime do Azure Synapse para Apache Spark 3.2:
- Data do Comunicado EOLA: 8 de julho de 2023
- Data do Término do Suporte: 8 de julho de 2024. Após essa data, o runtime será desabilitado.
- Apache Spark 3.3:
- Data do Comunicado EOLA: 12 de julho de 2024
- Data do Término do Suporte: 31 de julho de 2025. Após essa data, o runtime será desabilitado.
- Para suporte contínuo e desempenho ideal, aconselhamos a migração para Apache Spark 3.4
- Runtime do Azure Synapse para Apache Spark 3.2:
- Selecione Tipo de instância no menu suspenso. Atualmente, há suporte para estes tipos:
Standard_E4s_v3
Standard_E8s_v3
Standard_E16s_v3
Standard_E32s_v3
Standard_E64s_v3
- Insira um valor de Tempo limite da Sessão do Spark, em minutos.
- Selecione se deseja ou não Alocar executores dinamicamente
- Selecione o número de Executores para a sessão do Spark.
- Selecione o Tamanho do executor no menu suspenso.
- Selecione o Tamanho do driver no menu suspenso.
- Para usar um arquivo Conda para configurar uma sessão do Spark, marque a caixa de seleção Carregar arquivo Conda. Em seguida, selecione Procurar e escolha o arquivo Conda com a configuração da sessão do Spark desejada.
- Adicione as propriedades de Definições de configuração, valores de entrada nas caixas de texto Propriedade e Valor e, em seguida, selecione Adicionar.
- Escolha Aplicar.
- No pop-up Configurar nova sessão?, selecione Interromper sessão.
As alterações de configuração da sessão persistem e se tornam disponíveis para uma outra sessão de notebook que for iniciada usando a computação do Spark sem servidor.
Dica
Se você usar pacotes Conda no nível da sessão, poderá melhorar o tempo de inicialização a frio na sessão do Spark se definir a variável de configuração spark.hadoop.aml.enable_cache
como true. Uma inicialização a frio da sessão com pacotes Conda de nível de sessão normalmente leva de 10 a 15 minutos quando a sessão é iniciada pela primeira vez. No entanto, a inicialização a frio da sessão subsequente com a variável de configuração definida como true normalmente leva de três a cinco minutos.
Importar e reorganizar os dados do ADLS (Azure Data Lake Storage) Gen 2
Você pode acessar e manipular dados armazenados em contas de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 com URIs de dados abfss://
. Para fazer isso, você deve seguir um dos dois mecanismos de acesso a dados:
- Passagem de identidade do usuário
- Acesso aos dados com base na entidade de serviço
Dica
A organização dos dados com uma computação do Spark sem servidor e a passagem da identidade do usuário para acessar os dados em uma conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2 requerem o menor número de etapas de configuração.
Para iniciar a estruturação interativa de dados com a passagem de identidade do usuário:
Verifique se a identidade do usuário tem as atribuições de função de Colaborador e Colaborador de Dados do Blob de Armazenamento na conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2.
Para usar a computação do Spark sem servidor, selecione Computação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção Computação.
Para usar um pool do Spark do Synapse anexado, selecione um pool do Spark do Synapse anexado na guia Pools do Spark do Synapse do menu de seleção de Computação.
Este exemplo de código de estruturação de dados Titanic mostra o uso de um URI de dados no formato
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
compyspark.pandas
epyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled", index_col="PassengerId", )
Observação
Este exemplo de código Python usa
pyspark.pandas
. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.
Para estruturar os dados por meio de acesso por uma entidade de serviço:
Verifique se a entidade de serviço tem as atribuições de função de Colaborador e Colaborador de Dados do Blob de Armazenamento na conta de armazenamento do Azure Data Lake Storage (ADLS) Gen 2.
Crie segredos do Azure Key Vault para a ID do locatário da entidade de serviço, a ID do cliente e os valores do segredo do cliente.
No menu de seleção Computação, selecione computação do Spark sem servidor em Azure Machine Learning sem servidor. Você também pode selecionar um pool do Spark do Synapse anexado em pools do Synapse Spark no menu de seleção Computação.
Defina a ID do locatário da entidade de serviço, a ID do cliente e os valores de segredo do cliente na configuração e execute o exemplo de código a seguir.
A chamada
get_secret()
no código depende do nome do Azure Key Vault e dos nomes dos segredos do Azure Key Vault criados para a ID do locatário da entidade de serviço, a ID do cliente e o segredo do cliente. Defina estes valores/nome da propriedade correspondente na configuração:- Propriedade da ID do cliente:
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Propriedade do segredo do cliente:
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Propriedade da ID do locatário:
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Valor da ID do locatário:
https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary # Set up service principal tenant ID, client ID and secret from Azure Key Vault client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>") tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>") client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>") # Set up service principal which has access of the data sc._jsc.hadoopConfiguration().set( "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth" ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_id, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_secret, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", )
- Propriedade da ID do cliente:
Usando os dados do Titanic, importe os dados preparados usando o URI de dados no formato
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
, conforme mostrado no exemplo de código.
Importar e estruturar os dados do Armazenamento de Blobs do Azure
Você pode acessar os dados do Armazenamento de Blobs do Azure com a chave de acesso da conta de armazenamento ou um token SAS (assinatura de acesso compartilhado). Você deve armazenar essas credenciais no Azure Key Vault como um segredo e defini-las como propriedades na configuração da sessão.
Para iniciar a estruturação interativa de dados:
No painel esquerdo do Estúdio do Azure Machine Learning, selecione Notebooks.
No menu de seleção Computação, selecione computação do Spark sem servidor em Azure Machine Learning sem servidor. Você também pode selecionar um pool do Spark do Synapse anexado em pools do Synapse Spark no menu de seleção Computação.
Para configurar a chave de acesso da conta de armazenamento ou um token SAS (assinatura de acesso compartilhado) para acesso a dados no Azure Machine Learning Notebooks:
Para a chave de acesso, defina a propriedade
fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
, conforme mostrado neste trecho de código:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key )
Para o token SAS, defina a propriedade
fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
, conforme mostrado neste trecho de código:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", sas_token, )
Observação
As chamadas
get_secret()
nos trechos de código anteriores exigem o nome do Azure Key Vault e os nomes dos segredos criados para a chave de acesso ou o token SAS da conta de Armazenamento de Blobs do Azure.
Execute o código de estruturação de dados no mesmo notebook. Formatar o URI de dados como
wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>
, semelhante ao que este snippet de código exibe:import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled", index_col="PassengerId", )
Observação
Este exemplo de código Python usa
pyspark.pandas
. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.
Importar e estruturar os dados do armazenamento de dados do Azure Machine Learning
Para acessar dados do Repositório de Dados do Azure Machine Learning, defina um caminho para os dados no repositório de dados com o formato de URI azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>
. Para estruturar os dados de um Armazenamento de dados do Azure Machine Learning em uma sessão do Notebooks interativamente:
Selecione Computação do Spark Sem Servidor na guia Spark Sem Servidor do Azure Machine Learning do menu de seleção Computação, ou selecione um pool do Spark do Synapse anexado na guia Pools do Spark do Synapse do menu de seleção Computação.
Este exemplo de código mostra como ler e estruturar os dados Titanic de um armazenamento de dados do Azure Machine Learning, usando o URI do armazenamento de dados
azureml://
,pyspark.pandas
epyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "azureml://datastores/workspaceblobstore/paths/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "azureml://datastores/workspaceblobstore/paths/data/wrangled", index_col="PassengerId", )
Observação
Este exemplo de código Python usa
pyspark.pandas
. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.
Os armazenamentos de dados do Azure Machine Learning podem acessar os dados usando as credenciais da conta de armazenamento do Azure
- chave de acesso
- Token SAS
- entidade de serviço
ou usam acesso a dados sem credencial. Dependendo do tipo de armazenamento de dados e do tipo de conta de armazenamento subjacente do Azure, selecione um mecanismo de autenticação apropriado para garantir o acesso aos dados. Esta tabela resume os mecanismos de autenticação para acessar dados nos armazenamentos de dados do Azure Machine Learning:
Tipo de conta de armazenamento | Acesso a dados sem credencial | Mecanismo de acesso a dados | Atribuições de função |
---|---|---|---|
Blob do Azure | Não | Chave de acesso ou token SAS | Não é necessária uma atribuição de função |
Blob do Azure | Sim | Passagem de identidade do usuário* | A identidade do usuário deve ter atribuições de função apropriadas na conta de armazenamento de Blobs do Azure |
ADLS (Azure Data Lake Storage) Gen 2 | Não | Entidade de serviço | A entidade de serviço deve ter atribuições de função apropriadas na conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2 |
ADLS (Azure Data Lake Storage) Gen 2 | Sim | Passagem de identidade do usuário | A identidade do usuário deve ter atribuições de função apropriadas na conta de armazenamento do ADLS (Azure Data Lake Storage) Gen 2 |
* A passagem de identidade do usuário funciona nos armazenamentos de dados sem credenciais que apontam para as contas de armazenamento de blobs do Azure somente se a exclusão temporária não está habilitada.
Acesso aos dados no compartilhamento de arquivo padrão
O compartilhamento de arquivos padrão é montado tanto para a computação do Spark sem servidor quanto para os Pools do Spark do Synapse anexados.
No Estúdio do Azure Machine Learning, os arquivos no compartilhamento de arquivos padrão são mostrados na árvore de diretório na guia Arquivos. O código do notebook pode acessar diretamente os arquivos armazenados neste compartilhamento de arquivos com o protocolo file://
, juntamente com o caminho absoluto do arquivo, sem mais configurações. Este snippet de código mostra como acessar um arquivo armazenado no compartilhamento de arquivo padrão:
import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
inputCols=["Age"],
outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")
Observação
Este exemplo de código Python usa pyspark.pandas
. Somente a versão 3.2 do runtime do Spark ou posterior dá suporte a isso.