Преобразование данных в облаке с помощью действия Spark в фабрике данных Azure
ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics
Совет
Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !
В этом руководстве вы используете Azure PowerShell для создания конвейера фабрики данных, который преобразовывает данные с помощью действия Spark и служба, связанная по запросу HDInsight. В этом руководстве вы выполните следующие шаги:
- Создали фабрику данных.
- Создали и развернули эти связанные службы.
- Создание и развертывание конвейера.
- Запуск конвейера.
- Осуществили мониторинг выполнения конвейера.
Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
Предварительные требования
Примечание.
Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Сведения о начале работы см. в статье "Установка Azure PowerShell". Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.
- Учетная запись хранения Azure. Нужно создать скрипт Python и входной файл и отправить их в хранилище Azure. Выходные данные программы Spark хранятся в этой учетной записи хранения. Кластер Spark по запросу использует ту же учетную запись хранения, что и его основное хранилище.
- Azure PowerShell. Следуйте инструкциям по установке и настройке Azure PowerShell.
Отправка скрипта Python в учетную запись хранилища BLOB-объектов
Создайте файл Python с именем WordCount_Spark.py со следующим содержимым:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Замените свойство storageAccountName<> именем своей учетной записи хранения Azure. Затем сохраните файл.
В хранилище BLOB-объектов Azure создайте контейнер с именем adftutorial, если он не существует.
Создайте папку с именем spark.
Создайте вложенную папку с именем script в папке spark.
Отправьте файл WordCount_Spark.py во вложенную папку script.
Отправка входного файла
- Создайте файл с определенным текстом и назовите его minecraftstory.txt. Программа Spark подсчитывает количество слов в этом тексте.
- Создайте вложенную папку с именем
inputfiles
в папкеspark
. - Отправьте файл
minecraftstory.txt
во вложенную папкуinputfiles
.
Создание связанных служб
Создайте две связанные службы в этом разделе:
- Связанную службу хранилища Azure, которая связывает учетную запись хранения Azure с фабрикой данных. Это хранилище используется кластером HDInsight по запросу. В нем также содержится скрипт Spark для выполнения.
- Связанную службу HDInsight по запросу. Фабрика данных Azure автоматически создает кластер HDInsight, запускает программу Spark, а затем удаляет кластер HDInsight после простоя в течение предварительно настроенного времени.
Связанная служба хранения Azure
Создайте файл JSON, используя предпочитаемый редактор, скопируйте следующее определение JSON связанной службы хранилища Azure и сохраните файл как MyStorageLinkedService.json.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
Обновите параметры <storageAccountName> и <storageAccountKey>, использовав имя и ключ своей учетной записи хранения Azure.
Связанная служба HDInsight по запросу
Создайте файл JSON, используя предпочитаемый редактор, скопируйте следующее определение JSON связанной службы HDInsight Azure и сохраните файл как MyOnDemandSparkLinkedService.json.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Обновите значения следующих свойств в определении связанных служб:
- hostSubscriptionId. Замените значение <> на идентификатор подписки Azure. В этой подписке создается кластер HDInsight по требованию.
- tenant. Замените <tenantID> на идентификатор своего клиента Azure.
- servicePrincipalId, servicePrincipalKey. Замените <servicePrincipalID> и servicePrincipalKey> идентификатором и <ключом субъекта-службы в идентификаторе Microsoft Entra. Этому субъекту-службе должна быть назначена роли участника подписки или группы ресурсов, в которой создается кластер. Дополнительные сведения см . в разделе "Создание приложения Microsoft Entra" и субъекта-службы . Идентификатор субъекта-службы —это эквивалент идентификатора приложения, а ключ субъекта-службы — значения секрета клиента.
- clusterResourceGroup. Замените <resourceGroupOfHDICluster> на имя группы ресурсов, в которой необходимо создать кластер HDInsight.
Примечание.
Azure HDInsight имеет ограничение на общее количество ядер, которые можно использовать в каждом поддерживаемом регионе Azure. Для связанной службы HDInsight по требованию будет создан кластер HDInsight в расположении хранилища Azure, используемом в качестве его основного хранилища. Убедитесь, что имеется достаточное количество квот ядра для успешного создания кластера. Дополнительные сведения см. в статье Установка кластеров в HDInsight с использованием Hadoop, Spark, Kafka и других технологий.
Создание конвейера
На этом этапе создайте конвейер с действием Spark. В действии используется пример статистики. Загрузите содержимое из этого расположения, если вы еще не сделали этого.
Создайте файл JSON в предпочитаемом редакторе, скопируйте следующее определение JSON определения конвейера и сохраните его как MySparkOnDemandPipeline.json.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Обратите внимание на следующие аспекты:
- rootPath указывает на папку spark контейнера adftutorial.
- entryFilePath указывает на файл WordCount_Spark.py во вложенной папке скрипта папки spark.
Создание фабрики данных
Вы создали связанную службу и определения конвейера в файлах JSON. Теперь нужно создать фабрику данных и развернуть связанную службу и файлы JSON конвейера с помощью командлетов PowerShell. Последовательно выполните следующие команды PowerShell:
По очереди задайте переменные.
Имя группы ресурсов
$resourceGroupName = "ADFTutorialResourceGroup"
Имя Фабрики данных. (оно должно быть глобально уникальным)
$dataFactoryName = "MyDataFactory09102017"
Имя конвейера
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
Запустите PowerShell. Не закрывайте Azure PowerShell, пока выполняются описанные в этом кратком руководстве инструкции. Если закрыть и снова открыть это окно, то придется вновь выполнять эти команды. Чтобы получить список регионов Azure, в которых в настоящее время доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища Azure, база данных SQL Azure и т. д.) и вычисления (HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.
Выполните следующую команду и введите имя пользователя и пароль, которые используются для входа на портал Azure.
Connect-AzAccount
Чтобы просмотреть все подписки для этой учетной записи, выполните следующую команду:
Get-AzSubscription
Выполните следующую команду, чтобы выбрать подписку, с которой вы собираетесь работать. Замените значение SubscriptionId на идентификатор подписки Azure:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
Создайте группу ресурсов ADFTutorialResourceGroup.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Создайте фабрику данных.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Чтобы просмотреть выходные данные, выполните следующую команду:
$df
Перейдите в папку, где были созданы файлы JSON, и выполните следующую команду, чтобы развернуть связанную службу хранилища Azure.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
Выполните следующую команду, чтобы развернуть связанную службу Spark по запросу.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Выполните следующую команду, чтобы развернуть конвейер.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
Запуск и мониторинг выполнения конвейера
Запуск конвейера. Эта команда также запишет идентификатор выполнения конвейера для будущего мониторинга.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
Запустите следующий скрипт, чтобы проверять состояние выполнения, пока оно не завершится.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Вот результат примера выполнения:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Убедитесь, что папка с именем
outputfiles
создана в папкеspark
контейнера adftutorial с выходными данными программы Spark.
Связанный контент
В этом примере конвейер копирует данные из одного расположения в другое в хранилище BLOB-объектов Azure. Вы научились выполнять следующие задачи:
- Создали фабрику данных.
- Создали и развернули эти связанные службы.
- Создание и развертывание конвейера.
- Запуск конвейера.
- Осуществили мониторинг выполнения конвейера.
Чтобы узнать, как преобразовать данные, запустив скрипт Hive в кластере Azure HDInsight, который находится в виртуальной сети, ознакомьтесь со следующим руководством.
Transform data in Azure Virtual Network using Hive activity in Azure Data Factory (Преобразование данных в виртуальной сети Azure с помощью действия Hive в фабрике данных Azure).