マッピング データ フローを使用して安全にデータを変換する
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
Azure Data Factory を初めて使用する場合は、「Azure Data Factory の概要」を参照してください。
このチュートリアルでは、Data Factory ユーザー インターフェイス (UI) を使い、Data Factory マネージド仮想ネットワーク内でマッピング データ フローを使用して Azure Data Lake Storage Gen2 ソースから Data Lake Storage Gen2 シンク (どちらも、選択したネットワークへの接続のみを許可) にデータをコピーして変換するパイプラインを作成します。 このチュートリアルの構成パターンは、マッピング データ フローを使用してデータを変換するときに拡張することができます。
このチュートリアルでは、次の手順を実行します。
- データ ファクトリを作成します。
- データ フロー アクティビティが含まれるパイプラインを作成します。
- 4 つの変換を使用して、マッピング データ フローを構築します。
- パイプラインをテスト実行します。
- データ フロー アクティビティを監視します。
前提条件
- Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料の Azure アカウントを作成してください。
- Azure ストレージ アカウント。 Data Lake Storage を、ソースとシンクのデータ ストアとして使用します。 ストレージ アカウントがない場合の作成手順については、Azure のストレージ アカウントの作成に関するページを参照してください。 ストレージ アカウントで、選択したネットワークからのアクセスのみが許可されていることを確認します。
このチュートリアルで変換するファイルは、GitHub コンテンツ サイトにある moviesDB.csv です。 GitHub からファイルを取得するには、コンテンツを任意のテキスト エディターにコピーして、.csv ファイルとしてローカルに保存します。 ファイルをご自分のストレージ アカウントにアップロードするには、Azure portal を使用した BLOB のアップロードに関するページを参照してください。 例では、sample-data という名前のコンテナーを参照します。
Data Factory の作成
この手順では、データ ファクトリを作成し、Data Factory UI を開いて、データ ファクトリにパイプラインを作成します。
Microsoft Edge または Google Chrome を開きます。 現在、Data Factory UI をサポートしている Web ブラウザーは Microsoft Edge と Google Chrome のみです。
左側のメニューで、 [リソースの作成]>[分析]>[Data Factory] の順に選択します。
[新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。
データ ファクトリの名前は "グローバルに一意" にする必要があります。 データ ファクトリの名前の値に関するエラー メッセージが表示された場合は、別の名前を入力してください (yournameADFTutorialDataFactory など)。 Data Factory アーティファクトの名前付け規則については、Data Factory の名前付け規則に関するページを参照してください。
データ ファクトリを作成する Azure サブスクリプションを選択します。
[リソース グループ] で、次の手順のいずれかを行います。
- [Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。
- [新規作成] を選択し、リソース グループの名前を入力します。
リソース グループの詳細については、リソース グループを使用した Azure のリソースの管理に関するページを参照してください。
[バージョン] で、 [V2] を選択します。
[場所] で、データ ファクトリの場所を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリによって使用されるデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (Azure HDInsight など) は、他のリージョンに存在していてもかまいません。
[作成] を選択します
作成が完了すると、その旨が通知センターに表示されます。 [リソースに移動] を選択して、 [Data Factory] ページに移動します。
[Azure Data Factory Studio を開く] を選択し、別のタブで Data Factory UI を起動します。
Data Factory マネージド仮想ネットワークに Azure IR を作成する
この手順では、Azure IR を作成し、Data Factory マネージド仮想ネットワークを有効にします。
Data Factory ポータルで、 [管理] に移動し、 [新規] を選択して新しい Azure IR を作成します。
[Integration runtime setup](統合ランタイムのセットアップ) ページで、必要な機能に基づいて作成する統合ランタイムを選択します。 このチュートリアルでは、 [Azure、セルフホステッド] を選択し、 [続行] をクリックします。
[Azure] を選択し、 [続行] をクリックして、Azure Integration ランタイムを作成します。
[仮想ネットワークの構成 (プレビュー)] で、 [有効化] を選択します。
[作成] を選択します
データ フロー アクティビティが含まれるパイプラインの作成
この手順では、データ フロー アクティビティが含まれるパイプラインを作成します。
Azure Data Factory のホーム ページで、 [Orchestrate](調整) を選択します。
パイプラインの [プロパティ] ウィンドウで、パイプラインの名前として TransformMovies と入力します。
[アクティビティ] ウィンドウで、 [移動と変換] を展開します。 ウィンドウから [Data Flow] アクティビティをパイプライン キャンバスにドラッグします。
[Adding data flow](データ フローの追加) ポップアップで、 [Create new data flow](新しいデータ フローの作成) を選択し、 [Mapping Data Flow](マッピング データ フロー) を選択します。 完了したら、 [OK] をクリックします。
[プロパティ] ウィンドウで、データ フローに TransformMovies という名前を付けます。
パイプライン キャンバスの上部のバーで、 [Data Flow のデバッグ] スライダーをオンにスライドします。 デバッグ モードを使用すると、ライブ Spark クラスターに対する変換ロジックの対話型テストが可能になります。 Data Flow クラスターのウォームアップには 5 から 7 分かかるため、ユーザーが Data Flow の開発を計画している場合は、最初にデバッグを有効にすることをお勧めします。 詳細については、デバッグ モードに関するページを参照してください。
データ フロー キャンバスでの変換ロジックの作成
データ フローを作成すると、データ フロー キャンバスに自動的に飛ばされます。 この手順では、Data Lake Storage 内の moviesDB.csv ファイルを取得し、1910 年から 2000 年までのコメディの平均評価を集計するデータ フローを作成します。 次に、このファイルを Data Lake Storage ストレージに書き戻します。
ソース変換を追加する
この手順では、Data Lake Storage Gen2 をソースとして設定します。
データ フロー キャンバスで [Add Source](ソースの追加) ボックスを選択して、ソースを追加します。
ソースに MoviesDB という名前を付けます。 [新規] を選択して、新しいソース データセットを作成します。
[Azure Data Lake Storage Gen2] を選択し、 [続行] を選択します。
[DelimitedText] を選択してから、 [続行] を選択します。
データセットに MoviesDB という名前を付けます。 リンクされたサービスのドロップダウンで、 [新規] を選択します。
リンクされたサービスの作成画面で、Data Lake Storage Gen2 のリンクされたサービスに ADLSGen2 という名前を付けて、使用する認証方法を指定します。 次に、接続の資格情報を入力します。 このチュートリアルでは、アカウント キーを使用してストレージ アカウントに接続しています。
[Interactive authoring](インタラクティブな作成) を必ず有効にしてください。 これは有効になるまでに 1 分程かかる場合があります。
[接続テスト] を選択します。 プライベート エンドポイントを作成および承認しなければストレージ アカウントにアクセスできないようになっているため、これは失敗するはずです。 エラー メッセージ内に、プライベート エンドポイントを作成するためのリンクが表示されます。それをたどることで、マネージド プライベート エンドポイントを作成できます。 代わりに、 [管理] タブに直接移動し、こちらのセクションの指示に従って、マネージド プライベート エンドポイントを作成する方法もあります。
ダイアログ ボックスは開いたままにして、ストレージ アカウントに移動します。
このセクションの手順に従って、プライベート リンクを承認します。
ダイアログ ボックスに戻ります。 もう一度 [接続テスト] を選択し、 [作成] を選択して、リンクされたサービスをデプロイします。
データセットの作成画面で、 [ファイル パス] フィールドの下にファイルが配置されている場所を入力します。 このチュートリアルでは、moviesDB.csv ファイルはコンテナー sample-data に配置されています。 ファイルにはヘッダーが含まれているので、 [先頭の行を見出しとして使用] チェック ボックスをオンにします。 ストレージ内のファイルからヘッダー スキーマを直接インポートするには、 [From connection/store](接続/ストアから) を選択します。 完了したら、 [OK] をクリックします。
デバッグ クラスターが起動している場合は、ソース変換の [データのプレビュー] タブに移動し、 [更新] を選択して、データのスナップショットを取得します。 データ プレビューを使用すると、変換が正しく構成されていることを確認できます。
マネージド プライベート エンドポイントを作成する
前述の接続をテストした際にハイパーリンクを使用しなかった場合は、パスに従います。 次に、作成したリンクされたサービスに接続するマネージド プライベート エンドポイントを作成する必要があります。
[管理] タブに移動します。
Note
Data Factory インスタンスでは、そのすべてで [管理] タブを使用できない場合があります。 表示されない場合は、 [作成者]>[接続]>[プライベート エンドポイント] を選択して、プライベート エンドポイントにアクセスできます。
[マネージド プライベート エンドポイント] セクションに移動します。
[マネージド プライベート エンドポイント] で、 [+ 新規] を選択します。
リストから [Azure Data Lake Storage Gen2] タイルを選択し、 [続行] を選択します。
作成したストレージ アカウントの名前を入力します。
[作成] を選択します
数秒後に、作成されたプライベート リンクに承認が必要であることが表示されます。
作成したプライベート エンドポイントを選択します。 ストレージ アカウント レベルでプライベート エンドポイントを承認できるハイパーリンクが表示されます。
ストレージ アカウントでプライベート リンクを承認する
ストレージ アカウントで、 [設定] セクションの [プライベート エンドポイント接続] に移動します。
作成したプライベート エンドポイントのチェック ボックスをオンにし、 [承認] を選択します。
説明を追加し、 [はい] を選択します。
Data Factory の [管理] タブにある [マネージド プライベート エンドポイント] セクションに戻ります。
約 1 分後に、プライベート エンドポイントの承認の旨が表示されます。
フィルター変換を追加する
データ フロー キャンバス上のソース ノードの横にあるプラス アイコンを選択して、新しい変換を追加します。 最初に追加する変換は、 [フィルター] です。
フィルター変換に FilterYears という名前を付けます。 [フィルター適用] の横にある式ボックスを選択して、式ビルダーを開きます。 ここでフィルター条件を指定します。
データ フローの式ビルダーでは、さまざまな変換で使用する式を対話形式で作成できます。 式には、組み込み関数、入力スキーマの列、ユーザー定義のパラメーターを含めることができます。 式の作成方法の詳細については、データ フローの式ビルダーに関するページを参照してください。
このチュートリアルでは、1910 年から 2000 年の間に公開された、ジャンルがコメディの映画をフィルター処理します。 現在、年は文字列になっているため、
toInteger()
関数を使用して整数に変換する必要があります。 以上演算子 (>=) と以下演算子 (<=) を使って、年のリテラル値 1910 と 2000 に対する比較を行います。 これらの式を and (&&) 演算子を使用して結合します。 式は次のようになります。toInteger(year) >= 1910 && toInteger(year) <= 2000
コメディ映画を見つけるには、
rlike()
関数を使用して、ジャンル列でパターン 'Comedy' を検索します。rlike
式を年の比較と結合すると、次の式が得られます。toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')
デバッグ クラスターがアクティブになっている場合は、 [更新] を選択して使用された入力と比較した式の出力を表示して、ロジックを確認できます。 データ フローの式言語を使用してこのロジックを実現する方法に対する正解は複数あります。
式の操作が完了したら、 [Save and Finish](保存して終了する) を選択します。
フィルターが正しく機能していることを確認するには、データ プレビューをフェッチします。
集計変換を追加する
次に追加する変換は、 [Schema modifier](スキーマ修飾子) の下にある [集計] 変換です。
集計変換に AggregateComedyRating という名前を付けます。 [グループ化] タブで、ドロップダウン ボックスから [year] を選択し、映画の公開年ごとに集計をグループ化します。
[集計] タブに移動します。左側のテキスト ボックスで、集計列に AverageComedyRating という名前を付けます。 式ビルダーを使用して集計式を入力するには、右側の式ボックスを選択します。
列 [Rating] の平均値を取得するには、
avg()
集計関数を使用します。 [Rating] は文字列で、avg()
で受け入れられるのは数値入力なので、toInteger()
関数を使用して値を数値に変換する必要があります。 式は次のようになります。avg(toInteger(Rating))
完了したら、 [Save and Finish](保存して終了する) を選択します。
変換出力を表示するには、 [Data Preview](データのプレビュー) タブに移動します。 year と AverageComedyRating の 2 つの列だけがあることに注目してください。
シンク変換を追加する
次に、 [Destination](変換先) の下で [シンク] 変換を追加します。
シンクに Sink という名前を付けます。 [新規] を選択して、シンク データセットを作成します。
[新しいデータセット] ページで [Azure Data Lake Storage Gen2] を選択し、 [続行] を選択します。
[形式の選択] ページで、 [DelimitedText] を選択してから、 [続行] を選択します。
シンク データセットに MoviesSink という名前を付けます。 リンクされたサービスでは、ソース変換用に作成したものと同じ ADLSGen2 のリンクされたサービスを選択します。 データの書き込み先となる出力フォルダーを入力します。 このチュートリアルでは、コンテナー sample-data 内のフォルダー output に書き込んでいます。 フォルダーは、事前に存在している必要はなく、動的に作成することができます。 [先頭の行を見出しとして使用] チェック ボックスを選択し、 [スキーマのインポート] で [なし] を選択します。 [OK] を選択します。
これで、データ フローの構築が完了しました。 これをパイプラインで実行する準備ができました。
データ フローの実行と監視
パイプラインを発行する前にデバッグすることができます。 この手順では、データ フロー パイプラインのデバッグ実行をトリガーします。 データのプレビューではデータが書き込まれませんが、デバッグ実行ではシンクの変換先にデータが書き込まれます。
パイプライン キャンバスに移動します。 [デバッグ] を選択して、デバッグ実行をトリガーします。
データ フロー アクティビティのパイプライン デバッグでは、アクティブなデバッグ クラスターが使用されますが、それでも初期化には少なくとも 1 分かかります。 進行状況は [出力] タブで追跡することができます。実行が正常に完了したら、眼鏡のアイコンを選択して実行の詳細を確認します。
詳細ページには、各変換手順で使用した行の数と所要時間が表示されます。
変換を選択すると、データの列とパーティション分割に関する詳細情報が表示されます。
このチュートリアルに正しく従った場合は、シンク フォルダーに 83 行と 2 列が書き込まれているはずです。 BLOB ストレージをチェックすることで、データが正しいことを確認できます。
まとめ
このチュートリアルでは、Data Factory UI を使用して、Data Factory マネージド仮想ネットワーク内でマッピング データ フローを使用して Data Lake Storage Gen2 ソースから Data Lake Storage Gen2 シンク (どちらも、選択したネットワークへの接続のみを許可) にデータをコピーして変換するパイプラインを作成しました。