Carregar dados do Data Warehouse para o Lakehouse incrementalmente
Neste tutorial, você aprenderá como carregar dados do Data Warehouse para o Lakehouse incrementalmente.
Visão geral
A seguir está diagrama da solução de alto nível:
Aqui estão as etapas importantes ao criar essa solução:
Selecione a coluna de marca-d'água. Selecione uma coluna na tabela de dados de origem, que pode ser usada para fatiar os registros novos ou atualizados para cada execução. Normalmente, os dados nessa coluna selecionada (por exemplo, ID ou last_modify_time) seguem crescendo quando linhas são criadas ou atualizadas. O valor máximo dessa coluna é usado como uma marca-d'água.
Prepare uma tabela para armazenar o último valor de marca d'água no seu Data Warehouse.
Criar um pipeline com o seguinte fluxo de trabalho:
O pipeline nesta solução tem as seguintes atividades:
- Crie duas atividades de pesquisa. Use a primeira atividade de pesquisa para recuperar o último valor de marca-d'água. Use a segunda atividade de pesquisa para recuperar o novo valor de marca-d'água. Esses valores de marca-d'água são passados para a atividade de cópia.
- Crie uma atividade de cópia que copie linhas da tabela de dados de origem com um valor da coluna de marca-d'água maior do que o antigo valor de marca-d'água e menor do que o novo valor de marca-d'água. Em seguida, a atividade copia os dados do Data Warehouse para o Lakehouse como um arquivo novo.
- Crie uma atividade de procedimento armazenado que atualize o último valor de marca-d'água para a próxima execução do pipeline.
Pré-requisitos
- Data Warehouse. Você usa o Data Warehouse como o armazenamento de dados de origem. Se você não tiver um, confira Criar um Data Warehouse para se informar sobre as etapas e criá-lo.
- Lakehouse Você usa o Lakehouse como o armazenamento de dados de destino. Se você não tiver um, confira Criar um Lakehouse para se informar sobre as etapas e criá-lo. Crie uma pasta chamada IncrementalCopy para armazenar os dados copiados.
Como preparar sua origem
Aqui estão algumas das tabelas e procedimentos armazenados que você precisa preparar no seu Data Warehouse de origem antes de configurar o pipeline de cópia incremental.
1. Criar uma tabela de dados de origem no seu Data Warehouse
Execute o comando SQL a seguir no seu Data Warehouse para criar uma tabela chamada data_source_table como a tabela de dados de origem. Neste tutorial, você a usará como uma amostra de dados para fazer a cópia incremental.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Os dados na tabela de dados de origem são mostrados abaixo:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
Neste tutorial, você usa o LastModifytime como a coluna de marca-d'água.
2. Preparar outra tabela no seu Data Warehouse para armazenar o último valor de marca d'água
Execute o comando SQL a seguir no seu Data Warehouse para criar uma tabela chamada watermarktable para armazenar o último valor de marca d'água:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Defina o valor padrão da última marca d'água com o nome de tabela da tabela de dados de origem. Neste tutorial, o nome da tabela é data_source_table e o valor padrão é
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Examine os dados na tabela watermarktable.
Select * from watermarktable
Saída:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Criar um procedimento armazenado no seu Data Warehouse
Execute o comando a seguir para criar um procedimento armazenado no seu Data Warehouse. Esse procedimento armazenado é usado para ajudar a atualizar o último valor de marca d'água após a última execução do pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configurar um pipeline para a cópia incremental
Etapa 1: Criar um pipeline
Navegue até o Power BI.
Selecione o ícone do Power BI no canto inferior esquerdo da tela e, em seguida, selecione Data factory para abrir a página inicial do Data Factory.
Navegue até seu workspace do Microsoft Fabric.
Selecione Pipeline de dados e insira um nome de pipeline para criar um novo pipeline.
Etapa 2: Adicionar uma atividade de pesquisa para a última marca d'água
Nessa etapa, você criará uma atividade de pesquisa para obter o último valor de marca d'água. O valor padrão 1/1/2010 12:00:00 AM
definido antes será obtido.
Selecione Adicionar atividade de pipeline e selecione Pesquisa na lista suspensa.
Na guia Geral, renomeie essa atividade como LookupOldWaterMarkActivity.
Na guia Configurações, execute a seguinte configuração:
- Tipo de armazenamento de dados: selecione Workspace.
- Tipo de armazenamento de dados do workspace: selecione Data Warehouse.
- Data Warehouse: selecione seu Data Warehouse.
- Usar consulta: escolha Tabela.
- Tabela: escolha dbo.watermarktable.
- Somente a primeira linha: selecionada.
Etapa 3: Adicionar uma atividade de pesquisa para a marca d'água nova
Nessa etapa, você criará uma atividade de pesquisa para obter o novo valor de marca d'água. Você usará uma consulta para obter a marca d'água nova na tabela de dados de origem. O valor máximo da coluna LastModifytime na data_source_table será obtido.
Na barra superior, selecione Pesquisa na guia Atividades para adicionar a segunda atividade de pesquisa.
Na guia Geral, renomeie essa atividade como LookupNewWaterMarkActivity.
Na guia Configurações, execute a seguinte configuração:
Tipo de armazenamento de dados: selecione Workspace.
Tipo de armazenamento de dados do workspace: selecione Data Warehouse.
Data Warehouse: selecione seu Data Warehouse.
Usar consulta: escolha Consulta.
Consulta: insira a consulta a seguir para escolher o valor máximo da hora da última modificação como a nova marca d'água:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Somente a primeira linha: selecionada.
Etapa 4: Adicionar a atividade de cópia para copiar dados incrementais
Nessa etapa, você adicionará uma atividade de cópia para copiar os dados incrementais entre a última marca d'água e a nova marca d'água do Data Warehouse para o Lakehouse.
Selecione Atividades na barra superior e selecione Copiar dados ->Adicionar à tela para obter a atividade de cópia.
Na guia Geral, renomeie essa atividade como IncrementalCopyActivity.
Conecte ambas as atividades de Pesquisa à atividade de cópia arrastando o botão verde (Concluída com sucesso) anexado às atividades de pesquisa para a atividade de cópia. Solte o botão do mouse quando a cor da borda da atividade de cópia ficar verde.
Na guia Origem, execute a seguinte configuração:
Tipo de armazenamento de dados: selecione Workspace.
Tipo de armazenamento de dados do workspace: selecione Data Warehouse.
Data Warehouse: selecione seu Data Warehouse.
Usar consulta: escolha Consulta.
Consulta: insira a consulta a seguir para copiar dados incrementais entre a última marca d'água e a marca d'água nova.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Na guia Destino, execute a seguinte configuração:
- Tipo de armazenamento de dados: selecione Workspace.
- Tipo de armazenamento de dados do workspace: selecione Lakehouse.
- Lakehouse: selecione seu Lakehouse.
- Pasta raiz: escolha Arquivos.
- Caminho do arquivo: especifique a pasta na qual você quer armazenar seus dados copiados. Selecione Procurar para selecionar sua pasta. No que se refere ao nome do arquivo, abra Adicionar conteúdo dinâmico e digite
@CONCAT('Incremental-', pipeline().RunId, '.txt')
na janela que foi aberta para criar nomes de arquivos para o seu arquivo de dados copiados no Lakehouse. - Formato do arquivo: selecione o tipo de formato dos seus dados.
Etapa 5: Adicionar uma atividade de procedimento armazenado
Nessa etapa, você adicionará uma atividade de procedimento armazenado para atualizar o último valor de marca d'água para a próxima execução do pipeline.
Selecione Atividades na barra superior e selecione Procedimento armazenado para adicionar uma atividade de procedimento armazenado.
Na guia Geral, renomeie essa atividade como StoredProceduretoWriteWatermarkActivity.
Conecte o resultado verde (Concluída com sucesso) da atividade de cópia à atividade de procedimento armazenado.
Na guia Configurações, execute a seguinte configuração:
Tipo de armazenamento de dados: selecione Workspace.
Data Warehouse: selecione seu Data Warehouse.
Nome do procedimento armazenado: especifique o procedimento armazenado que você criou no seu Data Warehouse: [dbo].[usp_write_watermark].
Expanda Parâmetros de procedimento armazenado. Para especificar valores para os parâmetros de procedimento armazenado, selecione Importar e insira os seguintes valores para os parâmetros:
Nome Tipo Valor LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Etapa 6: Executar o pipeline e monitorar o resultado
Na barra superior, selecione Executar na guia Página Inicial. Em seguida, selecione Salvar e executar. O pipeline começa a ser executado e você pode monitorar o pipeline na guia Resultado.
Vá para o seu Lakehouse. Você irá descobrir que o arquivo de dados está na pasta que você especificou e você pode selecionar o arquivo para visualizar os dados copiados.
Adicionar mais dados para ver os resultados da cópia incremental
Após concluir a primeira execução do pipeline, vamos tentar adicionar mais dados à tabela de origem no seu Data Warehouse para ver se esse pipeline pode copiar seus dados incrementais.
Etapa 1: Adicionar mais dados à origem
Insira novos dados no seu Data Warehouse executando a seguinte consulta:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Os dados atualizados para a data_source_table são:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Etapa 2: Disparar outra execução de pipeline e monitorar o resultado
Volte para a sua página do pipeline. Na barra superior, selecione novamente Executar na guia Página Inicial. O pipeline começa a ser executado e você pode monitorar o pipeline na guia Resultado.
Vá para o seu Lakehouse. Você irá descobrir que o novo arquivo de dados copiados está na pasta que você especificou e você pode selecionar o arquivo para visualizar os dados copiados. Você verá que seus dados incrementais são mostrados nesse arquivo.
Conteúdo relacionado
Em seguida, avance para saber mais sobre a cópia do Armazenamento de Blobs do Azure para o Lakehouse.