Transformar os dados na nuvem usando a atividade Spark no Azure Data Factory
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Dica
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!
Neste tutorial, você pode usar o Azure PowerShell para criar um pipeline do Data Factory que transforma dados usando a Atividade Spark e um serviço vinculado HDInsight sob demanda. Neste tutorial, você realizará os seguintes procedimentos:
- Criar um data factory.
- Criar e implantar serviços vinculados.
- Criar e implantar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorar a execução de pipeline.
Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
Observação
Recomendamos que você use o módulo Az PowerShell do Azure para interagir com o Azure. Para começar, consulte Instalar o Azure PowerShell. Para saber como migrar para o módulo Az PowerShell, confira Migrar o Azure PowerShell do AzureRM para o Az.
- Conta de Armazenamento do Azure. Você cria um script Python e um arquivo de entrada e carrega-os no Armazenamento do Azure. A saída do programa Spark é armazenada nessa conta de armazenamento. O cluster do Spark sob demanda usa a mesma conta de armazenamento que o respectivo armazenamento primário.
- Azure PowerShell. Siga as instruções em Como instalar e configurar o Azure PowerShell.
Carregar o script Python na sua conta do Armazenamento de Blobs
Crie um arquivo de Python chamado WordCount_Spark.py com o seguinte conteúdo:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Substitua <storageAccountName> pelo nome da sua conta de Armazenamento do Azure. Em seguida, salve o arquivo.
No seu Armazenamento de Blobs do Azure, crie um contêiner denominado adftutorial se ele não existir.
Crie uma pasta chamada spark.
Criar uma subpasta chamada script na pasta spark.
Carregue o arquivo WordCount_Spark.py na subpasta script.
Carregue o arquivo de entrada
- Crie um arquivo chamado minecraftstory.txt com um pouco de texto. O programa Spark conta o número de palavras no texto.
- Criar uma subpasta chamada
inputfiles
na pastaspark
. - Carregue o
minecraftstory.txt
na subpastainputfiles
.
Criar serviços vinculados
Você cria dois serviços vinculados nesta seção:
- Um serviço vinculado do Armazenamento do Azure que vincula uma conta de Armazenamento do Azure ao data factory. Esse armazenamento é usado pelo cluster HDInsight sob demanda. Ele também contém o script Spark a ser executado.
- Um serviço vinculado do HDInsight sob demanda. O Azure Data Factory cria automaticamente um cluster HDInsight, executa o programa Spark e então exclui o cluster HDInsight depois de ele ficar ocioso por um tempo pré-configurado.
Serviço vinculado de armazenamento do Azure
Crie um arquivo JSON usando seu editor preferido, copie a seguinte definição de JSON de um serviço vinculado do Armazenamento do Azure e, em seguida, salve o arquivo como MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Atualize o <storageAccountName> e o <storageAccountKey> com o nome e a chave de sua conta de Armazenamento do Azure.
Serviço vinculado do HDInsight sob demanda
Crie um arquivo JSON usando seu editor preferido, copie a seguinte definição de JSON de um serviço vinculado do Azure HDInsight e, em seguida, salve o arquivo como MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Atualize os valores para as propriedades a seguir na definição de serviço vinculado:
- hostSubscriptionId. Substitua <subscriptionID> pela ID da assinatura do Azure. O cluster HDInsight sob demanda é criado nessa assinatura.
- locatário. Substitua <tenantID> pela ID do locatário do Azure.
- servicePrincipalId, servicePrincipalKey. Substitua <servicePrincipalID> e <servicePrincipalKey> pela ID e a chave da entidade de serviço no Microsoft Entra ID. Essa entidade de serviço precisa ser um membro da função de Colaborador de assinatura ou o grupo de recursos em que o cluster é criado. Consulte Criar a entidade de serviço e o aplicativo do Microsoft Entra para obter detalhes. A ID da entidade de serviço é equivalente à ID do aplicativo e uma Chave de entidade de serviço é equivalente ao valor de um Segredo do cliente.
- clusterResourceGroup. Substitua <resourceGroupOfHDICluster> pelo nome do grupo de recursos no qual o cluster HDInsight precisa ser criado.
Observação
O Azure HDInsight tem uma limitação para o número total de núcleos que você pode usar em cada região do Azure a que ele dá suporte. Para o serviço vinculado do HDInsight sob demanda, o cluster HDInsight será criado na mesma localização do Armazenamento do Azure usado como o armazenamento primário desse serviço vinculado. Verifique se você tem cotas de núcleo suficientes para que o cluster seja criado com êxito. Para obter mais informações, consulte Configurar clusters no HDInsight com Hadoop, Spark, Kafka e mais.
Criar um pipeline
Nesta etapa, você cria um pipeline com uma atividade Spark. A atividade usa a amostra de contagem de palavras. Baixe o conteúdo dessa localização, se você ainda não tiver feito isso.
Crie um arquivo JSON em seu editor preferido, copie a definição de JSON a seguir de uma definição de pipeline e salve-a como MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Observe os seguintes pontos:
- rootPath aponta para a pasta spark do contêiner adftutorial.
- entryFilePath aponta para o arquivo WordCount_Spark.py na subpasta script da pasta spark.
Criar uma data factory
Você criou definições de serviço vinculado e de pipeline em arquivos JSON. Agora, criaremos um data factory e implantaremos o serviço vinculado e os arquivos JSON de pipeline usando cmdlets do PowerShell. Execute os seguintes comandos do PowerShell, um de cada vez:
Defina as variáveis uma a uma.
Nome do Grupo de Recursos
$resourceGroupName = "ADFTutorialResourceGroup"
Nome do Data Factory. Ser globalmente exclusivo
$dataFactoryName = "MyDataFactory09102017"
Nome do pipeline
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Inicie o PowerShell. Mantenha o Azure PowerShell aberto até o fim deste guia de início rápido. Se você fechar e reabrir, precisará executar os comandos novamente. Para obter uma lista de regiões do Azure no qual o Data Factory está disponível no momento, selecione as regiões que relevantes para você na página a seguir e, em seguida, expanda Análise para localizar Data Factory: Produtos disponíveis por região. Os armazenamentos de dados (Armazenamento do Azure, Banco de Dados SQL do Azure, etc.) e serviços de computação (HDInsight, etc.) usados pelo data factory podem estar em outras regiões.
Execute o comando a seguir e insira o nome de usuário e senha usados para entrar no portal do Azure:
Connect-AzAccount
Execute o comando abaixo para exibir todas as assinaturas dessa conta:
Get-AzSubscription
Execute o comando a seguir para selecionar a assinatura com a qual deseja trabalhar. Substitua SubscriptionId pela ID da assinatura do Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Crie o grupo de recursos: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Crie o data factory.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Execute o comando a seguir para ver a saída:
$df
Mude para a pasta em que você criou arquivos JSON e execute o seguinte comando para implantar um serviços vinculado do Armazenamento do Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Execute o comando a seguir para implantar um serviço vinculado do Spark sob demanda:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Execute o seguinte comando para implantar um pipeline:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Iniciar e monitorar uma execução de pipeline
Iniciar uma execução de pipeline. Ele também captura a ID da execução de pipeline para monitoramento futuro.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Execute o script a seguir para verificar continuamente o status do pipeline de execução até que ele termine.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Aqui está a saída da execução de exemplo:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Confirme que uma pasta denominada
outputfiles
é criada na pastaspark
do contêiner adftutorial com a saída do programa Spark.
Conteúdo relacionado
O pipeline nessa amostra copia dados de uma localização para outra em um Armazenamento de Blobs do Azure. Você aprendeu a:
- Criar um data factory.
- Criar e implantar serviços vinculados.
- Criar e implantar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorar a execução de pipeline.
Avance para o próximo tutorial para aprender como transformar dados executando o script Hive em um cluster do Azure HDInsight que está em uma rede virtual.