演習 - マッピング データ フローを使用して、タイプ 1 の緩やかに変化するディメンションを設計して実装する

完了

この演習では、Azure Synapse 専用 SQL プールをソースとターゲットとして使用して、タイプ 1 の SCD のデータ フローを作成します。 このデータ フローを Synapse パイプラインに追加して、抽出、変換、読み込み (ETL) プロセスの一部として実行できます。

ソースとディメンション テーブルの設定

この演習では、Azure SQL、Azure ストレージなどのさまざまなシステムの種類のソース データから、Azure Synapse でディメンション テーブルを読み込む必要があります。この例では、Azure Synapse データベースにソース データを作成して、シンプルにします。

  1. Synapse Studio から、[データ] ハブに移動します。

    データ ハブ。

  2. [ワークスペース] タブ (1) を選択し、[データベース] を展開して、SQLPool01 (2) を右クリックします。 [New SQL script](新しい SQL スクリプト) (3) を選択し、[Empty script](空のスクリプト) (4) を選択します。

    データ ハブと、新しい SQL スクリプトを作成するためのコンテキスト メニューが表示されています。

  3. 次のスクリプトを空のスクリプト ウィンドウに貼り付け、[実行] または F5 キーを押してクエリを実行します。

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    スクリプトと [実行] ボタンの両方が強調表示されています。

マッピング データ フローを作成する

マッピング データ フローはパイプライン アクティビティで、コードを書かないデータ変換方法を指定する視覚的な方法を提供します。 次に、タイプ 1 の SCD を作成するマッピング データ フローを作成します。

  1. [開発] ハブに移動します。

    開発ハブ。

  2. [+] を選んでから、[データ フロー] を選びます。

    プラス ボタンと [データ フロー] メニュー項目が強調表示されています。

  3. 新しいデータ フローのプロパティ ペインで、[名前] フィールド (1) に「UpdateCustomerDimension」と入力し、[プロパティ] ボタン (2) を選択してプロパティ ペインを非表示にします。

    データ フローのプロパティ ペインが表示されています。

  4. キャンバス上の [ソースの追加] を選択します。

    データ フロー キャンバスの [ソースの追加] ボタンが強調表示されています。

  5. Source settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「SourceDB」を入力します
    • [ソースの種類]: Dataset を選択します
    • [オプション]: Allow schema drift をオンにし、他のオプションをオフのままにします
    • [サンプリング]: Disable を選択します
    • [データセット]: [+ 新規] を選択して、新しいデータセットを作成します

    [データセット] の横にある [新規] ボタンが強調表示されています。

  6. 新しい統合データセット ダイアログで、[Azure Synapse Analytics] を選択し、[続行] を選択します。

    Azure SQL Database と [続行] ボタンが強調表示されています。

  7. データセットのプロパティで、次のように構成します。

    • [名前]: 「CustomerSource」を入力します
    • [リンクされたサービス]: Synapse ワークスペースのリンクされたサービスを選択します
    • [テーブル名]: ドロップダウンの横にある [更新] ボタンを選択します

    フォームが説明に従って構成され、[更新] ボタンが強調表示されています。

  8. [値] フィールドにご自分の SQL プール名を入力し、[OK] を選択します。

    SQLPool01 パラメーターが強調表示されています。

  9. [テーブル名]dbo.CustomerSource を選択し、[スキーマのインポート]From connection/store を選択し、[OK] を選択して、データセットを作成します。

    フォームが説明に従って構成されています。

  10. 追加した CustomerSource データセットの横にある [開く] を選択します。

    新しいデータセットの横にある [開く] ボタンが強調表示されています。

  11. DBName の横にある [値] フィールドにご自分の SQL プール名を入力します。

  12. データ フロー エディターで、SourceDB アクティビティの下にある [ソースの追加] ボックスを選択します。 CustomerSource で使用したのと同じ手順に従って、このソースを DimCustomer テーブルとして構成します。

    • [出力ストリーム名]: 「DimCustomer」を入力します
    • [ソースの種類]: Dataset を選択します
    • [オプション]: Allow schema drift をオンにし、他のオプションをオフのままにします
    • [サンプリング]: Disable を選択します
    • [データセット]: [+ 新規] を選択して、新しいデータセットを作成します。 リンクされたサービスの Azure Synapse を使用して、DimCustomer テーブルを選択します。 ご自分の SQL プール名には必ず DBName を設定してください。

    [ソースの設定] で、ソースの追加、出力ストリーム名、データセット名が強調表示されています。

