Exercício - Projetar e implementar uma dimensão de mudança lenta do Tipo 1 com mapeamento de fluxos de dados

Concluído

Neste exercício, você cria um fluxo de dados para um SCD Tipo 1 usando o pool SQL dedicado do Azure Synapse como origem e destino. Esse fluxo de dados pode então ser adicionado a um pipeline Synapse e executado como parte do processo de extração, transformação, carga (ETL).

Tabela de origem e dimensão da configuração

Para este exercício, você deseja carregar uma tabela de dimensão no Azure Synapse a partir de dados de origem que podem ser de muitos tipos de sistema diferentes, como Azure SQL, armazenamento do Azure, etc. Para este exemplo, você mantém isso simples criando os dados de origem em seu banco de dados do Azure Synapse.

  1. No Synapse Studio, navegue até o hub de dados .

    Hub de dados.

  2. Selecione a guia Espaço de trabalho (1), expanda Bancos de dados e clique com o botão direito do mouse em SQLPool01 (2). Selecione Novo script SQL (3) e, em seguida, selecione Script vazio (4).

    O hub de dados é exibido com os menus de contexto para criar um novo script SQL.

  3. Cole o seguinte script na janela de script vazia e selecione Executar ou pressione F5 para executar a consulta:

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    O script e o botão Executar são realçados.

Criar um fluxo de dados de mapeamento

Mapeamento de fluxos de dados são atividades de pipeline que fornecem uma maneira visual de especificar como transformar dados, por meio de uma experiência sem código. Em seguida, você criará um fluxo de dados de mapeamento para criar um SCD Tipo 1.

  1. Navegue até o hub Revelar .

    Desenvolver hub.

  2. Selecione +e, em seguida, selecione Fluxo de dados.

    O botão de adição e o item de menu de fluxo de dados são realçados.

  3. No painel de propriedades do novo fluxo de dados, insira o campo Nome (1) e selecione o botão Propriedades (2) para ocultar o painel de propriedades. UpdateCustomerDimension

    O painel de propriedades do fluxo de dados é exibido.

  4. Selecione Adicionar código-fonte na tela.

    O botão Adicionar fonte é realçado na tela de fluxo de dados.

  5. Em Source settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter SourceDB
    • Tipo de fonte: Selecione Dataset
    • Opções: Marque Allow schema drift e deixe as outras opções desmarcadas
    • Amostragem: Selecione Disable
    • Conjunto de dados: Selecione + Novo para criar um novo conjunto de dados

    O botão Novo é realçado ao lado de Conjunto de dados.

  6. Na nova caixa de diálogo do conjunto de dados de integração, selecione Azure Synapse Analytics e, em seguida, selecione Continuar.

    O Banco de Dados SQL do Azure e o botão Continuar são realçados.

  7. Nas propriedades do conjunto de dados, configure o seguinte:

    • Nome: Digite CustomerSource
    • Serviço vinculado: selecione o serviço vinculado do espaço de trabalho Synapse
    • Nome da tabela: selecione o botão Atualizar ao lado da lista suspensa

    O formulário é configurado conforme descrito e o botão de atualização é realçado.

  8. No campo Valor, insira o nome do Pool SQL e selecione OK.

    O parâmetro SQLPool01 é realçado.

  9. Selecione dbo.CustomerSource em Nome da tabela, selecione From connection/store em Importar esquema e, em seguida, selecione OK para criar o conjunto de dados.

    O formulário é preenchido como descrito.

  10. Selecione Abrir ao lado do CustomerSource conjunto de dados que você adicionou.

    O botão Abrir é realçado ao lado do novo conjunto de dados.

  11. Insira o nome do Pool SQL no campo Valor ao lado de DBName.

  12. No editor de fluxo de dados, selecione a caixa Adicionar fonte abaixo da atividade SourceDB. Configure essa fonte como a tabela DimCustomer seguindo as mesmas etapas usadas para CustomerSource.

    • Nome do fluxo de saída: Enter DimCustomer
    • Tipo de fonte: Selecione Dataset
    • Opções: Marque Allow schema drift e deixe as outras opções desmarcadas
    • Amostragem: Selecione Disable
    • Conjunto de dados: Selecione + Novo para criar um novo conjunto de dados. Use o serviço vinculado do Azure Synapse e escolha a tabela DimCustomer. Certifique-se de definir o DBName para o nome do seu Pool SQL.

    Adicionar origem, Nome do fluxo de saída e Nome do conjunto de dados são realçados nas configurações de origem.

