Transformar dados na cloud através da atividade do Spark no Azure Data Factory
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Neste tutorial, vai utilizar o Azure PowerShell para criar um pipeline do Data Factory que transforma dados com a Atividade do Spark e um serviço ligado do HDInsight a pedido. Vai executar os seguintes passos neste tutorial:
- Criar uma fábrica de dados.
- Criar e implementar serviços ligados.
- Criar e implementar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorizar a execução do pipeline.
Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
Nota
Recomendamos que utilize o módulo Azure Az do PowerShell para interagir com o Azure. Para começar, consulte Instalar o Azure PowerShell. Para saber como migrar para o módulo do Az PowerShell, veja Migrar o Azure PowerShell do AzureRM para o Az.
- Conta do Armazenamento do Azure. Você cria um script Python e um arquivo de entrada e os carrega no armazenamento do Azure. A saída do programa Spark é armazenada nesta conta de armazenamento. O cluster do Spark a pedido utiliza a mesma conta de armazenamento como o respetivo armazenamento primário.
- Azure PowerShell. Siga as instruções em How to install and configure Azure PowerShell (Como instalar e configurar o Azure PowerShell).
Carregue o script Python para sua conta de armazenamento de Blob
Crie um ficheiro Python com o nome WordCount_Spark.py com o seguinte conteúdo:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Substitua <storageAccountName> pelo nome da sua conta de Armazenamento do Azure. Em seguida, guarde o ficheiro.
No Armazenamento de Blobs do Azure, crie um contentor com o nome adftutorial, caso ainda não exista.
Crie uma pasta com o nome spark.
Crie uma subpasta com o nome script na pasta spark.
Carregue o ficheiro WordCount_Spark.py para a subpasta script.
Carregue o ficheiro de entrada
- Crie um ficheiro com o nome minecraftstory.txt com algum texto. O programa Spark conta o número de palavras neste texto.
- Crie uma subpasta com o nome
inputfiles
na pastaspark
. - Carregue o ficheiro
minecraftstory.txt
para a subpastainputfiles
.
Criar serviços ligados
Nesta secção, vai criar dois Serviços Ligados:
- Um Serviço Ligado do Armazenamento do Azure que liga uma conta de Armazenamento do Azure à fábrica de dados. Este armazenamento é utilizado pelo cluster do HDInsight a pedido. Também contém o script Spark que vai ser executado.
- Um Serviço Ligado do HDInsight a Pedido. O Azure Data Factory cria automaticamente um cluster do HDInsight, executa o programa Spark e, em seguida, elimina o cluster do HDInsight depois de estar inativo durante um período de tempo pré-configurado.
Serviço ligado do Storage do Azure
Crie um ficheiro JSON com o seu editor preferencial, copie a seguinte definição JSON de um serviço ligado do Armazenamento do Azure e, em seguida, guarde o ficheiro como MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Atualize os parâmetros <storageAccountName> e <storageAccountKey> com o nome e a chave da sua conta de Armazenamento do Azure.
Serviço Ligado do HDInsight a pedido
Crie um ficheiro JSON com o seu editor preferencial, copie a seguinte definição JSON de um serviço ligado do Azure HDInsight e guarde o ficheiro como MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Atualize os valores para as seguintes propriedades na definição de serviço ligado:
- hostSubscriptionId. Substitua <subscriptionID> pelo ID da sua subscrição do Azure. O cluster do HDInsight a pedido é criado nesta subscrição.
- tenant. Substitua <tenantID> pelo ID do inquilino do Azure.
- servicePrincipalId, servicePrincipalKey. Substitua <servicePrincipalID> e <servicePrincipalKey> pela ID e chave da entidade de serviço na ID do Microsoft Entra. Este principal de serviço tem de ser membro da função de Contribuinte da subscrição ou do Grupo de recursos no qual o cluster é criado. Consulte criar entidade de serviço e aplicativo Microsoft Entra para obter detalhes. A ID da entidade de serviço é equivalente à ID do aplicativo e uma chave da entidade de serviço é equivalente ao valor de um segredo do cliente.
- clusterResourceGroup. Substitua <resourceGroupOfHDICluster> pelo nome do grupo de recursos no qual o cluster do HDInsight tem de ser criado.
Nota
O Azure HDInsight tem limitação do número total de núcleos que pode utilizar em cada região do Azure que suporta. Para o Serviço Ligado do HDInsight a Pedido, o cluster do HDInsight será criado na mesma localização do Armazenamento do Azure utilizado como o respetivo armazenamento primário. Certifique-se de que tem um número suficiente de quotas de núcleo para o cluster ser criado com êxito. Para obter mais informações, veja Configurar clusters no HDInsight com o Hadoop, Spark, Kafka e muito mais.
Criar um pipeline
Neste passo, vai criar um novo pipeline com uma atividade do Spark. A atividade utiliza o exemplo de contagem de palavras. Transfira os conteúdos a partir desta localização, caso ainda não o tenha feito.
Crie um ficheiro JSON no seu editor preferencial, copie a seguinte definição JSON de uma definição de pipeline e guarde-o como MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Tenha em conta os seguintes pontos:
- rootPath aponta para a pasta spark do contentor adftutorial.
- entryFilePath aponta para o ficheiro WordCount_Spark.py na subpasta script da pasta spark.
Criar uma fábrica de dados
Criou definições de serviço ligado e de pipeline em ficheiros JSON. Agora, vamos criar uma fábrica de dados e implantar os arquivos JSON de serviço e pipeline vinculados usando cmdlets do PowerShell. Execute os seguintes comandos do PowerShell um a um:
Defina as variáveis uma a uma.
Nome do Grupo de Recursos
$resourceGroupName = "ADFTutorialResourceGroup"
Nome do Data Factory. Tem de ser globalmente exclusivo
$dataFactoryName = "MyDataFactory09102017"
Nome do Pipeline
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Inicie o Azure PowerShell. Mantenha o Azure PowerShell aberto até ao fim deste início rápido. Se o fechar e reabrir, terá de executar os comandos novamente. Para obter uma lista de regiões do Azure em que o Data Factory está atualmente disponível, selecione as regiões que lhe interessam na página seguinte e, em seguida, expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os arquivos de dados (Armazenamento do Azure, Base de Dados SQL do Azure, etc.) e as computações (HDInsight, etc.) utilizados pela fábrica de dados podem estar noutras regiões.
Execute o comando seguinte e introduza o nome de utilizador e a palavra-passe que utiliza para iniciar sessão no Portal do Azure:
Connect-AzAccount
Execute o comando seguinte para ver todas as subscrições desta conta:
Get-AzSubscription
Execute o comando seguinte para selecionar a subscrição com a qual pretende trabalhar. Substitua SubscriptionId pelo ID da sua subscrição do Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Crie o grupo de recursos: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Crie a fábrica de dados.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Execute o seguinte comando para ver a saída:
$df
Mude para a pasta onde criou os ficheiros JSON e execute o seguinte comando para implementar o serviço ligado do Armazenamento do Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Execute o seguinte comando implementar um serviço ligado do Spark a pedido:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Execute o seguinte comando para implementar um pipeline:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Iniciar e monitorizar uma execução de pipeline
Iniciar uma execução de pipeline. Também captura o ID de execução do pipeline para monitorização futura.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Execute o script seguinte para verificar continuamente o estado de execução do pipeline até terminar.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Eis o resultado da execução de exemplo:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Confirme que uma pasta com o nome
outputfiles
é criada na pastaspark
do contentor adftutorial com a saída do programa spark.
Conteúdos relacionados
O pipeline neste exemplo copia dados de uma localização para outra localização num armazenamento de blobs do Azure. Aprendeu a:
- Criar uma fábrica de dados.
- Criar e implementar serviços ligados.
- Criar e implementar um pipeline.
- Iniciar uma execução de pipeline.
- Monitorizar a execução do pipeline.
Avance para o tutorial seguinte para saber como transformar dados executando o script Hive num cluster do Azure HDInsight que se encontra numa rede virtual.