次の方法で共有


Data Warehouse からレイクハウスにデータを増分読み込みする

このチュートリアルでは、Data Warehouse からレイクハウスにデータを増分読み込む方法について説明します。

概要

ソリューションの概略図を次に示します。

データ ロジックの増分読み込みを示す図。

このソリューションを作成するための重要な手順を次に示します。

  1. 基準値列を選択する。 ソース データ テーブルのいずれか 1 つの列を選択します。実行ごとに新しいレコードまたは更新されたレコードを切り分ける目的で使用されます。 通常、行が作成または更新されたときに常にデータが増える列を選択します (last_modify_time、ID など)。 この列の最大値が基準値として使用されます。

  2. Data Warehouse に最後のウォーターマークの値を格納するテーブルを準備します

  3. 次のワークフローを含んだパイプラインを作成する

    このソリューションのパイプラインには、次のアクティビティがあります。

    • 2 つのルックアップ アクティビティを作成する。 1 つ目のルックアップ アクティビティは、前回の基準値を取得するために使用します。 2 つ目のルックアップ アクティビティは、新しい基準値を取得するために使用します。 これらの基準値は、コピー アクティビティに渡されます。
    • ウォーターマーク列の値が古いウォーターマーク値より大きく、新しいウォーターマーク値より小さいソース データ テーブルから行をコピーするコピーアクティビティを作成します。 次に、Data Warehouse からレイクハウスに新しいファイルとしてデータをコピーします。
    • ストアド プロシージャ アクティビティを作成する。これはパイプラインの次回実行に備えて基準値を更新するためのアクティビティです。

前提条件

  • Data Warehouse。 Data Warehouse をソース データ ストアとして使用します。 ない場合には、「Data Warehouse の作成」で作成する手順を参照してください。
  • レイクハウス。 レイクハウスをコピー先データ ストアとして使用します。 ない場合には、「レイクハウスの作成」で作成する手順を参照してください。 コピーしたデータを格納する IncrementalCopy という名前のフォルダーを作成します。

ソースの準備

増分コピー パイプラインを構成する前に、ソース Data Warehouse で準備する必要があるテーブルとストアド プロシージャをいくつか次に示します。

1. Data Warehouse にデータ ソース テーブルを作成する

Data Warehouse で次の SQL コマンドを実行し、データ ソース テーブルとして data_source_table という名前のテーブルを作成します。 このチュートリアルでは、増分コピーを実行するためのサンプル データとして使用します。

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

データ ソース テーブル内のデータを以下に示します:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

このチュートリアルでは、LastModifytime を基準値列として使用します。

2. Data Warehouse に最後のウォーターマークの値を格納するテーブルを準備する

  1. Data Warehouse で次の SQL コマンドを実行し、watermarktable という名前のテーブルを作成し、最後のウォーターマーク値を格納します:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. 最後のウォーターマークの既定値をソース データ テーブルのテーブル名で設定します。 このチュートリアルでは、テーブル名は data_source_table、既定値は 1/1/2010 12:00:00 AM です。

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. テーブルの watermarktable のデータを確認します。

    Select * from watermarktable
    

    出力:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Data Warehouse にストアド プロシージャを作成する

次のコマンドを実行して、Data Warehouse にストアド プロシージャを作成します。 このストアド プロシージャは、最後のパイプライン実行後に最後の基準値を更新するために使用されます。

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

増分コピー用のパイプラインを構成する

手順 1: パイプラインを作成する

  1. Power BI に移動します。

  2. 画面の左下にある Power BI アイコンを選択し、[データ ファクトリ] を選択して Data Factory のホームページを開きます。

  3. Microsoft Fabric ワークスペースに移動します。

  4. [データ パイプライン] を選択し、パイプライン名を入力して新しいパイプラインを作成します。

    新たに作成されたワークスペースの新しいデータ パイプラインのボタンを示すスクリーンショット。

    新しいパイプラインを作成する際の名前を示すスクリーンショット。

手順 2: 最後のウォーターマークの検索回数アクティビティを追加する

この手順では、最後の基準値を取得する検索回数アクティビティを作成します。 前に設定された既定値 1/1/2010 12:00:00 AM が取得されます。

  1. [パイプライン アクティビティの追加] を選択し、ドロップダウン リストから [参照] を選択します。

  2. [全般] タブで、このアクティビティの名前を LookupOldWaterMarkActivity に変更します。

  3. [設定] タブで、次の構成を実行します:

    • [データ ストアの種類]: [ワークスペース] を選択します。
    • [ワークスペース データの種類]: [Data Warehouse] を選択します。
    • [Data Warehouse]: [Data Warehouse] を選択します。
    • [クエリの使用]: [テーブル] を選択します。
    • [テーブル]: [dbo.watermarktable] を選択します。
    • [最初の行のみ]: 選択済みです。

    古いウォーターマークの検索回数を示すスクリーンショット。

手順 3: 新しいウォーターマークの検索回数アクティビティを追加する

この手順では、新しい基準値を取得する検索回数アクティビティを作成します。 クエリを使用して、ソース データ テーブルから新しいウォーターマークを取得します。 data_source_tableLastModifytime 列の最大値が取得されます。

  1. 上部のバーで、[アクティビティ] タブの [検索回数] を選択して、2 番目の検索回数アクティビティを追加します。

  2. [全般] タブで、このアクティビティの名前を LookupOldWaterMarkActivity に変更します。

  3. [設定] タブで、次の構成を実行します:

    • [データ ストアの種類]: [ワークスペース] を選択します。

    • [ワークスペース データの種類]: [Data Warehouse] を選択します。

    • [Data Warehouse]: [Data Warehouse] を選択します。

    • [クエリの使用]: [クエリ] を選択します。

    • [クエリ]: 次のクエリを入力して、最後に変更された最大時間を新しい基準値として選択します:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • [最初の行のみ]: 選択済みです。

    新しいウォーターマークの検索回数を示すスクリーンショット。

