データベースから制御テーブルを使用して差分コピーを行う
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
この記事では、高基準値を格納する外部制御テーブルを使用してデータベース テーブルから Azure に新規行や更新行を増分読み込みする際に使用できるテンプレートについて説明します。
このテンプレートでは、新規行や更新行を識別するため、ソース データベースのスキーマにタイムスタンプ列か増分キーが含まれている必要があります。
Note
新規行や更新行を識別するタイムスタンプ列がソース データベースに含まれているものの、差分コピーで使用する外部制御テーブルを作成したくない場合は、代わりに Azure Data Factory データ コピー ツールを使用してパイプラインを取得できます。 このツールは、トリガーでスケジュール設定される時間を変数として使用して、ソース データベースから新規行を読み取ります。
このソリューション テンプレートについて
このテンプレートは、まず古い基準値を取得し、その値を現在の基準値と比較します。 その後、2 つの基準値の比較に基づいて、変更箇所のみをソース データベースからコピーします。 最後に、次に差分データを読み込むときのために、新しい高基準値を外部制御テーブルに格納します。
このテンプレートには、4 つのアクティビティが含まれています。
- Lookup が、外部制御テーブルに格納されている古い高基準値を取得します。
- 別の Lookup アクティビティが、ソース データベースから現在の高基準値を取得します。
- Copy が、変更箇所のみをソース データベースからコピー先ストアにコピーします。 ソース データベースの変更箇所を識別するクエリは、SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > "last high-watermark" and TIMESTAMP_Column <= "current high-watermark" のようになります。
- SqlServerStoredProcedure が、次回の差分コピーのために現在の高基準値を外部制御テーブルに書き込みます。
このテンプレートでは、次のパラメーターを定義します。
- Data_Source_Table_Name は、データの読み込み元になるソース データベース内のテーブルです。
- Data_Source_WaterMarkColumn は、新規行や更新行を識別するために使用されるソース テーブル内の列の名前です。 通常、この列の型は datetime や INT などです。
- Data_Destination_Container は、コピー先ストア内でデータがコピーされる場所のルート パスです。
- Data_Destination_Directory は、コピー先ストア内でデータがコピーされる場所のルートの下のディレクトリ パスです。
- Data_Destination_Table_Name は、コピー先ストア内でデータがコピーされる場所です (データのコピー先として "Azure Synapse Analytics" が選択されている場合に適用されます)。
- Data_Destination_Folder_Path は、コピー先ストア内でデータがコピーされる場所です (データのコピー先として "ファイル システム" または "Azure Data Lake Storage Gen1" が選択されている場合に適用されます)。
- Control_Table_Table_Name は、高基準値を格納する外部制御テーブルです。
- Control_Table_Column_Name は、高基準値を格納する外部制御テーブル内の列です。
このソリューション テンプレートの使用方法
読み込むソース テーブルを調べて、新規行や更新行を識別するために使用できる高基準値列を定義します。 この列の型には datetime や INT などがあります。 この列の値は、新規行が追加されると増加します。 次のサンプル ソース テーブル (data_source_table) では、高基準値列として LastModifytime 列を使用できます。
PersonID Name LastModifytime 1 aaaa 2017-09-01 00:56:00.000 2 bbbb 2017-09-02 05:23:00.000 3 cccc 2017-09-03 02:36:00.000 4 dddd 2017-09-04 03:21:00.000 5 eeee 2017-09-05 08:06:00.000 6 fffffff 2017-09-06 02:23:00.000 7 gggg 2017-09-07 09:01:00.000 8 hhhh 2017-09-08 09:01:00.000 9 iiiiiiiii 2017-09-09 09:01:00.000
SQL Server か Azure SQL Database に、差分データの読み込みに使用される高基準値を格納する制御テーブルを作成します。 次の例では、制御テーブルの名前は watermarktable です。 このテーブルにある WatermarkValue が高基準値を格納する列で、その型は datetime です。
create table watermarktable ( WatermarkValue datetime, ); INSERT INTO watermarktable VALUES ('1/1/2010 12:00:00 AM')
制御テーブルを作成するために使用したのと同じ SQL Server か Azure SQL Database インスタンスに、ストアド プロシージャを作成します。 このストアド プロシージャは、次回差分データを読み込むのに必要な新しい高基準値を外部制御テーブルに書き込むために使用されます。
CREATE PROCEDURE update_watermark @LastModifiedtime datetime AS BEGIN UPDATE watermarktable SET [WatermarkValue] = @LastModifiedtime END
[データベースからの差分コピー] テンプレートに移動します。 データのコピー元となるソース データベースへの新しい接続を作成します。
データのコピー先となるコピー先データ ストアへの新しい接続を作成します。
手順 2 と 3 で作成した外部制御テーブルとストアド プロシージャへの新しい接続を作成します。
[このテンプレートを使用] を選択します。
次の例に示すように、使用可能なパイプラインが表示されます。
[ストアド プロシージャ] を選択します。 [ストアド プロシージャ名] に [dbo].[update_watermark] を選択します。 [Import parameter (インポート パラメーター)] を選択し、 [動的なコンテンツの追加] を選択します。
@{activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue} という内容を書き込んで、[完了] を選択します。
[デバッグ] を選択し、 [パラメーター] で入力し、 [完了] を選択します。
次の例に示すような結果が表示されます。
ソース テーブル内に新しい行を作成できます。 新規行を作成する SQL 言語の例を次に示します。
INSERT INTO data_source_table VALUES (10, 'newdata','9/10/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
パイプラインをもう一度実行するには、 [デバッグ] を選択し、 [パラメーター] に入力して、 [完了] を選択します。
宛先に新規行のみがコピーされたことがわかります。
(省略可能:)データの宛先として Azure Synapse Analytics を選択した場合、ステージング用に Azure BLOB ストレージへの接続も指定する必要があります。これは Azure Synapse Analytics の Polybase に必要です。 テンプレートによって、コンテナーのパスが生成されます。 パイプラインの実行後、コンテナーが BLOB ストレージに作成されているかどうかを確認します。