Sdílet prostřednictvím


Transformace dat v cloudu pomocí aktivity Sparku ve službě Azure Data Factory

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

V tomto kurzu použijete Azure PowerShell k vytvoření kanálu Data Factory, který transformuje data pomocí aktivity Sparku a propojené služby HDInsight na vyžádání. V tomto kurzu provedete následující kroky:

  • Vytvoření datové továrny
  • Vytvoření a nasazení propojených služeb
  • Vytvoření a nasazení kanálu
  • Zahajte spuštění kanálu.
  • Monitorování spuštění kanálu

Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.

Požadavky

Poznámka:

Při práci s Azure doporučujeme používat modul Azure Az PowerShellu. Začněte tím, že si projdete téma Instalace Azure PowerShellu. Informace o tom, jak migrovat na modul Az PowerShell, najdete v tématu Migrace Azure PowerShellu z AzureRM na Az.

  • Účet služby Azure Storage. Vytvoříte skript Pythonu a vstupní soubor a nahrajete je do úložiště Azure. V tomto účtu úložiště se ukládá výstup z programu Sparku. Cluster Spark na vyžádání používá stejný účet úložiště jako primární úložiště.
  • Azure PowerShell: Postupujte podle pokynů v tématu Jak nainstalovat a nakonfigurovat Azure PowerShell.

Nahrání skriptu Pythonu do účtu služby Blob Storage

  1. Vytvořte soubor Pythonu s názvem WordCount_Spark.py s následujícím obsahem:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Nahraďte <storageAccountName> názvem vašeho účtu služby Azure Storage. Pak soubor uložte.

  3. Ve službě Azure Blob Storage, vytvořte kontejner nazvaný adftutorial, pokud ještě neexistuje.

  4. Vytvořte složku s názvem spark.

  5. Ve složce spark vytvořte podsložku script.

  6. Do podsložky script uložte soubor WordCount_Spark.py.

Nahrání vstupního souboru

  1. Vytvořte soubor minecraftstory.txt a nějakým textem. Program Sparku spočítá slova v tomto textu.
  2. Ve složce spark vytvořte podsložku inputfiles.
  3. Do podsložky inputfiles uložte soubor minecraftstory.txt.

Vytvoření propojených služeb

V této části vytvoříte dvě propojené služby:

  • Propojená služba Azure Storage, která propojí váš účet služby Azure Storage s datovou továrnou. Toto úložiště používá cluster HDInsight na vyžádání. Obsahuje také skript Sparku, který se má spustit.
  • Propojená služba HDInsight na vyžádání. Azure Data Factory automaticky vytvoří cluster HDInsight, spustí program Sparku a pak odstraní cluster HDInsight, jakmile bude nečinný po předkonfigurovanou dobu.

Propojená služba Azure Storage

Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON propojené služby Azure Storage a potom tento soubor uložte jako MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Aktualizujte <storageAccountName> a <storageAccountKey> s použitím názvu a klíče vašeho účtu služby Azure Storage.

Propojená služba HDInsight na vyžádání

Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON propojené služby Azure HDInsight a potom tento soubor uložte jako MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

V definici propojené služby aktualizujte hodnoty následujících vlastností:

  • hostSubscriptionId. Místo <subscriptionID> použijte ID vašeho předplatného Azure. Cluster HDInsight na vyžádání se vytvoří v tomto předplatném.
  • tenant. Místo <tenantID> použijte ID vašeho tenanta Azure.
  • servicePrincipalId, servicePrincipalKey. Nahraďte <servicePrincipalID> a <servicePrincipalKey> ID a klíčem vašeho instančního objektu v ID Microsoft Entra. Tento instanční objekt musí být členem role přispěvatele předplatného nebo skupiny prostředků, ve které se cluster vytvoří. Podrobnosti najdete v tématu vytvoření aplikace Microsoft Entra a instančního objektu . ID instančního objektu odpovídá ID aplikace a klíč instančního objektu odpovídá hodnotě tajného klíče klienta.
  • clusterResourceGroup. Nahraďte <resourceGroupOfHDICluster> názvem skupiny prostředků, ve které se má cluster HDInsight vytvořit.