手順 4: 増分データをコピーするコピー アクティビティを追加する

この手順では、コピー アクティビティを追加して、最後のウォーターマークと新しいウォーターマークの間の増分データを Data Warehouse からレイクハウスにコピーします。

  1. 上部のバーで [アクティビティ] を選択し、[データのコピー] ->[キャンバスに追加] を選択してコピー アクティビティを取得します。

  2. [全般] タブで、このアクティビティの名前を IncrementalCopyActivity に変更します。

  3. 2 つの検索回数アクティビティにコピー アクティビティに接続します。これには、検索回数アクティビティにアタッチされている緑のボタン (成功時) をコピー アクティビティにドラッグします。 コピー アクティビティの境界線の色が緑に変わったら、マウス ボタンを離します。

    検索回数アクティビティとコピー アクティビティの接続を示すスクリーンショット。

  4. [ソース] タブで、次の構成を実行します:

    • [データ ストアの種類]: [ワークスペース] を選択します。

    • [ワークスペース データの種類]: [Data Warehouse] を選択します。

    • [Data Warehouse]: [Data Warehouse] を選択します。

    • [クエリの使用]: [クエリ] を選択します。

    • クエリ: 次のクエリを入力して、最後のウォーターマークと新しいウォーターマークの間に増分データをコピーします。

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    ソース構成のコピーを示すスクリーンショット。

  5. [コピー先] タブで、次の構成を実行します:

    • [データ ストアの種類]: [ワークスペース] を選択します。
    • [ワークスペース データ ストアの種類]: [レイクハウス] を選択します。
    • [レイクハウス]: [レイクハウス] を選択します。
    • [ルート フォルダー]: [ファイル] を選択します。
    • [ファイル パス]: コピーしたデータを格納するフォルダーを指定します。 [参照] を選択してフォルダーを選択します。 ファイル名として、[動的コンテンツの追加] を開き、開いたウィンドウに「@CONCAT('Incremental-', pipeline().RunId, '.txt')」と入力して、レイクハウスでコピーしたデータ ファイルのファイル名を作成します。
    • [ファイル形式]: データの形式の種類を選択します。

    コピー先構成のコピーを示すスクリーンショット。

手順 5: ストアド プロシージャ アクティビティを追加する

この手順では、ストアド プロシージャ アクティビティを追加して、次のパイプライン実行の最後のウォーターマーク値を更新します。

  1. 上部のバーで [アクティビティ] を選択し、[ストアド プロシージャ] を選択してストアド プロシージャ アクティビティを追加します。

  2. [全般] タブで、このアクティビティの名前を StoredProceduretoWriteWatermarkActivity に変更します。

  3. コピー アクティビティの緑の (成功時) 出力をストアド プロシージャ アクティビティに接続します。

  4. [設定] タブで、次の構成を実行します:

    • [データ ストアの種類]: [ワークスペース] を選択します。

    • [Data Warehouse]: [Data Warehouse] を選択します。

    • [ストアド プロシージャ名]: Data Warehouse で作成したストアド プロシージャを指定します: [dbo].[usp_write_watermark]

    • ストアド プロシージャ パラメータを展開します。 ストアド プロシージャのパラメーターの値を指定するには、[インポート] を選択し、各パラメーターに次の値を入力します:

      名前 Type
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    ストアド プロシージャ アクティビティの構成を示すスクリーンショット。

手順 6:パイプラインを実行し、結果を監視する

上部のバーで、[ホーム] タブの [実行] を選択します。次に、[保存して実行] を選択します。 パイプライン実行が開始され、[出力] タブでパイプラインを監視できます。

パイプライン実行の結果を示すスクリーンショット。

レイクハウスに移動すると、指定したフォルダーの下にデータ ファイルが見つかります。ファイルを選択して、コピーしたデータをプレビューできます。

最初のパイプライン実行のレイクハウス データを示すスクリーンショット。

最初のパイプライン実行のレイクハウス データ プレビューを示すスクリーンショット。

さらにデータを追加して増分コピーの結果を表示する

最初のパイプラインの実行が完了したら、Data Warehouse ソース テーブルにさらにデータを追加して、このパイプラインで増分データをコピーできるかどうかを確認してみましょう。

手順 1: ソースにデータを追加する

次のクエリを実行して、Data Warehouse に新しいデータを挿入します:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

data_source_table の更新されたデータは次のとおりです:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

手順 2: 別のパイプライン実行をトリガーし、結果を監視する

パイプライン ページに戻ります。 上部のバーで、[ホーム] タブの [実行] をもう一度選択します。 パイプライン実行が開始され、[出力] タブでパイプラインを監視できます。

レイクハウスに移動すると、新しいコピーされたデータ ファイルが指定したフォルダーの下にあります。そのファイルを選択してコピーされたデータをプレビューすることができます。 このファイルに増分データが表示されます。

2 番目のパイプライン実行のレイクハウス データを示すスクリーンショット。

2 番目のパイプライン実行のレイクハウス データ プレビューを示すスクリーンショット。

次に、Azure Blob Storage からレイクハウスへのコピーの詳細に進みます。