次の方法で共有


マッピング データ フローを使用して Delta Lake のデータを変換する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

Azure Data Factory を初めて使用する場合は、「Azure Data Factory の概要」を参照してください。

このチュートリアルでは、Azure Data Lake Storage (ADLS) Gen2 内のデータを分析して変換し、Delta Lake に格納することができるデータ フローを、データ フロー キャンバスを使用して作成します。

前提条件

  • Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料の Azure アカウントを作成してください。
  • Azure ストレージ アカウント。 ADLS ストレージを、ソースシンクのデータ ストアとして使用します。 ストレージ アカウントがない場合の作成手順については、Azure のストレージ アカウントの作成に関するページを参照してください。

このチュートリアルで変換するファイルは MoviesDB.csv です (こちらで入手できます)。 GitHub からファイルを取得するには、コンテンツを任意のテキスト エディターにコピーして、.csv ファイルとしてローカルに保存します。 ファイルをご自分のストレージ アカウントにアップロードするには、Azure portal を使用した BLOB のアップロードに関するページを参照してください。 各例では、'sample-data' という名前のコンテナーを参照しています。

Data Factory の作成

この手順では、データ ファクトリを作成し、Data Factory UX を開いて、データ ファクトリにパイプラインを作成します。

  1. Microsoft Edge または Google Chrome を開きます。 現在、Data Factory の UI がサポートされる Web ブラウザーは Microsoft Edge と Google Chrome だけです。

  2. 左側のメニューで、 [リソースの作成]>[統合]>[Data Factory] を選択します。

  3. [新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。

  4. データ ファクトリを作成する Azure サブスクリプションを選択します。

  5. [リソース グループ] で、次の手順のいずれかを行います。

    a. [Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。

    b. [新規作成] を選択し、リソース グループの名前を入力します。

    リソース グループの詳細については、リソース グループを使用した Azure のリソースの管理に関するページを参照してください。

  6. [バージョン] で、 [V2] を選択します。

  7. [場所] で、データ ファクトリの場所を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリによって使用されるデータ ストア (Azure Storage、SQL Database など) やコンピューティング (Azure HDInsight など) は、他のリージョンに存在していてもかまいません。

  8. [作成] を選択します

  9. 作成が完了すると、その旨が通知センターに表示されます。 [リソースに移動] を選択して、Data factory ページに移動します。

  10. [Author & Monitor]\(作成と監視\) を選択して、別のタブで Data Factory (UI) を起動します。

データ フロー アクティビティが含まれるパイプラインの作成

この手順では、データ フロー アクティビティが含まれるパイプラインを作成します。

  1. ホーム ページで [調整] を選択します。

    ADF のホーム ページを示すスクリーンショット。

  2. パイプラインの [全般] タブで、パイプラインの名前として「DeltaLake」と入力します。

  3. [アクティビティ] ウィンドウで、 [移動と変換] アコーディオンを展開します。 ウィンドウから Data Flow アクティビティをパイプライン キャンバスにドラッグ アンド ドロップします。

    Data Flow アクティビティをドロップできるパイプライン キャンバスを示すスクリーンショット。

  4. [Data Flow の追加] ポップアップで、 [新しい Data Flow の作成] を選択し、データ フローに DeltaLake という名前を付けます。 完了したら [完了] を選択します。

    新しいデータ フローを作成するときにデータ フローの名前を指定する場所を示すスクリーンショット。

  5. パイプライン キャンバスの上部のバーで、 [Data Flow のデバッグ] スライダーをオンにスライドします。 デバッグ モードを使用すると、ライブ Spark クラスターに対する変換ロジックの対話型テストが可能になります。 Data Flow クラスターのウォームアップには 5 から 7 分かかるため、ユーザーが Data Flow の開発を計画している場合は、最初にデバッグを有効にすることをお勧めします。 詳細については、デバッグ モードに関するページを参照してください。

    [データ フローのデバッグ] のスライダーを示すスクリーンショット。

データ フロー キャンバスでの変換ロジックの作成

このチュートリアルでは、2 つのデータ フローを生成します。 最初のデータ フローは、映画の CSV ファイルから新しい Delta Lake を生成するためにシンクする単純なソースです。 最後に、Delta Lake の更新データに追従するフロー デザインを作成します。

最終的なフロー

チュートリアルの目標

  1. 前提条件の MoviesCSV データセット ソースを使用し、そこから新しい Delta Lake を形成します。
  2. 1988 年の映画の評価を '1' に更新するためのロジックを作成します。
  3. 1950 年のすべてのムービーを削除します。
  4. 1960 年のムービーを複製して 2021 年用の新しいムービーを挿入します。

空のデータ フロー キャンバスから開始する

  1. データ フロー エディター ウィンドウの上部にある [ソース変換] を選択し、次に、[ソース設定] ウィンドウの [データ セット] プロパティの横にある [+ 新規] を選択します。

    新しいソース データセットをデータ フローに追加する場所を示すスクリーンショット。

  2. 表示された [新しいデータセット] ウィンドウから [Azure Data Lake Storage Gen2] を選択し、次に [続行]を選択します。

    [新しいデータセット] ウィンドウで [Azure Data Lake Storage Gen2] を選択する場所を示すスクリーンショット。

  3. データセットの種類には [DelimitedText] を選択し、もう一度 [続行] を選択します。

    データセットの形式を選択する場所を示すスクリーンショット。

  4. データセットに 「MoviesCSV」 という名前を付け、 [リンク サービス][+ 新規] を選択して、新しくファイルにリンク サービスを作成します。

  5. 「前提条件」セクションで先に作成したストレージ アカウントの詳細を入力し、そこでアップロードした MoviesCSV ファイルを参照し選択します。

  6. リンク サービスを追加した後、[第一行をヘッダーとして選択] チェック ボックスをオンにし、[OK] を選択してソースを追加します。

  7. データ フロー設定ウィンドウの [プロジェクション] タブに移動し、[データ型の検出] を選択します。

  8. 次に、データ フロー エディター ウィンドウのソースの後にある + を選択し、下にスクロールして、[宛先] セクションの下にある [シンク] を選択し、新しいシンクをデータ フローに追加します。

    データ フローのシンク変換先を追加する場所を示すスクリーンショット。

  9. シンクの追加後に表示されるシンク設定の [シンク] タブで、[シンクの種類][インライン]を選択し、次に [インライン データセットの種類] では [Delta] を選択します。 次に、[リンク サービス] の [Azure Data Lake Storage Gen2] を選択します。

    インライン Delta データセットのシンクの詳細を示すスクリーンショット。

  10. サービスに Delta Lake を作成させたいストレージ コンテナー内のフォルダー名を選択します。

  11. 最後に、パイプライン デザイナーに戻り、[デバッグ] をクリックして、キャンバス上のこのデータ フロー アクティビティだけを使用し、デバッグ モードでパイプラインを実行します。 これにより、Azure Data Lake Storage Gen2 の中に新しい Delta Lake が生成されます。

  12. 次に、画面の左側にある [ファクトリ リソース] メニューから + を選択して新しいリソースを追加した後で、[データ フロー] を選択します。

    データ ファクトリで新しいデータ フローを作成する場所を示すスクリーンショット。

  13. 前と同様に、ソースとして MoviesCSV ファイルを再度選択し、[プロジェクション] タブから [データ型の検出] を選択します。

  14. ここでは、ソースを作成した後に、データ フロー エディター ウィンドウで + を選択し、ソースにフィルター変換を追加します。

    データ フローにフィルター条件を追加する場所を示すスクリーンショット。

  15. 1950、1960、1988 に一致する映画行のみを許可する [フィルター 設定] ウィンドウに、[フィルター オン] 条件を追加します。

    データセットの [年] 列でフィルターを追加する場所を示すスクリーンショット。

  16. 次に、[派生列] 変換を追加して、1988 年の各映画の評価を '1' に更新します。

    派生列の式を入力する場所を示すスクリーンショット。

  17. Update, insert, delete, and upsert ポリシーは、行の変更変換で作成されます。 派生列の後に、行の変更変換を追加します。

  18. 変更行ポリシーは、次のようになります。

    行の変更

  19. 変更行の種類ごとに適切なポリシーを設定したので、シンク変換に適切な更新ルールが設定されたことを確認します

    Sink

  20. ここでは、Azure Data Lake Storage Gen2 データ レイクへの Delta Lake シンクを使用し、さらに挿入、更新、削除を許可します。

  21. キー列は、映画の主キー列と年列で構成される複合キーであることに、注意してください。 これは、1960 年行を複製して仮の 2021 年ムービーを作成したためです。 こうして一意性を提供することによって、既存の行を検索するときの競合が回避されます。

完成したサンプルのダウンロード

レイクで行を更新または削除するためのデータ フローを使用する、Delta パイプラインのサンプル ソリューションをこちらで確認できます。

データ フローの式言語の詳細を確認します。