変換をデータ フローに追加する

  1. キャンバス上の SourceDB ソースの右側にある [+] を選んでから、[派生列] を選びます。

    プラス ボタンと [派生列] メニュー項目が強調表示されています。

  2. Derived column's settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「CreateCustomerHash」を入力します
    • [Incoming stream](受信ストリーム): SourceDB を選択します
    • [列]: 次のように入力します。
    説明
    HashKey」を入力 sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) テーブル値の SHA256 ハッシュを作成します。 これを使用して、入力レコードのハッシュとターゲット レコードのハッシュ値を比較し、CustomerID 値で照合することで、行の変更を検出します。 iifNull 関数を使用して、null 値を空の文字列に置き換えます。 それ以外の場合、null エントリが存在すると、ハッシュ値が重複する傾向があります。

    派生列の設定フォームが説明に従って構成されています。

  3. キャンバス上の CreateCustomerHash 派生列の右側にある [+] を選んでから、[存在する] を選びます。

    プラス ボタンと [存在する] メニュー項目の両方が強調表示されています。

  4. Exists settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「Exists」を入力します
    • [Left stream](左側のストリーム): CreateCustomerHash を選択します
    • [Right stream](右側のストリーム): SynapseDimCustomer を選択します
    • [Exist type](存在タイプ): Doesn't exist を選択します
    • [Exists conditions](存在条件): 左と右に次のように設定します。
    左: CreateCustomerHash の列 右: SynapseDimCustomer の列
    HashKey HashKey

    [exists 設定] フォームが説明に従って構成されています。

  5. キャンバス上の Exists の右側にある [+] を選んでから、[検索] を選びます。

    プラス ボタンと [検索] メニュー項目の両方が強調表示されています。

  6. Lookup settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「LookupCustomerID」を入力します
    • [Primary stream](プライマリ ストリーム): Exists を選択します
    • [Lookup stream](検索ストリーム): SynapseDimCustomer を選択します
    • [Match multiple rows](複数の行の一致): オフ
    • [Match on](一致対象): Any row を選択します
    • [Lookup conditions](検索条件): 左と右に次のように設定します。
    左: Exists の列 右: SynapseDimCustomer の列
    CustomerID CustomerID

    検索設定フォームが説明に従って構成されています。

  7. キャンバス上の LookupCustomerID の右側にある [+] を選んでから、[派生列] を選択します。

    プラス ボタンと [派生列] メニュー項目の両方が強調表示されています。

  8. Derived column's settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「SetDates」を入力します
    • [Incoming stream](受信ストリーム): LookupCustomerID を選択します
    • [列]: 次のように入力します。
    説明
    [InsertedDate] を選択します iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) InsertedDate 値が null の場合、現在のタイムスタンプを挿入します。 それ以外の場合は、InsertedDate 値を使用します。
    [ModifiedDate] を選択します currentTimestamp() 常に ModifiedDate 値を現在のタイムスタンプで値を更新します。

    別の派生列の設定フォームが説明に従って構成されています。

    注意

    2 番目の列を挿入するには、[列] リストの上にある [+ 追加] を選択し、[列の追加] を選択します。

  9. キャンバス上の SetDates 派生列の右側にある [+] を選んでから、[行の変更] を選びます。

    プラス ボタンと行の変更メニュー項目の両方が強調表示されています。

  10. Alter row settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「AllowUpserts」を入力します
    • [Incoming stream](受信ストリーム): SetDates を選択します
    • [Alter row conditions](行の変更条件): 次のように入力します。
    条件 説明
    [Upsert if] を選択します true() Upsert if 条件で、条件を true() に設定して、アップサートを許可します。 これにより、マッピング データ フローのステップを通過するすべてのデータが確実にシンクに挿入または更新されるようになります。

    [行の変更の設定] フォームが説明に従って構成されています。

  11. キャンバス上の AllowUpserts 行の変更ステップの右側にある [+] を選んでから、[シンク] を選びます。

    プラス ボタンと [シンク] メニュー項目の両方が強調表示されています。

  12. Sink で、次のプロパティを構成します。

    • [出力ストリーム名]: 「Sink」を入力します
    • [Incoming stream](受信ストリーム): AllowUpserts を選択します
    • [Sink type](シンクの種類): Dataset を選択します
    • [データセット]: DimCustomer を選択します
    • [オプション]: Allow schema drift をオン、Validate schema をオフにします

    シンクのプロパティ フォームが説明に従って構成されています。

  13. [設定] タブを選択し、次のプロパティを構成します。

    • [更新方法]: Allow upsert をオン、他のすべてのオプションをオフにします
    • [キー列]: List of columns を選択し、リスト内の CustomerID を選択します
    • [Table action](テーブル アクション): None を選択します
    • [Enable staging](ステージングを有効にする): オフ

    シンク設定が説明に従って構成されています。

  14. [マッピング] タブを選択し、[Auto mapping](自動マッピング) をオフにします。 次に示すように、入力列のマッピングを構成します。

    入力列 出力列
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    [マッピング] 設定が説明に従って構成されています。

  15. 完成したマッピング フローは次のようになります。 [すべて公開] を選択して、変更を保存します。

    完成したデータ フローが表示され、[すべて公開] が強調表示されています。

  16. [発行] を選びます。

    [公開] ボタンが強調表示されています。

データ フローをテストするには

タイプ 1 の SCD データ フローが完成しました。 これのテストを選択した場合は、このデータ フローを Synapse 統合パイプラインに追加できます。 次に、パイプラインを 1 回実行して、顧客ソース データをターゲットの DimCustomer に初期読み込みすることができます。

パイプラインを追加実行するたびに、ソース テーブル内のデータとディメンション テーブル内に既に存在するデータが (HashKey を使用して) 比較され、変更のあったレコードのみが更新されます。 これをテストするには、ソース テーブル内のレコードを更新した後、パイプラインを再度実行して、ディメンション テーブル内のレコードが更新されることを確認します。

顧客の Janet Gates を例にとります。 初期読み込みで、LastName が Gates、CustomerId が 4 であることが示されます。

スクリプトと、最初の顧客レコードが表示されています。

次に、ソース テーブル内の顧客の姓を更新するステートメントの例を示します。

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

レコードを更新し、パイプラインを再度実行すると、DimCustomer で、この更新されたデータが表示されます。

スクリプトと、更新された顧客レコードが表示されています。

顧客レコードの LastName 値がソース レコードと一致するように正常に更新され、ModifiedDate も古い LastName 値を追跡せずに更新されました。 これは、タイプ 1 の SCD の予想される動作です。 LastName フィールドに履歴が必要な場合は、テーブルとデータ フローを、これまでに説明した他の SCD タイプのいずれかに変更します。