Exercício – Criar e implementar uma dimensão de alteração lenta do Tipo 1 com fluxos de dados de mapeamento

Concluído

Neste exercício, você cria um fluxo de dados para uma SCD do Tipo 1 usando o pool de SQL dedicado do Azure Synapse como origem e destino. Esse fluxo de dados pode, então, ser adicionado a um pipeline do Synapse e executado como parte do processo de ETL (extração, transformação e carregamento).

Configurar a origem e a tabela de dimensões

Para este exercício, carregue uma tabela de dimensões no Azure Synapse de dados de origem que podem ser de vários tipos de sistema, como o SQL do Azure, o Armazenamento do Azure etc. Mantenha a simplicidade criando os dados de origem em seu banco de dados do Azure Synapse.

  1. No Synapse Studio, navegue até o hub Dados.

    Hub de dados.

  2. Selecione a guia Workspace(1), expanda Bancos de dados e clique com o botão direito do mouse em SQLPool01 (2). Selecione Novo script SQL (3) e escolha Script vazio (4).

    O hub de dados é exibido com os menus de contexto para criar um script SQL.

  3. Cole o seguinte script na janela de script vazio e selecione Executar ou pressione F5 para executar a consulta:

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    O script e o botão “Executar” estão realçados.

Criar um fluxo de dados de mapeamento

Fluxos de dados de mapeamento são atividades de pipeline que fornecem uma forma visual de especificar como transformar dados por meio de uma experiência sem código. Em seguida, você criará um fluxo de dados de mapeamento para criar um SCD tipo 1.

  1. Navegue até o hub Desenvolver.

    Hub de desenvolvimento.

  2. Selecione + e escolha +.

    O botão de adição e o item de menu “Fluxo de dados” estão realçados.

  3. No painel de propriedades do novo fluxo de dados, insira UpdateCustomerDimension no campo UpdateCustomerDimension(1) e selecione o botão Propriedades(2) para ocultar o painel de propriedades.

    O painel de propriedades do fluxo de dados é exibido.

  4. Selecione Adicionar origem na tela.

    O botão “Adicionar fonte” está realçado na tela do fluxo de dados.

  5. Em Source settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira SourceDB
    • Tipo de origem: selecione Dataset
    • Opções: marque Allow schema drift e deixe as outras opções desmarcadas
    • Amostragem: selecione Disable
    • Conjunto de dados: selecione + Novo para criar um conjunto de dados

    O botão “Novo” está realçado ao lado de “Conjunto de dados”.

  6. Na caixa de diálogo do novo conjunto de dados da integração, selecione Azure Synapse Analytics e selecione Continuar.

    O Banco de dados SQL do Azure e o botão “Continuar” estão realçados.

  7. Nas propriedades do conjunto de dados, configure o seguinte:

    • Nome: insira CustomerSource
    • Serviço vinculado: selecione o serviço vinculado do workspace do Synapse
    • Nome da tabela: selecione o botão Atualizar ao lado do menu suspenso

    O formulário está configurado conforme descrito e o botão de atualização está realçado.

  8. No campo Valor, insira o nome do pool de SQL e selecione OK.

    O parâmetro SQLPool01 está realçado.

  9. Selecione dbo.CustomerSource em dbo.CustomerSource, selecione From connection/store em From connection/store e clique em OK para criar o conjunto de dados.

    O formulário está preenchido conforme descrito.

  10. Selecione Abrir ao lado do conjunto de dados CustomerSource que você adicionou.

    O botão “Abrir” está realçado ao lado do novo conjunto de dados.

  11. Insira o nome de seu pool de SQL no campo Valor ao lado de DBName.

  12. No editor de fluxo de dados, selecione a caixa Adicionar origem abaixo da atividade SourceDB. Configure essa origem como a tabela DimCustomer seguindo as mesmas etapas usadas para CustomerSource.

    • Nome do fluxo de saída: insira DimCustomer
    • Tipo de origem: selecione Dataset
    • Opções: marque Allow schema drift e deixe as outras opções desmarcadas
    • Amostragem: selecione Disable
    • Conjunto de dados: selecione + Novo para criar um conjunto de dados. Use o serviço vinculado do Azure Synapse e escolha a tabela DimCustomer. Defina o DBName como o nome do pool de SQL.

    “Adicionar fonte”, “Nome do fluxo de saída” e “Nome do conjunto de dados” estão realçados nas configurações da fonte.

