Azure Data Factory で Spark アクティビティを使用してクラウドのデータを変換する
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
このチュートリアルでは、Azure PowerShell を使用して Data Factory パイプラインを作成します。このパイプラインで、Spark アクティビティとオンデマンドの HDInsight のリンクされたサービスを使用してデータを変換します。 このチュートリアルでは、以下の手順を実行します。
- データ ファクトリを作成します。
- リンクされたサービスを作成してデプロイします。
- パイプラインを作成してデプロイします。
- パイプラインの実行を開始します。
- パイプラインの実行を監視します。
Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
前提条件
Note
Azure を操作するには、Azure Az PowerShell モジュールを使用することをお勧めします。 作業を始めるには、「Azure PowerShell をインストールする」を参照してください。 Az PowerShell モジュールに移行する方法については、「AzureRM から Az への Azure PowerShell の移行」を参照してください。
- Azure Storage アカウント。 Python スクリプトと入力ファイルを作成し、Azure ストレージにアップロードします。 Spark プログラムからの出力は、このストレージ アカウントに格納されます。 オンデマンドの Spark クラスターでは、同じストレージ アカウントがプライマリ ストレージとして使用されます。
- Azure PowerShell。 Azure PowerShell のインストールと構成の方法に関するページに記載されている手順に従います。
Python スクリプトを BLOB ストレージ アカウントにアップロードする
次の内容が含まれた、WordCount_Spark.py という名前の Python ファイルを作成します。
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
<storageAccountName> を Azure ストレージ アカウントの名前に置き換えます。 その後、ファイルを保存します。
Azure BLOB ストレージで、adftutorial という名前のコンテナーを作成します (存在しない場合)。
spark という名前のフォルダーを作成します。
spark フォルダーの下に、script という名前のサブフォルダーを作成します。
WordCount_Spark.py ファイルを script サブフォルダーにアップロードします。
入力ファイルをアップロードする
- minecraftstory.txt という名前のファイルを作成し、任意のテキストを入力しておきます。 このテキストの単語数が Spark プログラムによってカウントされます。
spark
フォルダーに、inputfiles
という名前のサブフォルダーを作成します。minecraftstory.txt
をinputfiles
サブフォルダーにアップロードします。
リンクされたサービスを作成する
このセクションでは、次の 2 つのリンクされたサービスを作成します。
- Azure ストレージ アカウントをデータ ファクトリにリンクする、Azure Storage のリンクされたサービス。 このストレージは、オンデマンドの HDInsight クラスターによって使用されます。 ここには、実行される Spark スクリプトも含まれています。
- オンデマンドの HDInsight のリンクされたサービス。 Azure Data Factory によって自動的に HDInsight クラスターが作成され、Spark プログラムが実行されます。その後、アイドル状態が事前に構成した時間を超えたら、HDInsight クラスターは削除されます。
Azure Storage のリンクされたサービス
任意のエディターを使用して JSON ファイルを作成し、Azure Storage のリンクされたサービスから次の JSON 定義をコピーして、ファイルを MyStorageLinkedService.json として保存します。
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
<storageAccountName> と <storageAccountKey> を Azure ストレージ アカウントの名前とキーで更新してください。
オンデマンドの HDInsight のリンクされたサービス
任意のエディターを使用して JSON ファイルを作成し、Azure HDInsight のリンクされたサービスから次の JSON 定義をコピーして、ファイルを MyOnDemandSparkLinkedService.json として保存します。
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
リンクされたサービスの定義で、以下のプロパティの値を更新します。
- hostSubscriptionId。 <subscriptionID> は、実際の Azure サブスクリプションの ID に置き換えてください。 オンデマンドの HDInsight クラスターは、このサブスクリプション内に作成されます。
- tenant。 <tenantID> は、実際の Azure テナントの ID に置き換えてください。
- servicePrincipalId、servicePrincipalKey。 <servicePrincipalID> と <servicePrincipalKey> を、Microsoft Entra ID のサービス プリンシパルの ID とキーに置き換えます。 このサービス プリンシパルは、サブスクリプションまたはクラスターが作成されるリソース グループの共同作成者ロールのメンバーである必要があります。 詳細については、 Microsoft Entra アプリケーションとサービス プリンシパルの作成 を参照してください。 [サービス プリンシパル ID] は "アプリケーション ID" に、 [サービス プリンシパル キー] は "クライアント シークレット" の値に相当します。
- clusterResourceGroup。 <resourceGroupOfHDICluster> は、HDInsight クラスターの作成先であるリソース グループの名前に置き換えてください。
Note
Azure HDInsight には、サポートされる各 Azure リージョンで使用できるコアの合計数に制限があります。 オンデマンドの HDInsight のリンクされるサービスの場合、HDInsight クラスターは、プライマリ ストレージとして使用される Azure ストレージと同じ場所に作成されます。 クラスターを正常に作成するための十分なコア クォータがあることを確認します。 詳細については、「Hadoop、Spark、Kafka などの HDInsight クラスターをセットアップする」を参照してください。
パイプラインを作成する
この手順では、Spark アクティビティがある新しいパイプラインを作成します。 アクティビティでは word count サンプルが使用されます。 まだコンテンツをダウンロードしていない場合は、この場所からダウンロードします。
任意のエディターで JSON ファイルを作成し、パイプライン定義から次の JSON 定義をコピーして、MySparkOnDemandPipeline.json として保存します。
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
以下の点に注意してください。
- rootPath は、adftutorial コンテナーの spark フォルダーを指します。
- entryFilePath は、spark フォルダーの script サブフォルダーの WordCount_Spark.py ファイルを指します。
Data Factory の作成
JSON ファイルで、リンクされたサービスとパイプライン定義を作成しました。 次に、PowerShell コマンドレットを使用して、データ ファクトリを作成し、リンクされたサービスとパイプライン JSON ファイルをデプロイしましょう。 以下の PowerShell コマンドを 1 つずつ実行します。
変数を 1 つずつ設定します。
リソース グループ名
$resourceGroupName = "ADFTutorialResourceGroup"
データ ファクトリ名。 グローバルに一意であること
$dataFactoryName = "MyDataFactory09102017"
パイプライン名
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
PowerShellを起動します。 Azure PowerShell は、このクイックスタートが終わるまで開いたままにしておいてください。 Azure PowerShell を閉じて再度開いた場合は、これらのコマンドをもう一度実行する必要があります。 現在 Data Factory が利用できる Azure リージョンの一覧については、次のページで目的のリージョンを選択し、 [分析] を展開して [Data Factory] を探してください。リージョン別の利用可能な製品 データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。
次のコマンドを実行して、Azure Portal へのサインインに使用するユーザー名とパスワードを入力します。
Connect-AzAccount
次のコマンドを実行して、このアカウントのすべてのサブスクリプションを表示します。
Get-AzSubscription
次のコマンドを実行して、使用するサブスクリプションを選択します。 SubscriptionId は、実際の Azure サブスクリプションの ID に置き換えてください。
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
リソース グループを作成します。ADFTutorialResourceGroup。
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
データ ファクトリを作成します。
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
次のコマンドを実行して、出力を表示します。
$df
JSON ファイルを作成したフォルダーに移動し、次のコマンドを実行して、Azure Storage のリンクされたサービスをデプロイします。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
次のコマンドを実行し、オンデマンドの Spark のリンクされたサービスをデプロイします。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
次のコマンドを実行し、パイプラインをデプロイします。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
パイプラインの実行を開始して監視する
パイプラインの実行を開始します。 将来の監視のために、パイプラインの実行の ID もキャプチャされます。
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
次のスクリプトを実行し、パイプラインの実行の状態を、完了するまで継続的にチェックします。
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
サンプル実行の出力結果を次に示します。
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
adftutorial コンテナーの
spark
フォルダーにoutputfiles
というフォルダーが作成され、Spark プログラムの出力が含まれていることを確認します。
関連するコンテンツ
このサンプルのパイプラインは、Azure Blob Storage 内のある場所から別の場所にデータをコピーするものです。 以下の方法を学習しました。
- データ ファクトリを作成します。
- リンクされたサービスを作成してデプロイします。
- パイプラインを作成してデプロイします。
- パイプラインの実行を開始します。
- パイプラインの実行を監視します。
仮想ネットワークにある Azure HDInsight クラスター上で Hive スクリプトを実行してデータを変換する方法については、次のチュートリアルに進んでください。