クイック スタート:.NET SDK を使用してデータ ファクトリとパイプラインを作成する
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
このクイックスタートでは、.NET SDK を使用して Azure データ ファクトリを作成する方法について説明します。 このデータ ファクトリに作成されたパイプラインは、データを Azure BLOB ストレージ内のあるフォルダーから別のフォルダーにコピーします。 Azure Data Factory を使用してデータを変換する方法のチュートリアルについては、Spark を使用したデータ変換のチュートリアルを参照してください。
Note
この記事では、Data Factory サービスの概要については詳しく取り上げません。 Azure Data Factory サービスの概要については、「Azure Data Factory の概要」をご覧ください。
前提条件
Azure サブスクリプション
Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
Azure ロール
Data Factory インスタンスを作成するには、Azure へのサインインに使用するユーザー アカウントが、"共同作成者" ロールまたは "所有者" ロールのメンバーであるか、Azure サブスクリプションの "管理者" である必要があります。 サブスクリプションで自分が持っているアクセス許可を表示するには、Azure portal に移動し、右上にあるユーザー名を選択してください。" ... " アイコンを選択してその他のオプションを表示し、 [アクセス許可] を選択します。 複数のサブスクリプションにアクセスできる場合は、適切なサブスクリプションを選択します。
データセット、リンクされたサービス、パイプライン、トリガー、および統合ランタイムを含む Data Factory の子リソースを作成および管理するには、次の要件が適用されます。
- Azure portal で子リソースを作成および管理するには、リソース グループ レベル以上で Data Factory 共同作成者ロールに属している必要があります。
- PowerShell または SDK を使用して子リソースを作成および管理する場合は、リソース レベル以上での共同作成者ロールで十分です。
ロールにユーザーを追加する方法に関するサンプル手順については、ロールの追加に関する記事を参照してください。
詳細については、次の記事を参照してください。
Azure ストレージ アカウント
このクイックスタートでは、"ソース" データ ストアと "コピー先" データ ストアの両方に汎用の Azure Storage アカウント (具体的には Blob Storage) を使用します。 汎用の Azure Storage アカウントがない場合、作成方法については、ストレージ アカウントの作成に関するページを参照してください。
ストレージ アカウント名を取得する
このクイックスタートには、Azure Storage アカウントの名前が必要です。 以下の手順に従って、ご利用のストレージ アカウントの名前を取得してください。
- Web ブラウザーで Azure portal にアクセスし、Azure のユーザー名とパスワードを使用してサインインします。
- [Azure portal] メニューで [すべてのサービス] を選択してから、 [ストレージ]>[ストレージ アカウント] の順に選択します。 また、任意のページから検索して、 [ストレージ アカウント] を選択することもできます。
- [ストレージ アカウント] ページで、ご利用のストレージ アカウントを (必要に応じて) フィルターで抽出し、該当するストレージ アカウントを選択します。
また、任意のページから検索して、 [ストレージ アカウント] を選択することもできます。
BLOB コンテナーを作成する
このセクションでは、adftutorial という名前の BLOB コンテナーを Azure Blob Storage に作成します。
ストレージ アカウント ページで、 [概要]>[コンテナー] を選択します。
<アカウント名> - [コンテナー] ページのツールバーで、コンテナー を選択します。
[新しいコンテナー] ダイアログ ボックスで、名前に「adftutorial」と入力し、 [OK] を選択します。 <アカウント名> - [コンテナー] ページが更新され、コンテナーの一覧に adftutorial が含まれるようになります。
BLOB コンテナーの入力フォルダーとファイルを追加する
このセクションでは、作成したコンテナーに input という名前のフォルダーを作成し、入力フォルダーにサンプル ファイルをアップロードします。 開始する前に、メモ帳などのテキスト エディターを開き、次の内容を含む emp.txt という名前のファイルを作成します。
John, Doe
Jane, Doe
C:\ADFv2QuickStartPSH フォルダーにファイルを保存します (フォルダーがまだ存在しない場合は作成します)。Azure portal に戻り、次の手順を実行します。
中断した <アカウント名> - [コンテナー] ページで、コンテナーの更新された一覧から adftutorial を選択します。
- ウィンドウを閉じた場合、または別のページに移動した場合は、[Azure portal] にもう一度サインインします。
- [Azure portal] メニューで [すべてのサービス] を選択してから、 [ストレージ]>[ストレージ アカウント] の順に選択します。 また、任意のページから検索して、 [ストレージ アカウント] を選択することもできます。
- ストレージ アカウントを選択してから、 [コンテナー]>[adftutorial] を選択します。
adftutorial コンテナー ページのツールバーで、 [アップロード] を選択します。
[BLOB のアップロード] ページで、 [ファイル] ボックスを選択し、emp.txt ファイルを参照して選択します。
[詳細設定] の見出しを展開します。 次のようにページが表示されます。
[アップロード先のフォルダー] ボックスに「input」と入力します。
[アップロード] ボタンを選択します。 一覧に emp.txt ファイルとアップロードの状態が表示されます。
[閉じる] アイコン (X) を選択して、 [BLOB のアップロード] ページを閉じます。
adftutorial コンテナーのページを開いたままにしておきます。 このクイックスタートの最後で、このページを使用して出力を確認します。
Visual Studio
この記事のチュートリアルでは、Visual Studio 2019 を使用します。 Visual Studio 2013、2015、または 2017 での手順は少し異なります。
Microsoft Entra ID でアプリケーションを作成する
「方法: ポータルを使用して、リソースにアクセスできる Microsoft Entra アプリケーションとサービス プリンシパルを作成する」のセクションから、手順に従って次のタスクを実行します。
- 「Microsoft Entra アプリケーションを作成する」で、このチュートリアルで作成している .NET アプリケーションを表すアプリケーションを作成します。 サインオン URL については、この記事に示されているようにダミーの URL (
https://contoso.org/exampleapp
) を指定できます。 - 「サインインするための値を取得する」で、アプリケーション ID とテナント ID を取得し、それらの値をメモしておいてください。このチュートリアルで後ほど使用します。
- 「証明書とシークレット」で、認証キーを取得し、その値をメモしておいてください。このチュートリアルで後ほど使用します。
- 「アプリケーションをロールに割り当てる」で、アプリケーションがサブスクリプションにデータ ファクトリを作成できるように、サブスクリプション レベルでアプリケーションを共同作成者ロールに割り当てます。
Visual Studio プロジェクトを作成する
次に、Visual Studio で、C# .NET コンソール アプリケーションを作成します。
- Visual Studio を起動します。
- スタート ウィンドウで、 [新しいプロジェクトの作成]>[コンソール アプリ (.NET Framework)] の順に選択します。 .NET バージョン 4.5.2 以降が必要です。
- [プロジェクト名] に「ADFv2QuickStart」と入力します。
- [作成] を選択してプロジェクトを作成します。
NuGet パッケージのインストール
[ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。
[パッケージ マネージャー コンソール] ウィンドウで、次のコマンドを実行してパッケージをインストールします。 詳細については、Azure.ResourceManager.DataFactory NuGet パッケージを参照してください。
Install-Package Azure.ResourceManager.DataFactory -IncludePrerelease Install-Package Azure.Identity
Data Factory の作成
Program.cs を開き、次のステートメントを挿入して、名前空間への参照を追加します。
using Azure; using Azure.Core; using Azure.Core.Expressions.DataFactory; using Azure.Identity; using Azure.ResourceManager; using Azure.ResourceManager.DataFactory; using Azure.ResourceManager.DataFactory.Models; using Azure.ResourceManager.Resources; using System; using System.Collections.Generic;
変数を設定する次のコードを Main メソッドに追加します。 プレースホルダーを実際の値に置き換えてください。 現在 Data Factory が利用できる Azure リージョンの一覧については、次のページで目的のリージョンを選択し、 [分析] を展開して [Data Factory] を探してください。リージョン別の利用可能な製品 データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID where the data factory resides>"; string resourceGroup = "<your resource group where the data factory resides>"; string region = "<the location of your resource group>"; string dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>"; string storageAccountName = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; // specify the container and input folder from which all files // need to be copied to the output folder. string inputBlobContainer = "<blob container to copy data from, e.g. containername>"; string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. inputdir/file>"; //specify the contains and output folder where the files are copied string outputBlobContainer = "<blob container to copy data from, e.g. containername>"; string outputBlobPath = "<the blob path to copy data to, e.g. outputdir/file>"; // name of the Azure Storage linked service, blob dataset, and the pipeline string storageLinkedServiceName = "AzureStorageLinkedService"; string blobDatasetName = "BlobDataset"; string pipelineName = "Adfv2QuickStartPipeline";
データ ファクトリを作成する次のコードを Main メソッドに追加します。
ArmClient armClient = new ArmClient( new ClientSecretCredential(tenantID, applicationId, authenticationKey, new TokenCredentialOptions { AuthorityHost = AzureAuthorityHosts.AzurePublicCloud }), subscriptionId, new ArmClientOptions { Environment = ArmEnvironment.AzurePublicCloud } ); ResourceIdentifier resourceIdentifier = SubscriptionResource.CreateResourceIdentifier(subscriptionId); SubscriptionResource subscriptionResource = armClient.GetSubscriptionResource(resourceIdentifier); Console.WriteLine("Get an existing resource group " + resourceGroupName + "..."); var resourceGroupOperation = subscriptionResource.GetResourceGroups().Get(resourceGroupName); ResourceGroupResource resourceGroupResource = resourceGroupOperation.Value; Console.WriteLine("Create a data factory " + dataFactoryName + "..."); DataFactoryData dataFactoryData = new DataFactoryData(AzureLocation.EastUS2); var dataFactoryOperation = resourceGroupResource.GetDataFactories().CreateOrUpdate(WaitUntil.Completed, dataFactoryName, dataFactoryData); Console.WriteLine(dataFactoryOperation.WaitForCompletionResponse().Content); // Get the data factory resource DataFactoryResource dataFactoryResource = dataFactoryOperation.Value;
リンクされたサービスを作成する
Azure Storage のリンクされたサービスを作成する次のコードを Main メソッドに追加します。
データ ストアおよびコンピューティング サービスをデータ ファクトリにリンクするには、リンクされたサービスをデータ ファクトリに作成します。 このクイックスタートでは、コピー ソースとシンク ストアの両方に対して Azure Blob Storage のリンク サービスを 1 つ作成するだけで済みます。サンプルでは "AzureBlobStorageLinkedService" という名前です。
// Create an Azure Storage linked service
Console.WriteLine("Create a linked service " + storageLinkedServiceName + "...");
AzureBlobStorageLinkedService azureBlobStorage = new AzureBlobStorageLinkedService()
{
ConnectionString = azureBlobStorageConnectionString
};
DataFactoryLinkedServiceData linkedServiceData = new DataFactoryLinkedServiceData(azureBlobStorage);
var linkedServiceOperation = dataFactoryResource.GetDataFactoryLinkedServices().CreateOrUpdate(WaitUntil.Completed, storageLinkedServiceName, linkedServiceData);
Console.WriteLine(linkedServiceOperation.WaitForCompletionResponse().Content);
データセットを作成する
次のコードを Main メソッドに追加して、区切りテキスト データセットを作成します。
ソースからシンクにコピーするデータを表すデータセットを定義します。 この例では、この区切りテキスト データセットは、前の手順で作成した Azure Blob Storage のリンク サービスを参照します。 データセットは 2 つのパラメーターを受け取り、その値はデータセットを使うアクティビティで設定されます。 パラメーターは、データが存在する/格納される場所を指す "コンテナー" と "folderPath" を構築するために使われます。
// Create an Azure Blob dataset
DataFactoryLinkedServiceReference linkedServiceReference = new DataFactoryLinkedServiceReference(DataFactoryLinkedServiceReferenceType.LinkedServiceReference, storageLinkedServiceName);
DelimitedTextDataset delimitedTextDataset = new DelimitedTextDataset(linkedServiceReference)
{
DataLocation = new AzureBlobStorageLocation
{
Container = DataFactoryElement<string>.FromExpression("@dataset().container"),
FileName = DataFactoryElement<string>.FromExpression("@dataset().path")
},
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("container",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("path",new EntityParameterSpecification(EntityParameterType.String))
},
FirstRowAsHeader = false,
QuoteChar = "\"",
EscapeChar = "\\",
ColumnDelimiter = ","
};
DataFactoryDatasetData datasetData = new DataFactoryDatasetData(delimitedTextDataset);
var datasetOperation = dataFactoryResource.GetDataFactoryDatasets().CreateOrUpdate(WaitUntil.Completed, blobDatasetName, datasetData);
Console.WriteLine(datasetOperation.WaitForCompletionResponse().Content);
パイプラインを作成する
コピー アクティビティが含まれているパイプラインを作成する次のコードを Main メソッドに追加します。
この例では、このパイプラインには 1 つのアクティビティが含まれており、4 つのパラメーター (入力 BLOB コンテナーとパス、出力 BLOB コンテナーとパス) を受け取ります。 これらのパラメーターの値は、パイプラインがトリガー/実行されたときに設定されます。 コピー アクティビティは、入力と出力として、前の手順で作成された同じ BLOB データセットを参照します。 データセットを入力データセットとして使う場合、入力コンテナーとパスを指定します。 また、データセットが出力データセットとして使われる場合は、出力コンテナーとパスを指定します。
// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
DataFactoryPipelineData pipelineData = new DataFactoryPipelineData()
{
Parameters =
{
new KeyValuePair<string, EntityParameterSpecification>("inputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("inputPath",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputContainer",new EntityParameterSpecification(EntityParameterType.String)),
new KeyValuePair<string, EntityParameterSpecification>("outputPath",new EntityParameterSpecification(EntityParameterType.String))
},
Activities =
{
new CopyActivity("CopyFromBlobToBlob",new DataFactoryBlobSource(),new DataFactoryBlobSink())
{
Inputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.inputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.inputPath\""))
}
}
},
Outputs =
{
new DatasetReference(DatasetReferenceType.DatasetReference,blobDatasetName)
{
Parameters =
{
new KeyValuePair<string, BinaryData>("container", BinaryData.FromString("\"@pipeline().parameters.outputContainer\"")),
new KeyValuePair<string, BinaryData>("path", BinaryData.FromString("\"@pipeline().parameters.outputPath\""))
}
}
}
}
}
};
var pipelineOperation = dataFactoryResource.GetDataFactoryPipelines().CreateOrUpdate(WaitUntil.Completed, pipelineName, pipelineData);
Console.WriteLine(pipelineOperation.WaitForCompletionResponse().Content);
パイプラインの実行を作成する
パイプラインの実行をトリガーする次のコードを Main メソッドに追加します。
また、このコードは、パイプラインで指定された inputContainer、inputPath、outputContainer、outputPath パラメーターの値を、ソースおよびシンクの BLOB パスの実際の値に設定します。
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, BinaryData> parameters = new Dictionary<string, BinaryData>()
{
{ "inputContainer",BinaryData.FromObjectAsJson(inputBlobContainer) },
{ "inputPath",BinaryData.FromObjectAsJson(inputBlobPath) },
{ "outputContainer",BinaryData.FromObjectAsJson(outputBlobContainer) },
{ "outputPath",BinaryData.FromObjectAsJson(outputBlobPath) }
};
var pipelineResource = dataFactoryResource.GetDataFactoryPipeline(pipelineName);
var runResponse = pipelineResource.Value.CreateRun(parameters);
Console.WriteLine("Pipeline run ID: " + runResponse.Value.RunId);
パイプラインの実行を監視する
データのコピーが完了するまで状態を継続的にチェックするために、次のコードを Main メソッドに追加します。
// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); DataFactoryPipelineRunInfo pipelineRun; while (true) { pipelineRun = dataFactoryResource.GetPipelineRun(runResponse.Value.RunId.ToString()); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued") System.Threading.Thread.Sleep(15000); else break; }
コピー アクティビティの実行の詳細 (たとえば、読み取りおよび書き込みされたデータのサイズ) を取得する次のコードを Main メソッドに追加します。
// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); var queryResponse = dataFactoryResource.GetActivityRun(pipelineRun.RunId.ToString(), new RunFilterContent(DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10))); var enumerator = queryResponse.GetEnumerator(); enumerator.MoveNext(); if (pipelineRun.Status == "Succeeded") Console.WriteLine(enumerator.Current.Output); else Console.WriteLine(enumerator.Current.Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
コードの実行
アプリケーションをビルドして起動し、パイプラインの実行を確認します。
コンソールに、データ ファクトリ、リンクされたサービス、データセット、パイプライン、およびパイプラインの実行の作成の進捗状況が表示されます。 その後、パイプラインの実行状態が確認されます。 コピー アクティビティの実行の詳細と、読み取りおよび書き込みデータのサイズが表示されるまで待ちます。 次に、Azure Storage Explorer などのツールを使用して、変数で指定したように BLOB が "inputBlobPath" から "outputBlobPath" にコピーされていることを確認します。
サンプル出力
Create a data factory quickstart-adf...
{
"name": "quickstart-adf",
"type": "Microsoft.DataFactory/factories",
"properties": {
"provisioningState": "Succeeded",
"version": "2018-06-01"
},
"location": "eastus2"
}
Create a linked service AzureBlobStorage...
{
"name": "AzureBlobStorage",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "AzureBlobStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;",
"encryptedCredential": "<encryptedCredential>"
}
}
}
Creating dataset BlobDelimitedDataset...
{
"name": "BlobDelimitedDataset",
"type": "Microsoft.DataFactory/factories/datasets",
"properties": {
"type": "DelimitedText",
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureBlobStorage"
},
"parameters": {
"container": {
"type": "String"
},
"path": {
"type": "String"
}
},
"typeProperties": {
"location": {
"container": {
"type": "Expression",
"value": "@dataset().container"
},
"type": "AzureBlobStorageLocation",
"fileName": {
"type": "Expression",
"value": "@dataset().path"
}
},
"columnDelimiter": ",",
"quoteChar": "\"",
"escapeChar": "\\",
"firstRowAsHeader": false
}
}
}
Creating pipeline Adfv2QuickStartPipeline...
{
"properties": {
"activities": [
{
"inputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.inputContainer",
"path": "@pipeline().parameters.inputPath"
}
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "BlobDelimitedDataset",
"parameters": {
"container": "@pipeline().parameters.outputContainer",
"path": "@pipeline().parameters.outputPath"
}
}
],
"name": "CopyFromBlobToBlob",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
}
}
],
"parameters": {
"inputContainer": {
"type": "String"
},
"inputPath": {
"type": "String"
},
"outputContainer": {
"type": "String"
},
"outputPath": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 3aa26ffc-5bee-4db9-8bac-ccbc2d7b51c1
Checking pipeline run status...
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 1048,
"dataWritten": 1048,
"filesRead": 1,
"filesWritten": 1,
"sourcePeakConnections": 1,
"sinkPeakConnections": 1,
"copyDuration": 8,
"throughput": 1.048,
"errors": [],
"effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (East US 2)",
"usedDataIntegrationUnits": 4,
"billingReference": {
"activityType": "DataMovement",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
],
"totalBillableDuration": [
{
"meterType": "AzureIR",
"duration": 0.06666666666666667,
"unit": "DIUHours"
}
]
},
"usedParallelCopies": 1,
"executionDetails": [
{
"source": {
"type": "AzureBlobStorage"
},
"sink": {
"type": "AzureBlobStorage"
},
"status": "Succeeded",
"start": "2023-12-15T10:25:33.9991558Z",
"duration": 8,
"usedDataIntegrationUnits": 4,
"usedParallelCopies": 1,
"profile": {
"queue": {
"status": "Completed",
"duration": 5
},
"transfer": {
"status": "Completed",
"duration": 1,
"details": {
"listingSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"readingFromSource": {
"type": "AzureBlobStorage",
"workingDuration": 0
},
"writingToSink": {
"type": "AzureBlobStorage",
"workingDuration": 0
}
}
}
},
"detailedDurations": {
"queuingDuration": 5,
"transferDuration": 1
}
}
],
"dataConsistencyVerification": {
"VerificationResult": "NotVerified"
}
}
Press any key to exit...
出力を検証する
このパイプラインは、adftutorial BLOB コンテナーに対して output フォルダーを自動的に作成します。 そのうえで、input フォルダーから output フォルダーに emp.txt ファイルをコピーします。
- Azure portal で、前の「BLOB コンテナーの入力フォルダーとファイルを追加する」セクションで見た adftutorial コンテナー ページの [最新の情報に更新] を選択し、output フォルダーを確認します。
- フォルダー一覧で、 [output] を選択します。
- emp.txt が output フォルダーにコピーされていることを確認します。
リソースをクリーンアップする
プログラムによってデータ ファクトリを削除するには、次のコード行をプログラムに追加します。
Console.WriteLine("Deleting the data factory");
dataFactoryResource.Delete(WaitUntil.Completed);
次のステップ
このサンプルのパイプラインは、Azure Blob Storage 内のある場所から別の場所にデータをコピーするものです。 より多くのシナリオで Data Factory を使用する方法については、チュートリアルを参照してください。