Adicionar transformações ao fluxo de dados

  1. Selecione + à direita da origem SourceDB na tela e selecione +.

    O botão de adição e o item de menu “Coluna derivada” estão realçados.

  2. Em Derived column's settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira CreateCustomerHash
    • Fluxo de entrada: selecione SourceDB
    • Colunas: insira o seguinte:
    Coluna Expression Descrição
    Digite HashKey sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) Cria um hash SHA256 dos valores da tabela. É usada para detectar alterações de linha comparando o hash dos registros de entrada com o valor de hash dos registros de destino, correspondendo ao valor CustomerID. A função iifNull substitui valores nulos por cadeias de caracteres vazias. Caso contrário, os valores de hash tendem a ser duplicados quando entradas nulas estão presentes.

    O formulário de configurações de “Coluna derivada” está definido conforme descrito.

  3. Selecione + à direita da coluna derivada CreateCustomerHash na tela e selecione +.

    O botão de adição e o item de menu “Existe” estão realçados.

  4. Em Exists settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira Exists
    • Fluxo à esquerda: selecione CreateCustomerHash
    • Fluxo à direita: selecione SynapseDimCustomer
    • Tipo de existe: selecione Doesn't exist
    • Condições de existe: defina o seguinte para Esquerda e Direita:
    Esquerda: coluna de CreateCustomerHash Direita: coluna de SynapseDimCustomer
    HashKey HashKey

    O formulário de configurações do item de menu “Existe” está definido conforme descrito.

  5. Selecione + à direita de Exists na tela e selecione +.

    O botão de adição e o item do menu “Pesquisa” estão realçados.

  6. Em Lookup settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira LookupCustomerID
    • Fluxo primário: selecione Exists
    • Fluxo de pesquisa: selecione SynapseDimCustomer
    • Corresponder várias linhas: desmarcado
    • Corresponder em: selecione Any row
    • Condições de pesquisa: defina o seguinte para Esquerda e Direita:
    Esquerda: coluna de Existe Direita: coluna de SynapseDimCustomer
    CustomerID CustomerID

    O formulário de configurações de pesquisa está definido conforme descrito.

  7. Selecione + à direita de LookupCustomerID na tela e selecione +.

    O botão de adição e o item de menu “Coluna derivada” estão realçados.

  8. Em Derived column's settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira SetDates
    • Fluxo de entrada: selecione LookupCustomerID
    • Colunas: insira o seguinte:
    Coluna Expression Descrição
    Selecione InsertedDate iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) Se o valor de InsertedDate for nulo, insira o carimbo de data/hora atual. Caso contrário, use o valor InsertedDate.
    Selecione ModifiedDate currentTimestamp() Sempre atualize o valor de ModifiedDate com o carimbo de data/hora atual.

    O formulário de configurações de outra coluna derivada está definido conforme descrito.

    Observação

    Para inserir a segunda coluna, selecione + Adicionar acima da lista Colunas e escolha Adicionar coluna.

  9. Selecione + à direita da etapa da coluna derivada SetDates na tela e selecione +.

    O botão de adição e o item de menu “Alterar linha” estão realçados.

  10. Em Alter row settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira AllowUpserts
    • Fluxo de entrada: selecione SetDates
    • Condições de alterar linha: insira o seguinte:
    Condição Expression Descrição
    Selecione Upsert if true() Defina a condição como true() na condição Upsert if para permitir upserts. Isso garante que todos os dados que passam pelas etapas do fluxo de dados de mapeamento sejam inseridos ou atualizados no coletor.

    O formulário de configurações de alteração de linha está definido conforme descrito.

  11. Selecione + à direita da etapa de alterar linha AllowUpserts na tela e selecione +.

    O botão de adição e o item de menu “Coletor” estão realçados.

  12. Em Sink, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira Sink
    • Fluxo de entrada: selecione AllowUpserts
    • Tipo de coletor: selecione Dataset
    • Conjunto de dados: selecione DimCustomer
    • Opções: marque Allow schema drift e desmarque Validate schema

    O formulário de propriedades do coletor está configurado conforme descrito.

  13. Selecione a guia Configurações e configure as seguintes propriedades:

    • Método de atualização: marque Allow upsert e desmarque todas as outras opções
    • Colunas de chave: selecione List of columns e escolha CustomerID na lista
    • Ação da tabela: selecione None
    • Habilitar preparo: desmarcado

    As configurações do coletor estão definidas conforme descrito.

  14. Selecione a guia Mapeamento e desmarque Mapeamento automático. Configure o mapeamento das colunas de entrada conforme descrito abaixo:

    Colunas de entrada Colunas de saída
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    As configurações de mapeamento estão definidas conforme descrito.

  15. O fluxo de mapeamento completo deverá ter a aparência a seguir. Selecione Publicar tudo para salvar as alterações.

    O fluxo de dados concluído é exibido e “Publicar tudo” está realçado.

  16. Selecione Publicar.

    O botão “Publicar” está realçado.

Como testar o fluxo de dados

Você concluiu um fluxo de dados de SCD do Tipo 1. Se optar por testá-lo, você poderá adicionar esse fluxo de dados a um pipeline de integração do Synapse. Em seguida, você pode executar o pipeline uma vez para fazer a carga inicial dos dados de origem do cliente para o destino DimCustomer.

Cada execução adicional do pipeline vai comparar os dados na tabela de origem com o que já está na tabela de dimensões (usando o HashKey) e apenas atualizar os registros que foram alterados. Para testar isso, você pode atualizar um registro na tabela de origem, executar o pipeline novamente e verificar as atualizações do registro na tabela de dimensões.

Vamos usar a cliente Janet Gates como exemplo. A carga inicial mostra que LastName é Gates e CustomerId é 4.

O script é exibido com o registro inicial do cliente.

Este é um exemplo de instrução que atualizaria o sobrenome do cliente na tabela de origem.

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

Depois de atualizar o registro e executar o pipeline novamente, DimCustomer mostraria esses dados atualizados.

O script é exibido com o registro atualizado do cliente.

O registro do cliente atualizou com êxito o valor de LastName para corresponder ao registro de origem e atualizou o ModifiedDate, sem acompanhar o valor antigo de LastName. Esse é o comportamento esperado para uma SCD do Tipo 1. Se o histórico fosse necessário para o campo LastName, você modificaria a tabela e o fluxo de dados para um dos outros tipos de SCD descritos.