データをコピーし、成功および失敗に関するメール通知を送信する
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
このチュートリアルでは、いくつかの制御フロー機能を紹介する Data Factory パイプラインを作成します。 このパイプラインでは、Azure Blob Storage 内のコンテナーから同じストレージ アカウント内の別のコンテナーへの単純なコピーを行います。 コピー アクティビティが成功した場合、成功したコピー操作の詳細 (書き込まれたデータの量など) がパイプラインによって成功電子メールで送信されます。 コピー アクティビティが失敗した場合、コピー失敗の詳細 (エラー メッセージなど) がパイプラインによって失敗電子メールで送信されます。 チュートリアル全体を通じて、パラメーターを渡す方法が示されます。
シナリオの概要:
このチュートリアルでは、以下の手順を実行します。
- データ ファクトリを作成します。
- Azure Storage のリンクされたサービスを作成します。
- Azure BLOB データセットを作成します。
- コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
- アクティビティの出力を後続のアクティビティに送信します。
- パラメーターの引き渡しとシステム変数を利用します。
- パイプラインの実行を開始します。
- パイプラインとアクティビティの実行を監視します。
このチュートリアルでは、Azure Portal を使用します。 その他のメカニズムを使用して Azure Data Factory を操作する場合は、目次から「クイック スタート」を参照してください。
前提条件
- Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
- Azure Storage アカウント。 BLOB ストレージをソース データ ストアとして使用します。 Azure ストレージ アカウントがない場合、ストレージ アカウントの作成手順については、「ストレージ アカウントの作成」を参照してください。
- Azure SQL データベース。 データベースをシンク データ ストアとして使用します。 Azure SQL Database のデータベースがない場合の作成手順については、Azure SQL Database のデータベースの作成に関する記事を参照してください。
BLOB テーブルを作成する
メモ帳を起動します。 次のテキストをコピーし、input.txt ファイルとしてディスクに保存します。
John,Doe Jane,Doe
Azure Storage Explorer などのツールを使用して、次の手順を実行します。
- adfv2branch コンテナーを作成します。
- adfv2branch コンテナーに input フォルダーを作成します。
- input.txt ファイルをコンテナーにアップロードします。
電子メール ワークフロー エンドポイントを作成する
パイプラインからメール送信をトリガーするには、Azure Logic Apps を使用してワークフローを定義します。 ロジック アプリ ワークフローの作成の詳細については、「従量課金ロジック アプリ ワークフローの例を作成する」を参照してください。
成功電子メールのワークフロー
CopySuccessEmail
という名前の、従量課金ロジック アプリ ワークフローを作成します。 [HTTP 要求の受信時] という名前の要求トリガーを追加し、[メールの送信] という名前の [Office 365 Outlook] アクションを追加します。 メッセージが表示されたら、Office 365 Outlook アカウントにサインインします。
要求トリガーの [要求本文の JSON スキーマ] ボックスに、次の JSON を入力します。
{
"properties": {
"dataFactoryName": {
"type": "string"
},
"message": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"receiver": {
"type": "string"
}
},
"type": "object"
}
ワークフロー デザイナーでの [要求] トリガーは、次の図のようになります。
[メールの送信] アクションに対しては、[要求本文の JSON スキーマ] で渡されるプロパティを利用して、メールの書式設定をカスタマイズします。 たとえば次のようになります。
ワークフローを保存します。 成功電子メール ワークフローの HTTP Post 要求 URL をメモしておきます。
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
失敗電子メールのワークフロー
同じ手順に従って、CopyFailEmail
という名前の、別のロジック アプリ ワークフローを作成します。 要求トリガーでは、[要求本文の JSON スキーマ] の値は同じです。 Subject
のように電子メールの書式を変更し、失敗電子メールになるように調整します。 たとえば次のようになります。
ワークフローを保存します。 失敗電子メール ワークフローの HTTP Post 要求 URL をメモしておきます。
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
2 つのワークフローの URL を取得できました。
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
Data Factory の作成
Web ブラウザー (Microsoft Edge または Google Chrome) を起動します。 現在、Data Factory の UI がサポートされる Web ブラウザーは Microsoft Edge と Google Chrome だけです。
左上のメニューを展開し、[リソースの作成] を選択します。 次に、>[統合]>[Data Factory] を選択します。
[新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。
Azure データ ファクトリの名前は グローバルに一意にする必要があります。 次のエラーが発生した場合は、データ ファクトリの名前を変更して (yournameADFTutorialDataFactory など) 作成し直してください。 Data Factory アーティファクトの名前付け規則については、Data Factory の名前付け規則に関する記事を参照してください。
データ ファクトリ名 "ADFTutorialDataFactory" は利用できません。
データ ファクトリを作成する Azure サブスクリプションを選択します。
[リソース グループ] について、次の手順のいずれかを行います。
[Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。
[新規作成] を選択し、リソース グループの名前を入力します。
リソース グループの詳細については、 リソース グループを使用した Azure のリソースの管理に関するページを参照してください。
バージョンとして [V2] を選択します。
データ ファクトリの 場所 を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。
[ダッシュボードにピン留めする] をオンにします。
Create をクリックしてください。
作成が完了すると、図に示されているような [Data Factory] ページが表示されます。
[Azure Data Factory Studio を開く] タイルをクリックして、別のタブで Azure Data Factory ユーザー インターフェイス (UI) を起動します。
パイプラインを作成する
この手順では、1 つのコピー アクティビティと 2 つの Web アクティビティが含まれたパイプラインを作成します。 このパイプラインの作成には次の機能を使用します。
- データセットによってアクセスされるパイプラインのパラメーター。
- 成功/失敗電子メールを送信するロジック アプリ ワークフローを呼び出す Web アクティビティ。
- (成功および失敗に基づいた) あるアクティビティと別のアクティビティの接続
- アクティビティからの出力を後続のアクティビティへの入力として使用する
Data Factory UI のホーム ページで [調整] タイルをクリックします。
パイプラインのプロパティ ウィンドウで [パラメーター] タブに切り替え、 [新規] ボタンを使用して、種類が文字列の 3 つのパラメーター (sourceBlobContainer、sinkBlobContainer、receiver) を追加します。
- sourceBlobContainer - ソース BLOB データセットによって使用されるパイプライン内のパラメーターです。
- sinkBlobContainer - シンク BLOB データセットによって使用されるパイプライン内のパラメーターです。
- receiver - このパラメーターは、このパラメーターによって電子メール アドレスが指定されている受信者に成功または失敗の電子メールを送信するパイプライン内の 2 つの Web アクティビティによって使用されます。
[アクティビティ] ツールボックスで [Copy] を見つけ、パイプライン デザイナー画面に [Copy] アクティビティをドラッグ アンド ドロップします。
パイプライン デザイナーのサーフェスにドラッグした [Copy] アクティビティを選択します。 下部にある [コピー] アクティビティのプロパティ ウィンドウで、 [ソース] タブに切り替えて [+ 新規] をクリックします。 この手順でコピー アクティビティのソース データセットを作成します。
[新しいデータセット] ウィンドウで、上部にある [Azure] タブを選択し、次に [Azure Blob Storage] を選択し、[続行] を選択します。
[書式の選択] ウィンドウで、[DelimitedText] を選択し、[続行] を選択します。
[プロパティの設定] という新しいタブが表示されます。 データセットの名前を「SourceBlobDataset」に変更します。 [リンク サービス] ドロップダウンを選択し、[+ 新規] を選択して、ソース データセットに新しいリンク サービスを作成します。
[新しいリンク サービス] ウィンドウが表示され、リンク サービスに必要なプロパティを入力できます。
[New Linked Service](新しいリンクされたサービス) ウィンドウで、次の手順を完了します。
- [名前] に「AzureStorageLinkedService」と入力します。
- [ストレージ アカウント名] で Azure ストレージ アカウントを選択します。
- Create をクリックしてください。
次に表示される [プロパティの設定] ウィンドウで、[このデータセットを開く] を選択して、ファイル名のパラメーター化された値を入力します。
フォルダーには「
@pipeline().parameters.sourceBlobContainer
」、ファイル名には「emp.txt
」と入力します。[パイプライン] タブに戻り (または左側のツリービューでパイプラインをクリック)、デザイナーで [Copy] アクティビティを選択します。 ソース データセットとして新しいデータセットが選択されていることを確認します。
プロパティ ウィンドウで [シンク] タブに切り替えて、 [Sink Dataset](シンク データセット) の [+ 新規] をクリックします。 ソース データセットの作成と同様のこの手順では、コピー アクティビティのシンク データセットを作成します。
[新しいデータセット] ウィンドウで、[Azure Blob Storage] を選択し、[続行] をクリックし、その後 [書式の選択] ウィンドウでもう一度 [DelimitedText] を選択し、[続行] をもう一度クリックします。
データセットの [プロパティの設定] ページで、[名前] に SinkBlobDataset を入力し、LinkedService に AzureStorageLinkedService を選択します。
プロパティ ページの [詳細] セクションを展開し、[このデータセットを開く] を選択します。
データセットの [接続] タブで、[ファイル のパス] を編集します。 フォルダーには「
@pipeline().parameters.sinkBlobContainer
」、ファイル名には「@concat(pipeline().RunId, '.txt')
」と入力します。 この式では、ファイル名に現在のパイプライン実行の ID を使用します。 サポートされているシステム変数および式の一覧については、システム変数に関するページと式言語に関するページを参照してください。上部にあるパイプラインのタブに切り替えて戻ります。 検索ボックスで Web を検索し、[Web] アクティビティをパイプライン デザイナー画面にドラッグ アンド ドロップします。 アクティビティの名前を「SendSuccessEmailActivity」に設定します。 Web アクティビティでは、任意の REST エンドポイントを呼び出すことができます。 このアクティビティの詳細については、Web アクティビティに関する記事を参照してください。 このパイプラインでは、Web アクティビティを使用して Logic Apps 電子メール ワークフローを呼び出します。
[General](一般) タブから [設定] タブに切り替えて、次の手順を実行します。
[URL] で、成功電子メールを送信するロジック アプリ ワークフローの URL を指定します。
[メソッド] に [POST] を選択します。
[ヘッダー] セクションにある + [ヘッダーの追加] リンクをクリックします。
[コンテンツの種類] というヘッダーを追加し、それを application/json に設定します。
[本文] に次の JSON を指定します。
{ "message": "@{activity('Copy1').output.dataWritten}", "dataFactoryName": "@{pipeline().DataFactory}", "pipelineName": "@{pipeline().Pipeline}", "receiver": "@pipeline().parameters.receiver" }
メッセージの本文には次のプロパティが含まれます。
Message -
@{activity('Copy1').output.dataWritten
から渡される値。 前のコピー アクティビティのプロパティにアクセスし、dataWritten の値を渡します。 失敗の場合、@{activity('CopyBlobtoBlob').error.message
の代わりにエラー出力を渡します。Data Factory Name -
@{pipeline().DataFactory}
から渡される値。これはシステム変数であり、対応するデータ ファクトリ名へのアクセスを可能にします。 サポートされているシステム変数の一覧については、「システム変数」の記事を参照してください。Pipeline Name -
@{pipeline().Pipeline}
から渡される値。 これもシステム変数であり、対応するパイプライン名へのアクセスを可能にします。Receiver - "@pipeline().parameters.receiver") から渡される値。 パイプライン パラメーターにアクセスします。
Copy アクティビティの横にある緑のチェックボックス ボタンを Web アクティビティにドラッグ アンド ドロップして、[Copy] アクティビティを [Web] アクティビティに接続します。
[アクティビティ] ツール ボックスからパイプライン デザイナー画面に別の [Web] アクティビティをドラッグ アンド ドロップし、 [名前] を「SendFailureEmailActivity」に設定します。
[設定] タブに切り替えて、次の手順を実行します。
[URL] で、失敗電子メールを送信するロジック アプリ ワークフローの URL を指定します。
[メソッド] に [POST] を選択します。
[ヘッダー] セクションにある + [ヘッダーの追加] リンクをクリックします。
[コンテンツの種類] というヘッダーを追加し、それを application/json に設定します。
[本文] に次の JSON を指定します。
{ "message": "@{activity('Copy1').error.message}", "dataFactoryName": "@{pipeline().DataFactory}", "pipelineName": "@{pipeline().Pipeline}", "receiver": "@pipeline().parameters.receiver" }
パイプライン デザイナーで [Copy] アクティビティの右側にある赤い [X] ボタンを選択し、先ほど作成した SendFailureEmailActivity にドラッグ アンド ドロップします。
パイプラインを検証するために、ツール バーの [検証] ボタンをクリックします。 >> ボタンをクリックして [Pipeline Validation Output](パイプラインの検証の出力) ウィンドウを閉じます。
エンティティ (データセットやパイプラインなど) を Data Factory サービスに発行するには、 [すべて公開] を選択します。 [正常に発行されました] というメッセージが表示されるまで待機します。
成功するパイプラインの実行をトリガーする
パイプラインの実行をトリガーするために、ツール バーの [トリガー] をクリックし、 [Trigger Now](今すぐトリガー) をクリックします。
[Pipeline Run](パイプラインの実行) ウィンドウで、次の手順を実行します。
sourceBlobContainer パラメーターに「adftutorial/adfv2branch/input」を入力します。
sinkBlobContainer パラメーターに「adftutorial/adfv2branch/output」を入力します。
receiver のメール アドレスを入力します。
[完了] をクリックします。
成功したパイプラインの実行を監視する
パイプラインの実行を監視するには、左側の [監視] タブに切り替えます。 手動でトリガーしたパイプラインの実行が表示されます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。
このパイプラインの実行に関連付けられているアクティビティの実行を表示するには、 [アクション] 列にある最初のリンクをクリックします。 上部の [パイプライン] をクリックすると、前のビューに戻ることができます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。
失敗するパイプラインの実行をトリガーする
左側で [編集] タブに切り替えます。
パイプラインの実行をトリガーするために、ツール バーの [トリガー] をクリックし、 [Trigger Now](今すぐトリガー) をクリックします。
[Pipeline Run](パイプラインの実行) ウィンドウで、次の手順を実行します。
- sourceBlobContainer パラメーターに「adftutorial/dummy/input」を入力します。 ダミー フォルダーが adftutorial コンテナーに存在しないことを確認します。
- sinkBlobContainer パラメーターに「adftutorial/dummy/output」を入力します。
- receiver のメール アドレスを入力します。
- [完了] をクリックします。
失敗したパイプラインの実行を監視する
パイプラインの実行を監視するには、左側の [監視] タブに切り替えます。 手動でトリガーしたパイプラインの実行が表示されます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。
エラーの詳細を表示するには、パイプラインの実行の [エラー] リンクをクリックします。
このパイプラインの実行に関連付けられているアクティビティの実行を表示するには、 [アクション] 列にある最初のリンクをクリックします。 [最新の情報に更新] ボタンを使用して、一覧を更新します。 パイプラインのコピー アクティビティが失敗したことがわかります。 Web アクティビティによる指定された受信者への失敗電子メールの送信は成功しました。
[アクション] 列の [エラー] リンクをクリックして、エラーの詳細を表示します。
関連するコンテンツ
このチュートリアルでは、以下の手順を実行しました。
- データ ファクトリを作成します。
- Azure Storage のリンクされたサービスを作成します。
- Azure BLOB データセットを作成します。
- コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
- アクティビティの出力を後続のアクティビティに送信します。
- パラメーターの引き渡しとシステム変数を利用します。
- パイプラインの実行を開始します。
- パイプラインとアクティビティの実行を監視します。
これで、「概念」セクションに進み、Azure Data Factory の詳細について学習できるようになりました。