Compartilhar via


Carregar dados do Data Warehouse para o Lakehouse incrementalmente

Neste tutorial, você aprenderá como carregar dados do Data Warehouse para o Lakehouse incrementalmente.

Visão geral

A seguir está diagrama da solução de alto nível:

Diagrama mostrando a lógica para carregar dados incrementalmente.

Aqui estão as etapas importantes ao criar essa solução:

  1. Selecione a coluna de marca-d'água. Selecione uma coluna na tabela de dados de origem, que pode ser usada para fatiar os registros novos ou atualizados para cada execução. Normalmente, os dados nessa coluna selecionada (por exemplo, ID ou last_modify_time) seguem crescendo quando linhas são criadas ou atualizadas. O valor máximo dessa coluna é usado como uma marca-d'água.

  2. Prepare uma tabela para armazenar o último valor de marca d'água no seu Data Warehouse.

  3. Criar um pipeline com o seguinte fluxo de trabalho:

    O pipeline nesta solução tem as seguintes atividades:

    • Crie duas atividades de pesquisa. Use a primeira atividade de pesquisa para recuperar o último valor de marca-d'água. Use a segunda atividade de pesquisa para recuperar o novo valor de marca-d'água. Esses valores de marca-d'água são passados para a atividade de cópia.
    • Crie uma atividade de cópia que copie linhas da tabela de dados de origem com um valor da coluna de marca-d'água maior do que o antigo valor de marca-d'água e menor do que o novo valor de marca-d'água. Em seguida, a atividade copia os dados do Data Warehouse para o Lakehouse como um arquivo novo.
    • Crie uma atividade de procedimento armazenado que atualize o último valor de marca-d'água para a próxima execução do pipeline.

Pré-requisitos

  • Data Warehouse. Você usa o Data Warehouse como o armazenamento de dados de origem. Se você não tiver um, confira Criar um Data Warehouse para se informar sobre as etapas e criá-lo.
  • Lakehouse Você usa o Lakehouse como o armazenamento de dados de destino. Se você não tiver um, confira Criar um Lakehouse para se informar sobre as etapas e criá-lo. Crie uma pasta chamada IncrementalCopy para armazenar os dados copiados.

Como preparar sua origem

Aqui estão algumas das tabelas e procedimentos armazenados que você precisa preparar no seu Data Warehouse de origem antes de configurar o pipeline de cópia incremental.

1. Criar uma tabela de dados de origem no seu Data Warehouse

Execute o comando SQL a seguir no seu Data Warehouse para criar uma tabela chamada data_source_table como a tabela de dados de origem. Neste tutorial, você a usará como uma amostra de dados para fazer a cópia incremental.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Os dados na tabela de dados de origem são mostrados abaixo:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

Neste tutorial, você usa o LastModifytime como a coluna de marca-d'água.

2. Preparar outra tabela no seu Data Warehouse para armazenar o último valor de marca d'água

  1. Execute o comando SQL a seguir no seu Data Warehouse para criar uma tabela chamada watermarktable para armazenar o último valor de marca d'água:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Defina o valor padrão da última marca d'água com o nome de tabela da tabela de dados de origem. Neste tutorial, o nome da tabela é data_source_table e o valor padrão é 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Examine os dados na tabela watermarktable.

    Select * from watermarktable
    

    Saída:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Criar um procedimento armazenado no seu Data Warehouse

Execute o comando a seguir para criar um procedimento armazenado no seu Data Warehouse. Esse procedimento armazenado é usado para ajudar a atualizar o último valor de marca d'água após a última execução do pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Configurar um pipeline para a cópia incremental

Etapa 1: Criar um pipeline

  1. Navegue até o Power BI.

  2. Selecione o ícone do Power BI no canto inferior esquerdo da tela e, em seguida, selecione Data factory para abrir a página inicial do Data Factory.

  3. Navegue até seu workspace do Microsoft Fabric.

  4. Selecione Pipeline de dados e insira um nome de pipeline para criar um novo pipeline.

    Captura de tela mostrando o botão Novo pipeline de dados no workspace recém-criado.

    Captura de tela mostrando o nome da criação de um novo pipeline.

Etapa 2: Adicionar uma atividade de pesquisa para a última marca d'água

Nessa etapa, você criará uma atividade de pesquisa para obter o último valor de marca d'água. O valor padrão 1/1/2010 12:00:00 AM definido antes será obtido.

  1. Selecione Adicionar atividade de pipeline e selecione Pesquisa na lista suspensa.

  2. Na guia Geral, renomeie essa atividade como LookupOldWaterMarkActivity.

  3. Na guia Configurações, execute a seguinte configuração:

    • Tipo de armazenamento de dados: selecione Workspace.
    • Tipo de armazenamento de dados do workspace: selecione Data Warehouse.
    • Data Warehouse: selecione seu Data Warehouse.
    • Usar consulta: escolha Tabela.
    • Tabela: escolha dbo.watermarktable.
    • Somente a primeira linha: selecionada.

    Captura de tela mostrando uma pesquisa da marca d'água antiga.

Etapa 3: Adicionar uma atividade de pesquisa para a marca d'água nova

Nessa etapa, você criará uma atividade de pesquisa para obter o novo valor de marca d'água. Você usará uma consulta para obter a marca d'água nova na tabela de dados de origem. O valor máximo da coluna LastModifytime na data_source_table será obtido.

  1. Na barra superior, selecione Pesquisa na guia Atividades para adicionar a segunda atividade de pesquisa.

  2. Na guia Geral, renomeie essa atividade como LookupNewWaterMarkActivity.

  3. Na guia Configurações, execute a seguinte configuração:

    • Tipo de armazenamento de dados: selecione Workspace.

    • Tipo de armazenamento de dados do workspace: selecione Data Warehouse.

    • Data Warehouse: selecione seu Data Warehouse.

    • Usar consulta: escolha Consulta.

    • Consulta: insira a consulta a seguir para escolher o valor máximo da hora da última modificação como a nova marca d'água:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Somente a primeira linha: selecionada.

    Captura de tela mostrando uma pesquisa da marca d'água nova.