Adicionar transformações ao fluxo de dados

  1. Selecione + à direita da SourceDB fonte na tela e, em seguida, selecione Coluna derivada.

    O botão de adição e o item de menu de coluna derivado são realçados.

  2. Em Derived column's settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter CreateCustomerHash
    • Fluxo de entrada: Selecione SourceDB
    • Colunas: insira o seguinte:
    Column Expression Description
    Digite HashKey sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) Cria um hash SHA256 dos valores da tabela. Utilizamos isto para detetar alterações na linha, ao comparar o hash dos registos de entrada com o valor hash dos registos de destino, o que corresponde ao valor CustomerID. A iifNull função substitui valores nulos por cadeias de caracteres vazias. Caso contrário, os valores de hash tendem a duplicar quando entradas nulas estão presentes.

    O formulário de configurações da coluna Derivada é configurado conforme descrito.

  3. Selecione + à direita da CreateCustomerHash coluna derivada na tela e, em seguida, selecione Existe.

    O botão de adição e o item de menu existe são realçados.

  4. Em Exists settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter Exists
    • Fluxo à esquerda: Selecione CreateCustomerHash
    • Fluxo à direita: Selecione SynapseDimCustomer
    • Tipo de existência: Selecione Doesn't exist
    • Existem condições: Defina o seguinte para Esquerda e Direita:
    Esquerda: coluna de CreateCustomerHash Direita: coluna SynapseDimCustomer
    HashKey HashKey

    O formulário Configurações existentes é configurado conforme descrito.

  5. Selecione + à direita de Exists na tela e, em seguida, selecione Pesquisar.

    O botão de adição e o item de menu de pesquisa são realçados.

  6. Em Lookup settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter LookupCustomerID
    • Fluxo primário: Selecione Exists
    • Fluxo de pesquisa: Selecione SynapseDimCustomer
    • Corresponder várias linhas: Não verificado
    • Match em: Selecione Any row
    • Condições de pesquisa: defina o seguinte para Esquerda e Direita:
    Esquerda: Coluna do existe Direita: coluna SynapseDimCustomer
    CustomerID CustomerID

    O formulário Configurações de pesquisa é configurado conforme descrito.

  7. Selecione + à direita da LookupCustomerID tela e, em seguida, selecione Coluna derivada.

    O botão de adição e o item de menu de coluna derivado são realçados.

  8. Em Derived column's settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter SetDates
    • Fluxo de entrada: Selecione LookupCustomerID
    • Colunas: insira o seguinte:
    Column Expression Description
    Selecione InsertedDate iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) Se o InsertedDate valor for null, insira o carimbo de data/hora atual. Caso contrário, use o InsertedDate valor.
    Selecione ModifiedDate currentTimestamp() Sempre atualize o ModifiedDate valor com o carimbo de data/hora atual.

    O formulário de configurações de outra coluna derivada é configurado como descrito.

    Nota

    Para inserir a segunda coluna, selecione + Adicionar acima da lista Colunas e, em seguida, selecione Adicionar coluna.

  9. Selecione + à direita da SetDates etapa da coluna derivada na tela e, em seguida, selecione Alterar linha.

    O botão de adição e o item de menu alterar linha são realçados.

  10. Em Alter row settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter AllowUpserts
    • Fluxo de entrada: Selecione SetDates
    • Alterar condições da linha: Insira o seguinte:
    Condição Expression Description
    Selecione Upsert if true() Defina a condição como true() on para Upsert if permitir upserts. Isso garante que todos os dados que passam pelas etapas no fluxo de dados de mapeamento serão inseridos ou atualizados no coletor.

    O formulário alterar configurações de linha é configurado como descrito.

  11. Selecione + à direita da AllowUpserts etapa de alterar linha na tela e, em seguida, selecione Coletor.

    O botão de adição e o item de menu da pia são destacados.

  12. Em Sink, configure as seguintes propriedades:

    • Nome do fluxo de saída: Enter Sink
    • Fluxo de entrada: Selecione AllowUpserts
    • Tipo de lavatório: Selecione Dataset
    • Conjunto de dados: Selecione DimCustomer
    • Opções: Marque Allow schema drift e desmarque Validate schema

    O formulário de propriedades do coletor é configurado conforme descrito.

  13. Selecione a guia Configurações e configure as seguintes propriedades:

    • Método de atualização: marque Allow upsert e desmarque todas as outras opções
    • Colunas-chave: Selecione List of columnse, em seguida, selecione CustomerID na lista
    • Ação da tabela: Selecionar None
    • Ativar preparo: Não verificado

    As configurações do coletor são definidas conforme descrito.

  14. Selecione a guia Mapeamento e desmarque Mapeamento automático. Configure o mapeamento de colunas de entrada conforme descrito abaixo:

    Colunas de entrada Colunas de saída
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    As configurações de mapeamento são definidas conforme descrito.

  15. O fluxo de mapeamento concluído deve ter a seguinte aparência. Selecione Publicar tudo para salvar as alterações.

    O fluxo de dados concluído é exibido e Publicar tudo é realçado.

  16. Selecione Publicar.

    O botão de publicação é realçado.

Como testar o fluxo de dados

Você concluiu um fluxo de dados SCD Tipo 1. Se você optar por testá-lo, poderá adicionar esse fluxo de dados a um pipeline de integração do Sinapse. Em seguida, você pode executar o pipeline uma vez para fazer o carregamento inicial dos dados de origem do cliente para o destino DimCustomer.

Cada execução adicional do pipeline comparará os dados na tabela de origem com o que já está na tabela de dimensões (usando a HashKey) e atualizará apenas os registros que foram alterados. Para testar isso, você pode atualizar um registro na tabela de origem e, em seguida, executar o pipeline novamente e verificar as atualizações de registro na tabela de dimensões.

Tomemos como exemplo a cliente Janet Gates. A carga inicial mostra o LastName é Gates e o CustomerId é 4.

O script é exibido com o registro inicial do cliente.

Aqui está uma instrução de exemplo que atualizaria o sobrenome do cliente na tabela de origem.

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

Depois de atualizar o registro e executar o pipeline novamente, DimCustomer mostraria esses dados atualizados.

O script é exibido com o registro de cliente atualizado.

O registro do cliente atualizou com êxito o LastName valor para corresponder ao registro de origem e atualizou o ModifiedDate, sem acompanhar o valor antigo LastName . Esse é o comportamento esperado para um SCD Tipo 1. Se o histórico fosse necessário para o LastName campo, você modificaria a tabela e o fluxo de dados para ser um dos outros tipos de SCD que você aprendeu.