Partilhar via


Transformar dados na cloud através da atividade do Spark no Azure Data Factory

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Neste tutorial, vai utilizar o Azure PowerShell para criar um pipeline do Data Factory que transforma dados com a Atividade do Spark e um serviço ligado do HDInsight a pedido. Vai executar os seguintes passos neste tutorial:

  • Criar uma fábrica de dados.
  • Criar e implementar serviços ligados.
  • Criar e implementar um pipeline.
  • Iniciar uma execução de pipeline.
  • Monitorizar a execução do pipeline.

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

Nota

Recomendamos que utilize o módulo Azure Az do PowerShell para interagir com o Azure. Para começar, consulte Instalar o Azure PowerShell. Para saber como migrar para o módulo do Az PowerShell, veja Migrar o Azure PowerShell do AzureRM para o Az.

Carregue o script Python para sua conta de armazenamento de Blob

  1. Crie um ficheiro Python com o nome WordCount_Spark.py com o seguinte conteúdo:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Substitua <storageAccountName> pelo nome da sua conta de Armazenamento do Azure. Em seguida, guarde o ficheiro.

  3. No Armazenamento de Blobs do Azure, crie um contentor com o nome adftutorial, caso ainda não exista.

  4. Crie uma pasta com o nome spark.

  5. Crie uma subpasta com o nome script na pasta spark.

  6. Carregue o ficheiro WordCount_Spark.py para a subpasta script.

Carregue o ficheiro de entrada

  1. Crie um ficheiro com o nome minecraftstory.txt com algum texto. O programa Spark conta o número de palavras neste texto.
  2. Crie uma subpasta com o nome inputfiles na pasta spark.
  3. Carregue o ficheiro minecraftstory.txt para a subpasta inputfiles.

Criar serviços ligados

Nesta secção, vai criar dois Serviços Ligados:

  • Um Serviço Ligado do Armazenamento do Azure que liga uma conta de Armazenamento do Azure à fábrica de dados. Este armazenamento é utilizado pelo cluster do HDInsight a pedido. Também contém o script Spark que vai ser executado.
  • Um Serviço Ligado do HDInsight a Pedido. O Azure Data Factory cria automaticamente um cluster do HDInsight, executa o programa Spark e, em seguida, elimina o cluster do HDInsight depois de estar inativo durante um período de tempo pré-configurado.

Serviço ligado do Storage do Azure

Crie um ficheiro JSON com o seu editor preferencial, copie a seguinte definição JSON de um serviço ligado do Armazenamento do Azure e, em seguida, guarde o ficheiro como MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Atualize os parâmetros <storageAccountName> e <storageAccountKey> com o nome e a chave da sua conta de Armazenamento do Azure.

Serviço Ligado do HDInsight a pedido

Crie um ficheiro JSON com o seu editor preferencial, copie a seguinte definição JSON de um serviço ligado do Azure HDInsight e guarde o ficheiro como MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Atualize os valores para as seguintes propriedades na definição de serviço ligado:

  • hostSubscriptionId. Substitua <subscriptionID> pelo ID da sua subscrição do Azure. O cluster do HDInsight a pedido é criado nesta subscrição.
  • tenant. Substitua <tenantID> pelo ID do inquilino do Azure.
  • servicePrincipalId, servicePrincipalKey. Substitua <servicePrincipalID> e <servicePrincipalKey> pela ID e chave da entidade de serviço na ID do Microsoft Entra. Este principal de serviço tem de ser membro da função de Contribuinte da subscrição ou do Grupo de recursos no qual o cluster é criado. Consulte criar entidade de serviço e aplicativo Microsoft Entra para obter detalhes. A ID da entidade de serviço é equivalente à ID do aplicativo e uma chave da entidade de serviço é equivalente ao valor de um segredo do cliente.
  • clusterResourceGroup. Substitua <resourceGroupOfHDICluster> pelo nome do grupo de recursos no qual o cluster do HDInsight tem de ser criado.

