演習 - マッピング データ フローを使用して、タイプ 1 の緩やかに変化するディメンションを設計して実装する
この演習では、Azure Synapse 専用 SQL プールをソースとターゲットとして使用して、タイプ 1 の SCD のデータ フローを作成します。 このデータ フローを Synapse パイプラインに追加して、抽出、変換、読み込み (ETL) プロセスの一部として実行できます。
ソースとディメンション テーブルの設定
この演習では、Azure SQL、Azure ストレージなどのさまざまなシステムの種類のソース データから、Azure Synapse でディメンション テーブルを読み込む必要があります。この例では、Azure Synapse データベースにソース データを作成して、シンプルにします。
Synapse Studio から、[データ] ハブに移動します。
[ワークスペース] タブ (1) を選択し、[データベース] を展開して、SQLPool01 (2) を右クリックします。 [New SQL script](新しい SQL スクリプト) (3) を選択し、[Empty script](空のスクリプト) (4) を選択します。
次のスクリプトを空のスクリプト ウィンドウに貼り付け、[実行] または
F5
キーを押してクエリを実行します。CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
マッピング データ フローを作成する
マッピング データ フローはパイプライン アクティビティで、コードを書かないデータ変換方法を指定する視覚的な方法を提供します。 次に、タイプ 1 の SCD を作成するマッピング データ フローを作成します。
[開発] ハブに移動します。
[+] を選んでから、[データ フロー] を選びます。
新しいデータ フローのプロパティ ペインで、[名前] フィールド (1) に「
UpdateCustomerDimension
」と入力し、[プロパティ] ボタン (2) を選択してプロパティ ペインを非表示にします。キャンバス上の [ソースの追加] を選択します。
Source settings
で、次のプロパティを構成します。- [出力ストリーム名]: 「
SourceDB
」を入力します - [ソースの種類]:
Dataset
を選択します - [オプション]:
Allow schema drift
をオンにし、他のオプションをオフのままにします - [サンプリング]:
Disable
を選択します - [データセット]: [+ 新規] を選択して、新しいデータセットを作成します
- [出力ストリーム名]: 「
新しい統合データセット ダイアログで、[Azure Synapse Analytics] を選択し、[続行] を選択します。
データセットのプロパティで、次のように構成します。
- [名前]: 「
CustomerSource
」を入力します - [リンクされたサービス]: Synapse ワークスペースのリンクされたサービスを選択します
- [テーブル名]: ドロップダウンの横にある [更新] ボタンを選択します
- [名前]: 「
[値] フィールドにご自分の SQL プール名を入力し、[OK] を選択します。
[テーブル名] で
dbo.CustomerSource
を選択し、[スキーマのインポート] でFrom connection/store
を選択し、[OK] を選択して、データセットを作成します。追加した
CustomerSource
データセットの横にある [開く] を選択します。DBName
の横にある [値] フィールドにご自分の SQL プール名を入力します。データ フロー エディターで、SourceDB アクティビティの下にある [ソースの追加] ボックスを選択します。 CustomerSource で使用したのと同じ手順に従って、このソースを DimCustomer テーブルとして構成します。
- [出力ストリーム名]: 「
DimCustomer
」を入力します - [ソースの種類]:
Dataset
を選択します - [オプション]:
Allow schema drift
をオンにし、他のオプションをオフのままにします - [サンプリング]:
Disable
を選択します - [データセット]: [+ 新規] を選択して、新しいデータセットを作成します。 リンクされたサービスの Azure Synapse を使用して、DimCustomer テーブルを選択します。 ご自分の SQL プール名には必ず DBName を設定してください。
- [出力ストリーム名]: 「
変換をデータ フローに追加する
キャンバス上の
SourceDB
ソースの右側にある [+] を選んでから、[派生列] を選びます。Derived column's settings
で、次のプロパティを構成します。- [出力ストリーム名]: 「
CreateCustomerHash
」を入力します - [Incoming stream](受信ストリーム):
SourceDB
を選択します - [列]: 次のように入力します。
列 式 説明 「 HashKey
」を入力sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
テーブル値の SHA256 ハッシュを作成します。 これを使用して、入力レコードのハッシュとターゲット レコードのハッシュ値を比較し、 CustomerID
値で照合することで、行の変更を検出します。iifNull
関数を使用して、null 値を空の文字列に置き換えます。 それ以外の場合、null エントリが存在すると、ハッシュ値が重複する傾向があります。- [出力ストリーム名]: 「
キャンバス上の
CreateCustomerHash
派生列の右側にある [+] を選んでから、[存在する] を選びます。Exists settings
で、次のプロパティを構成します。- [出力ストリーム名]: 「
Exists
」を入力します - [Left stream](左側のストリーム):
CreateCustomerHash
を選択します - [Right stream](右側のストリーム):
SynapseDimCustomer
を選択します - [Exist type](存在タイプ):
Doesn't exist
を選択します - [Exists conditions](存在条件): 左と右に次のように設定します。
左: CreateCustomerHash の列 右: SynapseDimCustomer の列 HashKey
HashKey
- [出力ストリーム名]: 「
キャンバス上の
Exists
の右側にある [+] を選んでから、[検索] を選びます。Lookup settings
で、次のプロパティを構成します。- [出力ストリーム名]: 「
LookupCustomerID
」を入力します - [Primary stream](プライマリ ストリーム):
Exists
を選択します - [Lookup stream](検索ストリーム):
SynapseDimCustomer
を選択します - [Match multiple rows](複数の行の一致): オフ
- [Match on](一致対象):
Any row
を選択します - [Lookup conditions](検索条件): 左と右に次のように設定します。
左: Exists の列 右: SynapseDimCustomer の列 CustomerID
CustomerID
- [出力ストリーム名]: 「
キャンバス上の
LookupCustomerID
の右側にある [+] を選んでから、[派生列] を選択します。Derived column's settings
で、次のプロパティを構成します。- [出力ストリーム名]: 「
SetDates
」を入力します - [Incoming stream](受信ストリーム):
LookupCustomerID
を選択します - [列]: 次のように入力します。
列 式 説明 [ InsertedDate
] を選択しますiif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
InsertedDate
値が null の場合、現在のタイムスタンプを挿入します。 それ以外の場合は、InsertedDate
値を使用します。[ ModifiedDate
] を選択しますcurrentTimestamp()
常に ModifiedDate
値を現在のタイムスタンプで値を更新します。注意
2 番目の列を挿入するには、[列] リストの上にある [+ 追加] を選択し、[列の追加] を選択します。
- [出力ストリーム名]: 「
キャンバス上の
SetDates
派生列の右側にある [+] を選んでから、[行の変更] を選びます。Alter row settings
で、次のプロパティを構成します。- [出力ストリーム名]: 「
AllowUpserts
」を入力します - [Incoming stream](受信ストリーム):
SetDates
を選択します - [Alter row conditions](行の変更条件): 次のように入力します。
条件 式 説明 [ Upsert if
] を選択しますtrue()
Upsert if
条件で、条件をtrue()
に設定して、アップサートを許可します。 これにより、マッピング データ フローのステップを通過するすべてのデータが確実にシンクに挿入または更新されるようになります。- [出力ストリーム名]: 「
キャンバス上の
AllowUpserts
行の変更ステップの右側にある [+] を選んでから、[シンク] を選びます。Sink
で、次のプロパティを構成します。- [出力ストリーム名]: 「
Sink
」を入力します - [Incoming stream](受信ストリーム):
AllowUpserts
を選択します - [Sink type](シンクの種類):
Dataset
を選択します - [データセット]:
DimCustomer
を選択します - [オプション]:
Allow schema drift
をオン、Validate schema
をオフにします
- [出力ストリーム名]: 「
[設定] タブを選択し、次のプロパティを構成します。
- [更新方法]:
Allow upsert
をオン、他のすべてのオプションをオフにします - [キー列]:
List of columns
を選択し、リスト内のCustomerID
を選択します - [Table action](テーブル アクション):
None
を選択します - [Enable staging](ステージングを有効にする): オフ
- [更新方法]:
[マッピング] タブを選択し、[Auto mapping](自動マッピング) をオフにします。 次に示すように、入力列のマッピングを構成します。
入力列 出力列 SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
完成したマッピング フローは次のようになります。 [すべて公開] を選択して、変更を保存します。
[発行] を選びます。
データ フローをテストするには
タイプ 1 の SCD データ フローが完成しました。 これのテストを選択した場合は、このデータ フローを Synapse 統合パイプラインに追加できます。 次に、パイプラインを 1 回実行して、顧客ソース データをターゲットの DimCustomer に初期読み込みすることができます。
パイプラインを追加実行するたびに、ソース テーブル内のデータとディメンション テーブル内に既に存在するデータが (HashKey を使用して) 比較され、変更のあったレコードのみが更新されます。 これをテストするには、ソース テーブル内のレコードを更新した後、パイプラインを再度実行して、ディメンション テーブル内のレコードが更新されることを確認します。
顧客の Janet Gates を例にとります。 初期読み込みで、LastName
が Gates、CustomerId
が 4 であることが示されます。
次に、ソース テーブル内の顧客の姓を更新するステートメントの例を示します。
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
レコードを更新し、パイプラインを再度実行すると、DimCustomer で、この更新されたデータが表示されます。
顧客レコードの LastName
値がソース レコードと一致するように正常に更新され、ModifiedDate
も古い LastName
値を追跡せずに更新されました。 これは、タイプ 1 の SCD の予想される動作です。 LastName
フィールドに履歴が必要な場合は、テーブルとデータ フローを、これまでに説明した他の SCD タイプのいずれかに変更します。