Transformace dat v cloudu pomocí aktivity Sparku ve službě Azure Data Factory
PLATÍ PRO: Azure Data Factory Azure Synapse Analytics
Tip
Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.
V tomto kurzu použijete Azure PowerShell k vytvoření kanálu Data Factory, který transformuje data pomocí aktivity Sparku a propojené služby HDInsight na vyžádání. V tomto kurzu provedete následující kroky:
- Vytvoření datové továrny
- Vytvoření a nasazení propojených služeb
- Vytvoření a nasazení kanálu
- Zahajte spuštění kanálu.
- Monitorování spuštění kanálu
Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
Požadavky
Poznámka:
Při práci s Azure doporučujeme používat modul Azure Az PowerShellu. Začněte tím, že si projdete téma Instalace Azure PowerShellu. Informace o tom, jak migrovat na modul Az PowerShell, najdete v tématu Migrace Azure PowerShellu z AzureRM na Az.
- Účet služby Azure Storage. Vytvoříte skript Pythonu a vstupní soubor a nahrajete je do úložiště Azure. V tomto účtu úložiště se ukládá výstup z programu Sparku. Cluster Spark na vyžádání používá stejný účet úložiště jako primární úložiště.
- Azure PowerShell: Postupujte podle pokynů v tématu Jak nainstalovat a nakonfigurovat Azure PowerShell.
Nahrání skriptu Pythonu do účtu služby Blob Storage
Vytvořte soubor Pythonu s názvem WordCount_Spark.py s následujícím obsahem:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Nahraďte <storageAccountName> názvem vašeho účtu služby Azure Storage. Pak soubor uložte.
Ve službě Azure Blob Storage, vytvořte kontejner nazvaný adftutorial, pokud ještě neexistuje.
Vytvořte složku s názvem spark.
Ve složce spark vytvořte podsložku script.
Do podsložky script uložte soubor WordCount_Spark.py.
Nahrání vstupního souboru
- Vytvořte soubor minecraftstory.txt a nějakým textem. Program Sparku spočítá slova v tomto textu.
- Ve složce
spark
vytvořte podsložkuinputfiles
. - Do podsložky
inputfiles
uložte souborminecraftstory.txt
.
Vytvoření propojených služeb
V této části vytvoříte dvě propojené služby:
- Propojená služba Azure Storage, která propojí váš účet služby Azure Storage s datovou továrnou. Toto úložiště používá cluster HDInsight na vyžádání. Obsahuje také skript Sparku, který se má spustit.
- Propojená služba HDInsight na vyžádání. Azure Data Factory automaticky vytvoří cluster HDInsight, spustí program Sparku a pak odstraní cluster HDInsight, jakmile bude nečinný po předkonfigurovanou dobu.
Propojená služba Azure Storage
Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON propojené služby Azure Storage a potom tento soubor uložte jako MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Aktualizujte <storageAccountName> a <storageAccountKey> s použitím názvu a klíče vašeho účtu služby Azure Storage.
Propojená služba HDInsight na vyžádání
Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON propojené služby Azure HDInsight a potom tento soubor uložte jako MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
V definici propojené služby aktualizujte hodnoty následujících vlastností:
- hostSubscriptionId. Místo <subscriptionID> použijte ID vašeho předplatného Azure. Cluster HDInsight na vyžádání se vytvoří v tomto předplatném.
- tenant. Místo <tenantID> použijte ID vašeho tenanta Azure.
- servicePrincipalId, servicePrincipalKey. Nahraďte <servicePrincipalID> a <servicePrincipalKey> ID a klíčem vašeho instančního objektu v ID Microsoft Entra. Tento instanční objekt musí být členem role přispěvatele předplatného nebo skupiny prostředků, ve které se cluster vytvoří. Podrobnosti najdete v tématu vytvoření aplikace Microsoft Entra a instančního objektu . ID instančního objektu odpovídá ID aplikace a klíč instančního objektu odpovídá hodnotě tajného klíče klienta.
- clusterResourceGroup. Nahraďte <resourceGroupOfHDICluster> názvem skupiny prostředků, ve které se má cluster HDInsight vytvořit.
Poznámka:
Pro Azure HDInsight platí omezení celkového počtu jader, která můžete v jednotlivých podporovaných oblastech Azure použít. V případě propojené služby HDInsight na vyžádání se cluster HDInsight vytvoří ve stejném umístění jako služba Azure Storage, kterou používá jako primární úložiště. Ujistěte se, že máte dostatečné kvóty pro jádra, aby bylo možné cluster úspěšně vytvořit. Další informace najdete v tématu Nastavení clusterů v HDInsight se systémem Hadoop, Spark, Kafka a dalšími.
Vytvoření kanálu
V tomto kroku vytvoříte nový kanál s aktivitou Sparku. Aktivita používá ukázku word count (počet slov). Pokud jste to ještě neudělali, stáhněte obsah z tohoto umístění.
Pomocí preferovaného editoru vytvořte soubor JSON, zkopírujte do něj následující definici JSON kanálu a potom tento soubor uložte jako MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Mějte na paměti následující body:
- rootPath odkazuje na složku spark kontejneru adftutorial.
- entryFilePath odkazuje na soubor WordCount_Spark.py v podsložce script složky spark.
Vytvoření datové továrny
Vytvořili jste definice propojené služby a kanálu v souborech JSON. Teď vytvoříme datovou továrnu a nasadíme propojené soubory JSON služby a kanálu pomocí rutin PowerShellu. Postupně spusťte následující příkazy PowerShellu:
Nastavte proměnné jednu po druhé.
Název skupiny prostředků
$resourceGroupName = "ADFTutorialResourceGroup"
Název datové továrny. Musí být globálně jedinečný.
$dataFactoryName = "MyDataFactory09102017"
Název kanálu
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Spusťte PowerShell. Nechte prostředí Azure PowerShell otevřené až do konce tohoto kurzu Rychlý start. Pokud ho zavřete a znovu otevřete, bude potřeba tyto příkazy spustit znovu. Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, Azure SQL Database atd.) a výpočetní prostředí (HDInsight atd.) používané datovou továrnou mohou být v jiných oblastech.
Spusťte následující příkaz a zadejte uživatelské jméno a heslo, které používáte k přihlášení na web Azure Portal:
Connect-AzAccount
Spuštěním následujícího příkazu zobrazíte všechna předplatná pro tento účet:
Get-AzSubscription
Spuštěním následujícího příkazu vyberte předplatné, se kterým chcete pracovat. Místo SubscriptionId použijte ID vašeho předplatného Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Vytvořte skupinu prostředků ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Vytvořte datovou továrnu.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Spusťte následující příkaz pro zobrazení výstupu:
$df
Přejděte do složky, ve které jste vytvořili soubory JSON, a spuštěním následujícího příkazu nasaďte propojenou službu Azure Storage:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Spuštěním následujícího příkazu nasaďte propojenou službu Sparku na vyžádání:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Spuštěním následujícího příkazu nasaďte kanál:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Spuštění kanálu a jeho monitorování
Zahajte spuštění kanálu. Zaznamená se také ID spuštění kanálu pro budoucí monitorování.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Spusťte následující skript, který bude nepřetržitě kontrolovat stav spuštění kanálu, dokud neskončí.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Zde je výstup tohoto ukázkového spuštění:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Pomocí výstupu z programu Sparku potvrďte vytvoření složky
outputfiles
ve složcespark
kontejneru adftutorial.
Související obsah
Kanál v této ukázce kopíruje data z jednoho umístění do jiného umístění v úložišti objektů blob Azure. Naučili jste se:
- Vytvoření datové továrny
- Vytvoření a nasazení propojených služeb
- Vytvoření a nasazení kanálu
- Zahajte spuštění kanálu.
- Monitorování spuštění kanálu
Pokud chcete zjistit, jak transformovat data spuštěním skriptu Hivu v clusteru Azure HDInsight ve virtuální síti, přejděte k následujícímu kurzu.