Nota

O Azure HDInsight tem limitação do número total de núcleos que pode utilizar em cada região do Azure que suporta. Para o Serviço Ligado do HDInsight a Pedido, o cluster do HDInsight será criado na mesma localização do Armazenamento do Azure utilizado como o respetivo armazenamento primário. Certifique-se de que tem um número suficiente de quotas de núcleo para o cluster ser criado com êxito. Para obter mais informações, veja Configurar clusters no HDInsight com o Hadoop, Spark, Kafka e muito mais.

Criar um pipeline

Neste passo, vai criar um novo pipeline com uma atividade do Spark. A atividade utiliza o exemplo de contagem de palavras. Transfira os conteúdos a partir desta localização, caso ainda não o tenha feito.

Crie um ficheiro JSON no seu editor preferencial, copie a seguinte definição JSON de uma definição de pipeline e guarde-o como MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Tenha em conta os seguintes pontos:

  • rootPath aponta para a pasta spark do contentor adftutorial.
  • entryFilePath aponta para o ficheiro WordCount_Spark.py na subpasta script da pasta spark.

Criar uma fábrica de dados

Criou definições de serviço ligado e de pipeline em ficheiros JSON. Agora, vamos criar uma fábrica de dados e implantar os arquivos JSON de serviço e pipeline vinculados usando cmdlets do PowerShell. Execute os seguintes comandos do PowerShell um a um:

  1. Defina as variáveis uma a uma.

    Nome do Grupo de Recursos

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nome do Data Factory. Tem de ser globalmente exclusivo

    $dataFactoryName = "MyDataFactory09102017"
    

    Nome do Pipeline

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Inicie o Azure PowerShell. Mantenha o Azure PowerShell aberto até ao fim deste início rápido. Se o fechar e reabrir, terá de executar os comandos novamente. Para obter uma lista de regiões do Azure em que o Data Factory está atualmente disponível, selecione as regiões que lhe interessam na página seguinte e, em seguida, expanda Analytics para localizar Data Factory: Produtos disponíveis por região. Os arquivos de dados (Armazenamento do Azure, Base de Dados SQL do Azure, etc.) e as computações (HDInsight, etc.) utilizados pela fábrica de dados podem estar noutras regiões.

    Execute o comando seguinte e introduza o nome de utilizador e a palavra-passe que utiliza para iniciar sessão no Portal do Azure:

    Connect-AzAccount
    

    Execute o comando seguinte para ver todas as subscrições desta conta:

    Get-AzSubscription
    

    Execute o comando seguinte para selecionar a subscrição com a qual pretende trabalhar. Substitua SubscriptionId pelo ID da sua subscrição do Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Crie o grupo de recursos: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Crie a fábrica de dados.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Execute o seguinte comando para ver a saída:

    $df
    
  5. Mude para a pasta onde criou os ficheiros JSON e execute o seguinte comando para implementar o serviço ligado do Armazenamento do Azure:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Execute o seguinte comando implementar um serviço ligado do Spark a pedido:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Execute o seguinte comando para implementar um pipeline:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Iniciar e monitorizar uma execução de pipeline

  1. Iniciar uma execução de pipeline. Também captura o ID de execução do pipeline para monitorização futura.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Execute o script seguinte para verificar continuamente o estado de execução do pipeline até terminar.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Eis o resultado da execução de exemplo:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Confirme que uma pasta com o nome outputfiles é criada na pasta spark do contentor adftutorial com a saída do programa spark.

O pipeline neste exemplo copia dados de uma localização para outra localização num armazenamento de blobs do Azure. Aprendeu a:

  • Criar uma fábrica de dados.
  • Criar e implementar serviços ligados.
  • Criar e implementar um pipeline.
  • Iniciar uma execução de pipeline.
  • Monitorizar a execução do pipeline.

Avance para o tutorial seguinte para saber como transformar dados executando o script Hive num cluster do Azure HDInsight que se encontra numa rede virtual.