Etapa 4: Adicionar a atividade de cópia para copiar dados incrementais

Nessa etapa, você adicionará uma atividade de cópia para copiar os dados incrementais entre a última marca d'água e a nova marca d'água do Data Warehouse para o Lakehouse.

  1. Selecione Atividades na barra superior e selecione Copiar dados ->Adicionar à tela para obter a atividade de cópia.

  2. Na guia Geral, renomeie essa atividade como IncrementalCopyActivity.

  3. Conecte ambas as atividades de Pesquisa à atividade de cópia arrastando o botão verde (Concluída com sucesso) anexado às atividades de pesquisa para a atividade de cópia. Solte o botão do mouse quando a cor da borda da atividade de cópia ficar verde.

    Captura de tela mostrando as atividades de pesquisa e cópia sendo conectadas.

  4. Na guia Origem, execute a seguinte configuração:

    • Tipo de armazenamento de dados: selecione Workspace.

    • Tipo de armazenamento de dados do workspace: selecione Data Warehouse.

    • Data Warehouse: selecione seu Data Warehouse.

    • Usar consulta: escolha Consulta.

    • Consulta: insira a consulta a seguir para copiar dados incrementais entre a última marca d'água e a marca d'água nova.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Captura de tela mostrando a configuração de copiar origem.

  5. Na guia Destino, execute a seguinte configuração:

    • Tipo de armazenamento de dados: selecione Workspace.
    • Tipo de armazenamento de dados do workspace: selecione Lakehouse.
    • Lakehouse: selecione seu Lakehouse.
    • Pasta raiz: escolha Arquivos.
    • Caminho do arquivo: especifique a pasta na qual você quer armazenar seus dados copiados. Selecione Procurar para selecionar sua pasta. No que se refere ao nome do arquivo, abra Adicionar conteúdo dinâmico e digite @CONCAT('Incremental-', pipeline().RunId, '.txt') na janela que foi aberta para criar nomes de arquivos para o seu arquivo de dados copiados no Lakehouse.
    • Formato do arquivo: selecione o tipo de formato dos seus dados.

    Captura de tela mostrando a configuração de destino da cópia.

Etapa 5: Adicionar uma atividade de procedimento armazenado

Nessa etapa, você adicionará uma atividade de procedimento armazenado para atualizar o último valor de marca d'água para a próxima execução do pipeline.

  1. Selecione Atividades na barra superior e selecione Procedimento armazenado para adicionar uma atividade de procedimento armazenado.

  2. Na guia Geral, renomeie essa atividade como StoredProceduretoWriteWatermarkActivity.

  3. Conecte o resultado verde (Concluída com sucesso) da atividade de cópia à atividade de procedimento armazenado.

  4. Na guia Configurações, execute a seguinte configuração:

    • Tipo de armazenamento de dados: selecione Workspace.

    • Data Warehouse: selecione seu Data Warehouse.

    • Nome do procedimento armazenado: especifique o procedimento armazenado que você criou no seu Data Warehouse: [dbo].[usp_write_watermark].

    • Expanda Parâmetros de procedimento armazenado. Para especificar valores para os parâmetros de procedimento armazenado, selecione Importar e insira os seguintes valores para os parâmetros:

      Nome Tipo Valor
      LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Captura de tela mostrando a configuração da atividade de procedimento armazenado.

Etapa 6: Executar o pipeline e monitorar o resultado

Na barra superior, selecione Executar na guia Página Inicial. Em seguida, selecione Salvar e executar. O pipeline começa a ser executado e você pode monitorar o pipeline na guia Resultado.

Captura de tela mostrando os resultados da execução do pipeline.

Vá para o seu Lakehouse. Você irá descobrir que o arquivo de dados está na pasta que você especificou e você pode selecionar o arquivo para visualizar os dados copiados.

Captura de tela mostrando os dados do Lakehouse para a primeira execução do pipeline.

Captura de tela mostrando a visualização dos dados do Lakehouse para a primeira execução do pipeline.

Adicionar mais dados para ver os resultados da cópia incremental

Após concluir a primeira execução do pipeline, vamos tentar adicionar mais dados à tabela de origem no seu Data Warehouse para ver se esse pipeline pode copiar seus dados incrementais.

Etapa 1: Adicionar mais dados à origem

Insira novos dados no seu Data Warehouse executando a seguinte consulta:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Os dados atualizados para a data_source_table são:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Etapa 2: Disparar outra execução de pipeline e monitorar o resultado

Volte para a sua página do pipeline. Na barra superior, selecione novamente Executar na guia Página Inicial. O pipeline começa a ser executado e você pode monitorar o pipeline na guia Resultado.

Vá para o seu Lakehouse. Você irá descobrir que o novo arquivo de dados copiados está na pasta que você especificou e você pode selecionar o arquivo para visualizar os dados copiados. Você verá que seus dados incrementais são mostrados nesse arquivo.

Captura de tela mostrando os dados do Lakehouse para a segunda execução do pipeline.

Captura de tela mostrando a visualização dos dados do Lakehouse para a segunda execução do pipeline.

Em seguida, avance para saber mais sobre a cópia do Armazenamento de Blobs do Azure para o Lakehouse.