你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
在 Azure 数据工厂中使用 Spark 活动转换云中的数据
适用于: Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
本教程使用 Azure PowerShell 创建一个数据工厂管道,该管道可以使用 Spark 活动和按需 HDInsight 链接服务转换数据。 在本教程中执行以下步骤:
- 创建数据工厂。
- 创作并部署链接服务。
- 创作并部署管道。
- 启动管道运行。
- 监视管道运行。
如果没有 Azure 订阅,请在开始之前创建一个免费帐户。
先决条件
注意
建议使用 Azure Az PowerShell 模块与 Azure 交互。 若要开始,请参阅安装 Azure PowerShell。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
- Azure 存储帐户。 创建 Python 脚本和输入文件,并将其上传到 Azure 存储。 Spark 程序的输出存储在此存储帐户中。 按需 Spark 群集使用相同的存储帐户作为其主存储。
- Azure PowerShell。 遵循如何安装和配置 Azure PowerShell 中的说明。
将 Python 脚本上传到 Blob 存储帐户
创建包含以下内容的名为 WordCount_Spark.py 的 Python 文件:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
将 <storageAccountName> 替换为 Azure 存储帐户的名称。 然后保存文件。
在 Azure Blob 存储中,创建名为 adftutorial 的容器(如果尚不存在)。
创建名为 spark 的文件夹。
在 spark 文件夹中创建名为 script 的子文件夹。
将 WordCount_Spark.py 文件上传到 script 子文件夹。
上传输入文件
- 创建包含一些文本的名为 minecraftstory.txt 的文件。 Spark 程序会统计此文本中的单词数量。
- 在
spark
文件夹中创建名为inputfiles
的子文件夹。 - 将
minecraftstory.txt
上传到inputfiles
子文件夹。
创作链接服务
在本部分中创作两个链接服务:
- 一个 Azure 存储链接服务,用于将 Azure 存储帐户链接到数据工厂。 按需 HDInsight 群集使用此存储。 此存储还包含要执行的 Spark 脚本。
- 一个按需 HDInsight 链接服务。 Azure 数据工厂自动创建 HDInsight 群集,运行 Spark 程序,然后在 HDInsight 群集空闲预配置的时间后将其删除。
Azure 存储链接服务
使用偏好的编辑器创建一个 JSON 文件,复制 Azure 存储链接服务的以下 JSON 定义,并将该文件另存为 MyStorageLinkedService.json。
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
使用 Azure 存储帐户的名称和密钥更新 <storageAccountName> 和 <storageAccountKey>。
按需 HDInsight 链接服务
使用偏好的编辑器创建一个 JSON 文件,复制 Azure HDInsight 链接服务的以下 JSON 定义,并将该文件另存为 MyOnDemandSparkLinkedService.json。
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
更新链接服务定义中以下属性的值:
- hostSubscriptionId。 将 <subscriptionID> 替换为 Azure 订阅的 ID。 按需 HDInsight 群集在此 Azure 订阅中创建。
- tenant。 将 <tenantID> 替换为 Azure 租户的 ID。
- servicePrincipalId、servicePrincipalKey。 将 <servicePrincipalID> 和 <servicePrincipalKey> 分别替换为 Microsoft Entra ID 中服务主体的 ID 和密钥。 此服务主体需是订阅“参与者”角色的成员,或创建群集的资源组的成员。 有关详细信息,请参阅创建 Microsoft Entra 应用程序和服务主体。 服务主体 ID 等效于应用程序 ID,服务主体密钥等效于客户端密码的值 。
- clusterResourceGroup。 将 <resourceGroupOfHDICluster> 替换为需要在其中创建资源组的 HDInsight 群集的名称。
注意
Azure HDInsight 会限制可在其支持的每个 Azure 区域中使用的核心总数。 对于按需 HDInsight 链接服务,将在 Azure 存储用作其主存储的同一位置创建 HDInsight 群集。 请确保有足够的核心配额,以便能够成功创建群集。 有关详细信息,请参阅使用 Hadoop、Spark、Kafka 等在 HDInsight 中设置群集。
创作管道
本步骤创建包含 Spark 活动的新管道。 该活动使用单词计数示例。 从此位置下载内容(如果尚未这样做)。
在偏好的编辑器中创建一个 JSON 文件,复制管道定义的以下 JSON 定义,并将该文件另存为 MySparkOnDemandPipeline.json。
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
请注意以下几点:
- rootPath 指向 adftutorial 容器的 spark 文件夹。
- entryFilePath 指向 spark 文件夹的 script 子文件夹中的 WordCount_Spark.py 文件。
创建数据工厂
已在 JSON 文件中创作链接服务和管道定义。 现在,让我们创建一个数据工厂,并使用 PowerShell cmdlet 部署链接服务和管道 JSON 文件。 逐条运行以下 PowerShell 命令:
逐个设置变量。
资源组名称
$resourceGroupName = "ADFTutorialResourceGroup"
数据工厂名称。 必须全局唯一
$dataFactoryName = "MyDataFactory09102017"
管道名称
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
启动 PowerShell。 在完成本快速入门之前,请将 Azure PowerShell 保持打开状态。 如果将它关闭再重新打开,则需要再次运行下述命令。 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。
运行以下命令并输入用于登录 Azure 门户的用户名和密码:
Connect-AzAccount
运行以下命令查看此帐户的所有订阅:
Get-AzSubscription
运行以下命令选择要使用的订阅。 请将 SubscriptionId 替换为自己的 Azure 订阅的 ID:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
创建资源组:ADFTutorialResourceGroup。
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
创建数据工厂。
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
执行以下命令查看输出:
$df
切换到在其中创建了 JSON 文件的文件夹,并运行以下命令部署 Azure 存储链接服务:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
运行以下命令部署按需 Spark 链接服务:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
运行以下命令部署管道:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
启动并监视管道运行
启动管道运行。 该命令还会捕获管道运行 ID 用于将来的监视。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
运行以下脚本来持续检查管道运行状态,直到运行完成为止。
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
下面是示例运行的输出:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
确认是否在 adftutorial 容器的
spark
文件夹中创建了包含 spark 程序的输出的、名为outputfiles
的文件夹。
相关内容
此示例中的管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。 你已了解如何执行以下操作:
- 创建数据工厂。
- 创作并部署链接服务。
- 创作并部署管道。
- 启动管道运行。
- 监视管道运行。
继续学习下一篇教程,了解如何通过运行 Azure HDInsight 群集上的 Hive 脚本,转换虚拟网络中的数据。