Poznámka:

Pro Azure HDInsight platí omezení celkového počtu jader, která můžete v jednotlivých podporovaných oblastech Azure použít. V případě propojené služby HDInsight na vyžádání se cluster HDInsight vytvoří ve stejném umístění jako služba Azure Storage, kterou používá jako primární úložiště. Ujistěte se, že máte dostatečné kvóty pro jádra, aby bylo možné cluster úspěšně vytvořit. Další informace najdete v tématu Nastavení clusterů v HDInsight se systémem Hadoop, Spark, Kafka a dalšími.

Vytvoření kanálu

V tomto kroku vytvoříte nový kanál s aktivitou Sparku. Aktivita používá ukázku word count (počet slov). Pokud jste to ještě neudělali, stáhněte obsah z tohoto umístění.

Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON kanálu a potom tento soubor uložte jako MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Mějte na paměti následující body:

  • rootPath odkazuje na složku spark kontejneru adftutorial.
  • entryFilePath odkazuje na soubor WordCount_Spark.py v podsložce script složky spark.

Vytvoření datové továrny

Vytvořili jste definice propojené služby a kanálu v souborech JSON. Teď vytvoříme datovou továrnu a nasadíme propojené soubory JSON služby a kanálu pomocí rutin PowerShellu. Postupně spusťte následující příkazy PowerShellu:

  1. Nastavte proměnné jednu po druhé.

    Název skupiny prostředků

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Název datové továrny. Musí být globálně jedinečný.

    $dataFactoryName = "MyDataFactory09102017"
    

    Název kanálu

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Spusťte PowerShell. Nechte prostředí Azure PowerShell otevřené až do konce tohoto kurzu Rychlý start. Pokud ho zavřete a znovu otevřete, bude potřeba tyto příkazy spustit znovu. Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, Azure SQL Database atd.) a výpočetní prostředí (HDInsight atd.) používané datovou továrnou mohou být v jiných oblastech.

    Spusťte následující příkaz a zadejte uživatelské jméno a heslo, které používáte k přihlášení na web Azure Portal:

    Connect-AzAccount
    

    Spuštěním následujícího příkazu zobrazíte všechna předplatná pro tento účet:

    Get-AzSubscription
    

    Spuštěním následujícího příkazu vyberte předplatné, se kterým chcete pracovat. Místo SubscriptionId použijte ID vašeho předplatného Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Vytvořte skupinu prostředků ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Vytvořte datovou továrnu.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Spusťte následující příkaz pro zobrazení výstupu:

    $df
    
  5. Přejděte do složky, ve které jste vytvořili soubory JSON, a spuštěním následujícího příkazu nasaďte propojenou službu Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Spuštěním následujícího příkazu nasaďte propojenou službu Sparku na vyžádání:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Spuštěním následujícího příkazu nasaďte kanál:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Spuštění kanálu a jeho monitorování

  1. Zahajte spuštění kanálu. Zaznamená se také ID spuštění kanálu pro budoucí monitorování.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Spusťte následující skript, který bude nepřetržitě kontrolovat stav spuštění kanálu, dokud neskončí.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Zde je výstup tohoto ukázkového spuštění:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Pomocí výstupu z programu Sparku potvrďte vytvoření složky outputfiles ve složce spark kontejneru adftutorial.

Kanál v této ukázce kopíruje data z jednoho umístění do jiného umístění v úložišti objektů blob Azure. Naučili jste se:

  • Vytvoření datové továrny
  • Vytvoření a nasazení propojených služeb
  • Vytvoření a nasazení kanálu
  • Zahajte spuštění kanálu.
  • Monitorování spuštění kanálu

Pokud chcete zjistit, jak transformovat data spuštěním skriptu Hivu v clusteru Azure HDInsight ve virtuální síti, přejděte k následujícímu kurzu.