Przekształcanie danych w chmurze za pomocą działania platformy Spark w usłudze Azure Data Factory
DOTYCZY: Azure Data Factory Azure Synapse Analytics
Napiwek
Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !
W tym samouczku użyjesz programu Azure PowerShell do utworzenia potoku fabryki danych, który przekształca dane przy użyciu działania platformy Spark i połączonej usługi HDInsight na żądanie. Ten samouczek obejmuje następujące procedury:
- Tworzenie fabryki danych.
- Tworzenie i wdrażanie połączonych usług
- Redagowanie i wdrażanie potoku.
- Uruchom potok.
- Monitorowanie uruchomienia potoku.
Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
Wymagania wstępne
Uwaga
Do interakcji z platformą Azure zalecamy używanie modułu Azure Az w programie PowerShell. Zobacz Instalowanie programu Azure PowerShell, aby rozpocząć. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az PowerShell, zobacz Migracja programu Azure PowerShell z modułu AzureRM do modułu Az.
- Konto usługi Azure Storage. Utworzysz skrypt języka Python i plik wejściowy, a następnie przekażesz go do usługi Azure Storage. Dane wyjściowe programu platformy Spark są przechowywane na tym koncie magazynu. Klaster platformy Spark na żądanie używa tego samego konta magazynu, jako swojego podstawowego magazynu.
- Azure PowerShell. Wykonaj instrukcje podane w temacie Instalowanie i konfigurowanie programu Azure PowerShell.
Przekazywanie skryptu języka Python na konto usługi Blob Storage
Utwórz plik w języku Python o nazwie WordCount_Spark.py i następującej zawartości:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Zastąp wartość <storageAccountName> nazwą swojego konta usługi Azure Storage. Następnie zapisz plik.
W usłudze Azure Blob Storage utwórz kontener o nazwie adftutorial, jeśli nie istnieje.
Utwórz folder o nazwie spark.
Utwórz podfolder o nazwie script w folderze spark.
Przekaż plik WordCount_Spark.py do podfolderu script.
Przekazywanie pliku wejściowego
- Utwórz plik o nazwie minecraftstory.txt zawierający tekst. Program platformy Spark zlicza liczbę słów w tym tekście.
- Utwórz podfolder o nazwie
inputfiles
w folderzespark
. - Przekaż
minecraftstory.txt
do podfolderuinputfiles
.
Redagowanie połączonych usług
W tej sekcji zredagujesz dwie połączone usługi:
- Połączoną usługę Azure Storage, która łączy konto usługi Azure Storage z fabryką danych. Ten magazyn jest używany przez klaster usługi HDInsight na żądanie. Zawiera on także skrypt platformy Spark do wykonania.
- Połączona usługa HDInsight na żądanie. Usługa Azure Data Factory automatycznie tworzy klaster usługi HDInsight, uruchamia program platformy Spark i usuwa klaster usługi HDInsight, gdy jest on bezczynny przez wstępnie skonfigurowany czas.
Połączona usługa Azure Storage
Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą połączonej usługi Azure Storage, a następnie zapisz plik jako MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Zaktualizuj wartości parametrów <storageAccountName> i <storageAccountKey> nazwą konta usługi Azure Storage i jego kluczem.
Połączona usługa HDInsight na żądanie
Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą połączonej usługi Azure HDIsight, a następnie zapisz plik jako MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Zaktualizuj wartości następujących właściwości w definicji połączonej usługi:
- hostSubscriptionId. Zastąp właściwość <SubscriptionId> identyfikatorem subskrypcji platformy Azure. Klaster usługi HDInsight na żądanie jest tworzony w tej subskrypcji.
- tenant. Zastąp właściwość <tenantID> identyfikatorem dzierżawy platformy Azure.
- servicePrincipalId, servicePrincipalKey. Zastąp <wartości servicePrincipalID> i <servicePrincipalKey> identyfikatorem i kluczem jednostki usługi w identyfikatorze Entra firmy Microsoft. Jednostka usługi musi być członkiem roli współautora subskrypcji lub grupy zasobów, gdzie został utworzony klaster. Aby uzyskać szczegółowe informacje, zobacz tworzenie aplikacji Microsoft Entra i jednostki usługi. Identyfikator jednostki usługi jest odpowiednikiem identyfikatora aplikacji, a klucz jednostki usługi jest odpowiednikiem wartości klucza tajnego klienta.
- clusterResourceGroup. Zastąp właściwość <resourceGroupOfHDICluster> nazwą grupy zasobów, w której ma zostać utworzony klaster usługi HDInsight.
Uwaga
Usługa Azure HDInsight ma ograniczenia całkowitej liczby rdzeni, których możesz użyć w każdym obsługiwanym przez nią regionie platformy Azure. Dla połączonej usługi HDInsight na żądanie zostanie utworzony klaster usługi HDInsight w tej samej lokalizacji usługi Azure Storage użytej jako jej podstawowy magazyn. Upewnij się, że masz wystarczająco duże limity przydziału dla klastra, aby można go było pomyślnie utworzyć. Aby uzyskać więcej informacji, zobacz Set up clusters in HDInsight with Hadoop, Spark, Kafka and more (Konfigurowanie klastrów w usłudze HDInsight za pomocą platform Hadoop, Spark, Kafka i innych).
Redagowanie potoku
W tym kroku utworzysz nowy potok za pomocą działania platformy Spark. Działanie wykorzystuje próbkę liczby słów. Jeśli jeszcze tego nie zrobiono, pobierz zawartość z tej lokalizacji.
Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą definicji potoku, a następnie zapisz go jako MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Należy uwzględnić następujące informacje:
- Właściwość rootPath wskazuje folder platformy Spark kontenera adftutorial.
- Właściwość entryFilePath wskazuje plik WordCount_Spark.py w podfolderze skryptu folderu Spark.
Tworzenie fabryki danych
W plikach w formacie JSON zostały zredagowane połączona usługa i definicje potoku. Teraz utwórzmy fabrykę danych i wdróżmy połączone pliki JSON usługi i potoku przy użyciu poleceń cmdlet programu PowerShell. Uruchom następujące polecenia programu PowerShell jedno po drugim:
Ustaw zmienne jedną po drugiej.
Nazwa grupy zasobów
$resourceGroupName = "ADFTutorialResourceGroup"
Nazwa fabryki danych. Musi ona być unikatowa w skali globalnej
$dataFactoryName = "MyDataFactory09102017"
Nazwa potoku
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Uruchom program PowerShell. Nie zamykaj programu Azure PowerShell aż do końca tego samouczka Szybki start. Jeśli go zamkniesz i otworzysz ponownie, musisz uruchomić te polecenia jeszcze raz. Aby uzyskać listę regionów platformy Azure, w których obecnie jest dostępna usługa Data Factory, wybierz dane regiony na poniższej stronie, a następnie rozwiń węzeł Analiza, aby zlokalizować pozycję Data Factory: Produkty dostępne według regionu. Magazyny danych (Azure Storage, Azure SQL Database itp.) i jednostki obliczeniowe (HDInsight itp.) używane przez fabrykę danych mogą mieścić się w innych regionach.
Uruchom poniższe polecenie i wprowadź nazwę użytkownika oraz hasło, których używasz do logowania się w witrynie Azure Portal:
Connect-AzAccount
Uruchom poniższe polecenie, aby wyświetlić wszystkie subskrypcje dla tego konta:
Get-AzSubscription
Uruchom poniższe polecenie, aby wybrać subskrypcję, z którą chcesz pracować. Zastąp parametr SubscriptionId identyfikatorem Twojej subskrypcji platformy Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Utwórz grupę zasobów: ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Utwórz fabrykę danych.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Wykonaj następujące polecenie, aby wyświetlić dane wyjściowe:
$df
Przejdź do folderu, w którym zostały utworzone pliki w formacie JSON, a następnie uruchom następujące polecenie, aby wdrożyć połączoną usługę Azure Storage:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Uruchom następujące polecenie, aby wdrożyć połączoną usługę Spark na żądanie:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Uruchom następujące polecenie, aby wdrożyć potok:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Uruchamianie i monitorowanie działania potoku
Uruchom potok. Umożliwia to również przechwycenie identyfikatora uruchomienia potoku w celu monitorowania w przyszłości.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Uruchom następujący skrypt, aby stale sprawdzać stan uruchomienia potoku do momentu zakończenia jego działania.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Oto dane wyjściowe przykładowego przebiegu:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Potwierdź, że folder o nazwie
outputfiles
jest tworzony w folderzespark
kontenera adftutorial zawierającym dane wyjściowe z programu platformy Spark.
Powiązana zawartość
Potok w tym przykładzie kopiuje dane z jednej lokalizacji do innej lokalizacji w usłudze Azure Blob Storage. W tym samouczku omówiono:
- Tworzenie fabryki danych.
- Tworzenie i wdrażanie połączonych usług
- Redagowanie i wdrażanie potoku.
- Uruchom potok.
- Monitorowanie uruchomienia potoku.
Przejdź do następnego samouczka, aby dowiedzieć się, jak przekształcać dane, uruchamiając skrypt programu Hive w klastrze usługi Azure HDInsight, który znajduje się w sieci wirtualnej.