Trasformare dati nel cloud usando l'attività Spark in Azure Data Factory
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
In questa esercitazione si usa Azure PowerShell per creare una pipeline di Data Factory che trasforma i dati con un'attività Spark e un servizio collegato HDInsight su richiesta. In questa esercitazione vengono completati i passaggi seguenti:
- Creare una data factory.
- Creare e distribuire servizi collegati.
- Creare e distribuire una pipeline.
- Avviare un'esecuzione della pipeline.
- Monitorare l'esecuzione della pipeline.
Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Prerequisiti
Nota
È consigliabile usare il modulo Azure Az PowerShell per interagire con Azure. Per iniziare, vedere Installare Azure PowerShell. Per informazioni su come eseguire la migrazione al modulo AZ PowerShell, vedere Eseguire la migrazione di Azure PowerShell da AzureRM ad Az.
- Account di archiviazione di Azure. È possibile creare uno script Python e un file di input e caricarli nell'archiviazione di Azure. L'output del programma Spark viene archiviato in questo account di archiviazione. Il cluster Spark su richiesta usa lo stesso account di archiviazione come risorsa di archiviazione primaria.
- Azure PowerShell. Seguire le istruzioni in Come installare e configurare Azure PowerShell.
Caricare lo script Python nell'account di archiviazione BLOB
Creare un file Python denominato WordCount_Spark.py con il contenuto seguente:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Sostituire <storageAccountName> con il nome del proprio account di archiviazione di Azure. Salvare quindi il file.
Nell'Archivio BLOB di Azure creare un contenitore denominato adftutorial, se non esiste.
Creare una cartella denominata spark.
Creare una sottocartella denominata script nella cartella spark.
Caricare il file WordCount_Spark.py nella sottocartella script.
Caricare il file di input
- Creare un file denominato minecraftstory.txt con del testo. Il programma Spark conta il numero di parole in questo testo.
- Creare una sottocartella denominata
inputfiles
nella cartellaspark
. - Caricare il file
minecraftstory.txt
nella sottocartellainputfiles
.
Creare servizi collegati
In questa sezione vengono creati due servizi collegati:
- Un servizio collegato Archiviazione di Azure che collega un account di archiviazione di Azure alla data factory. Questo archivio viene usato dal cluster HDInsight su richiesta. Contiene anche lo script Spark da eseguire.
- Un servizio collegato HDInsight su richiesta. Azure Data Factory crea automaticamente un cluster HDInsight, esegue il programma Spark ed elimina il cluster HDInsight dopo che è rimasto inattivo per un periodo di tempo preconfigurato.
Servizio collegato Archiviazione di Azure
Creare un file JSON usando l'editor preferito, copiare la definizione JSON seguente di un servizio collegato di Archiviazione di Azure e quindi salvare il file con il nome MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Sostituire <storageAccountName> e <storageAccountKey> con il nome e la chiave dell'account di archiviazione di Azure.
Servizio collegato HDInsight su richiesta
Creare un file JSON usando l'editor preferito, copiare la definizione JSON seguente di un servizio collegato di Azure HDInsight e quindi salvare il file con il nome MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Aggiornare i valori per le proprietà seguenti nella definizione del servizio collegato:
- hostSubscriptionId. Sostituire <subscriptionID> con l'ID della sottoscrizione di Azure. Il cluster HDInsight su richiesta verrà creato in questa sottoscrizione.
- tenant. Sostituire <tenantID> con l'ID del tenant di Azure.
- servicePrincipalId, servicePrincipalKey. Sostituire <servicePrincipalID> e <servicePrincipalKey> con ID e chiave dell'entità servizio nell'ID Microsoft Entra. Questa entità servizio deve essere un membro del ruolo Collaboratore della sottoscrizione o del gruppo di risorse in cui viene creato il cluster. Per informazioni dettagliate, vedere Creare un'applicazione Microsoft Entra e un'entità servizio. L'ID entità servizio equivale all'ID applicazione e una chiave entità servizio equivale al valore di un segreto client.
- clusterResourceGroup. Sostituire <resourceGroupOfHDICluster> con il nome del gruppo di risorse in cui deve essere creato il cluster HDInsight.
Nota
Azure HDInsight applica un limite al numero totale di core che è possibile usare in ogni area di Azure supportata. Per il servizio collegato HDInsight su richiesta, il cluster HDInsight verrà creato nello stesso percorso dell'archivio di Azure usato come risorsa di archiviazione primaria. Assicurarsi che siano disponibili sufficienti quote di core per la creazione del cluster. Per altre informazioni, vedere Configurare i cluster di HDInsight con Hadoop, Spark, Kafka e altro ancora.
Creare una pipeline
In questo passaggio si crea una nuova pipeline con un'attività Spark. L'attività usa l'esempio del conteggio parole. Se non è già stato fatto, scaricare il contenuto da questo percorso.
Creare un file JSON usando l'editor preferito, copiare la definizione JSON seguente di una pipeline e quindi salvare il file con il nome MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Notare i punti seguenti:
- rootPath punta alla cartella spark del contenitore adftutorial.
- entryFilePath punta al file WordCount_Spark.py nella sottocartella script della cartella spark.
Creare una data factory
Sono state create definizioni di servizio collegato e pipeline nei file JSON. A questo punto si creerà una data factory e si distribuirà i file JSON del servizio e della pipeline collegati usando i cmdlet di PowerShell. Eseguire questi comandi di PowerShell uno alla volta:
Impostare le variabili una alla volta.
Nome gruppo di risorse
$resourceGroupName = "ADFTutorialResourceGroup"
Nome della data factory. Deve essere univoco a livello globale
$dataFactoryName = "MyDataFactory09102017"
Nome pipeline
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Avviare PowerShell. Tenere aperto Azure PowerShell fino al termine di questa guida introduttiva. Se si chiude e si riapre, sarà necessario eseguire di nuovo questi comandi. Per un elenco di aree di Azure in cui Data Factory è attualmente disponibile, selezionare le aree di interesse nella pagina seguente, quindi espandere Analytics per individuare Data Factory: Prodotti disponibili in base all'area. Gli archivi dati (Archiviazione di Azure, database SQL di Azure e così via) e le risorse di calcolo (HDInsight e così via) usati dalla data factory possono trovarsi in altre aree.
Eseguire questo comando e immettere il nome utente e la password usati per accedere al portale di Azure:
Connect-AzAccount
Eseguire questo comando per visualizzare tutte le sottoscrizioni per l'account:
Get-AzSubscription
Eseguire il comando seguente per selezionare la sottoscrizione da usare. Sostituire SubscriptionId con l'ID della sottoscrizione di Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Creare il gruppo di risorse ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Creare la data factory.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Eseguire questo comando per visualizzare l'output:
$df
Passare alla cartella in cui sono stati creati i file JSON ed eseguire questo comando per distribuire un servizio collegato di Archiviazione di Azure:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Eseguire questo comando per distribuire un servizio collegato Spark su richiesta:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Eseguire questo comando per distribuire una pipeline:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Avviare e monitorare un'esecuzione della pipeline
Avviare un'esecuzione della pipeline. Viene anche acquisito l'ID di esecuzione della pipeline per il monitoraggio futuro.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Eseguire questo script per verificare costantemente lo stato di esecuzione della pipeline fino al termine.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Ecco l'output dell'esecuzione di esempio:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Verificare che sia stata creata una cartella denominata
outputfiles
nella cartellaspark
del contenitore adftutorial con l'output del programma Spark.
Contenuto correlato
La pipeline in questo esempio copia i dati da una posizione a un'altra in un archivio BLOB di Azure. Contenuto del modulo:
- Creare una data factory.
- Creare e distribuire servizi collegati.
- Creare e distribuire una pipeline.
- Avviare un'esecuzione della pipeline.
- Monitorare l'esecuzione della pipeline.
Passare alla prossima esercitazione per informazioni su come trasformare i dati tramite l'esecuzione di uno script Hive in un cluster Azure HDInsight che si trova in una rete virtuale.