Exercício - Projetar e implementar uma dimensão de mudança lenta do Tipo 1 com mapeamento de fluxos de dados
Neste exercício, você cria um fluxo de dados para um SCD Tipo 1 usando o pool SQL dedicado do Azure Synapse como origem e destino. Esse fluxo de dados pode então ser adicionado a um pipeline Synapse e executado como parte do processo de extração, transformação, carga (ETL).
Tabela de origem e dimensão da configuração
Para este exercício, você deseja carregar uma tabela de dimensão no Azure Synapse a partir de dados de origem que podem ser de muitos tipos de sistema diferentes, como Azure SQL, armazenamento do Azure, etc. Para este exemplo, você mantém isso simples criando os dados de origem em seu banco de dados do Azure Synapse.
No Synapse Studio, navegue até o hub de dados .
Selecione a guia Espaço de trabalho (1), expanda Bancos de dados e clique com o botão direito do mouse em SQLPool01 (2). Selecione Novo script SQL (3) e, em seguida, selecione Script vazio (4).
Cole o seguinte script na janela de script vazia e selecione Executar ou pressione
F5
para executar a consulta:CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
Criar um fluxo de dados de mapeamento
Mapeamento de fluxos de dados são atividades de pipeline que fornecem uma maneira visual de especificar como transformar dados, por meio de uma experiência sem código. Em seguida, você criará um fluxo de dados de mapeamento para criar um SCD Tipo 1.
Navegue até o hub Revelar .
Selecione +e, em seguida, selecione Fluxo de dados.
No painel de propriedades do novo fluxo de dados, insira o campo Nome (1) e selecione o botão Propriedades (2) para ocultar o painel de propriedades.
UpdateCustomerDimension
Selecione Adicionar código-fonte na tela.
Em
Source settings
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
SourceDB
- Tipo de fonte: Selecione
Dataset
- Opções: Marque
Allow schema drift
e deixe as outras opções desmarcadas - Amostragem: Selecione
Disable
- Conjunto de dados: Selecione + Novo para criar um novo conjunto de dados
- Nome do fluxo de saída: Enter
Na nova caixa de diálogo do conjunto de dados de integração, selecione Azure Synapse Analytics e, em seguida, selecione Continuar.
Nas propriedades do conjunto de dados, configure o seguinte:
- Nome: Digite
CustomerSource
- Serviço vinculado: selecione o serviço vinculado do espaço de trabalho Synapse
- Nome da tabela: selecione o botão Atualizar ao lado da lista suspensa
- Nome: Digite
No campo Valor, insira o nome do Pool SQL e selecione OK.
Selecione
dbo.CustomerSource
em Nome da tabela, selecioneFrom connection/store
em Importar esquema e, em seguida, selecione OK para criar o conjunto de dados.Selecione Abrir ao lado do
CustomerSource
conjunto de dados que você adicionou.Insira o nome do Pool SQL no campo Valor ao lado de
DBName
.No editor de fluxo de dados, selecione a caixa Adicionar fonte abaixo da atividade SourceDB. Configure essa fonte como a tabela DimCustomer seguindo as mesmas etapas usadas para CustomerSource.
- Nome do fluxo de saída: Enter
DimCustomer
- Tipo de fonte: Selecione
Dataset
- Opções: Marque
Allow schema drift
e deixe as outras opções desmarcadas - Amostragem: Selecione
Disable
- Conjunto de dados: Selecione + Novo para criar um novo conjunto de dados. Use o serviço vinculado do Azure Synapse e escolha a tabela DimCustomer. Certifique-se de definir o DBName para o nome do seu Pool SQL.
- Nome do fluxo de saída: Enter
Adicionar transformações ao fluxo de dados
Selecione + à direita da
SourceDB
fonte na tela e, em seguida, selecione Coluna derivada.Em
Derived column's settings
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
CreateCustomerHash
- Fluxo de entrada: Selecione
SourceDB
- Colunas: insira o seguinte:
Column Expression Description Digite HashKey
sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
Cria um hash SHA256 dos valores da tabela. Utilizamos isto para detetar alterações na linha, ao comparar o hash dos registos de entrada com o valor hash dos registos de destino, o que corresponde ao valor CustomerID
. AiifNull
função substitui valores nulos por cadeias de caracteres vazias. Caso contrário, os valores de hash tendem a duplicar quando entradas nulas estão presentes.- Nome do fluxo de saída: Enter
Selecione + à direita da
CreateCustomerHash
coluna derivada na tela e, em seguida, selecione Existe.Em
Exists settings
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
Exists
- Fluxo à esquerda: Selecione
CreateCustomerHash
- Fluxo à direita: Selecione
SynapseDimCustomer
- Tipo de existência: Selecione
Doesn't exist
- Existem condições: Defina o seguinte para Esquerda e Direita:
Esquerda: coluna de CreateCustomerHash Direita: coluna SynapseDimCustomer HashKey
HashKey
- Nome do fluxo de saída: Enter
Selecione + à direita de
Exists
na tela e, em seguida, selecione Pesquisar.Em
Lookup settings
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
LookupCustomerID
- Fluxo primário: Selecione
Exists
- Fluxo de pesquisa: Selecione
SynapseDimCustomer
- Corresponder várias linhas: Não verificado
- Match em: Selecione
Any row
- Condições de pesquisa: defina o seguinte para Esquerda e Direita:
Esquerda: Coluna do existe Direita: coluna SynapseDimCustomer CustomerID
CustomerID
- Nome do fluxo de saída: Enter
Selecione + à direita da
LookupCustomerID
tela e, em seguida, selecione Coluna derivada.Em
Derived column's settings
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
SetDates
- Fluxo de entrada: Selecione
LookupCustomerID
- Colunas: insira o seguinte:
Column Expression Description Selecione InsertedDate
iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
Se o InsertedDate
valor for null, insira o carimbo de data/hora atual. Caso contrário, use oInsertedDate
valor.Selecione ModifiedDate
currentTimestamp()
Sempre atualize o ModifiedDate
valor com o carimbo de data/hora atual.Nota
Para inserir a segunda coluna, selecione + Adicionar acima da lista Colunas e, em seguida, selecione Adicionar coluna.
- Nome do fluxo de saída: Enter
Selecione + à direita da
SetDates
etapa da coluna derivada na tela e, em seguida, selecione Alterar linha.Em
Alter row settings
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
AllowUpserts
- Fluxo de entrada: Selecione
SetDates
- Alterar condições da linha: Insira o seguinte:
Condição Expression Description Selecione Upsert if
true()
Defina a condição como true()
on paraUpsert if
permitir upserts. Isso garante que todos os dados que passam pelas etapas no fluxo de dados de mapeamento serão inseridos ou atualizados no coletor.- Nome do fluxo de saída: Enter
Selecione + à direita da
AllowUpserts
etapa de alterar linha na tela e, em seguida, selecione Coletor.Em
Sink
, configure as seguintes propriedades:- Nome do fluxo de saída: Enter
Sink
- Fluxo de entrada: Selecione
AllowUpserts
- Tipo de lavatório: Selecione
Dataset
- Conjunto de dados: Selecione
DimCustomer
- Opções: Marque
Allow schema drift
e desmarqueValidate schema
- Nome do fluxo de saída: Enter
Selecione a guia Configurações e configure as seguintes propriedades:
- Método de atualização: marque
Allow upsert
e desmarque todas as outras opções - Colunas-chave: Selecione
List of columns
e, em seguida, selecioneCustomerID
na lista - Ação da tabela: Selecionar
None
- Ativar preparo: Não verificado
- Método de atualização: marque
Selecione a guia Mapeamento e desmarque Mapeamento automático. Configure o mapeamento de colunas de entrada conforme descrito abaixo:
Colunas de entrada Colunas de saída SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
O fluxo de mapeamento concluído deve ter a seguinte aparência. Selecione Publicar tudo para salvar as alterações.
Selecione Publicar.
Como testar o fluxo de dados
Você concluiu um fluxo de dados SCD Tipo 1. Se você optar por testá-lo, poderá adicionar esse fluxo de dados a um pipeline de integração do Sinapse. Em seguida, você pode executar o pipeline uma vez para fazer o carregamento inicial dos dados de origem do cliente para o destino DimCustomer.
Cada execução adicional do pipeline comparará os dados na tabela de origem com o que já está na tabela de dimensões (usando a HashKey) e atualizará apenas os registros que foram alterados. Para testar isso, você pode atualizar um registro na tabela de origem e, em seguida, executar o pipeline novamente e verificar as atualizações de registro na tabela de dimensões.
Tomemos como exemplo a cliente Janet Gates. A carga inicial mostra o LastName
é Gates e o CustomerId
é 4.
Aqui está uma instrução de exemplo que atualizaria o sobrenome do cliente na tabela de origem.
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
Depois de atualizar o registro e executar o pipeline novamente, DimCustomer mostraria esses dados atualizados.
O registro do cliente atualizou com êxito o LastName
valor para corresponder ao registro de origem e atualizou o ModifiedDate
, sem acompanhar o valor antigo LastName
. Esse é o comportamento esperado para um SCD Tipo 1. Se o histórico fosse necessário para o LastName
campo, você modificaria a tabela e o fluxo de dados para ser um dos outros tipos de SCD que você aprendeu.