Partilhar via


Carregue dados incrementalmente de várias tabelas no SQL Server para um banco de dados no Banco de Dados SQL do Azure usando o portal do Azure

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Neste tutorial, você cria um Azure Data Factory com um pipeline que carrega dados delta de várias tabelas em um banco de dados do SQL Server para um banco de dados no Banco de Dados SQL do Azure.

Vai executar os seguintes passos neste tutorial:

  • Prepare os arquivos de dados de origem e de destino.
  • Criar uma fábrica de dados.
  • Criar um integration runtime autoalojado.
  • Instalar o integration runtime.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, de sink e de marca d'água.
  • Criar, executar e monitorizar um pipeline.
  • Reveja os resultados.
  • Adicionou ou atualizou os dados nas tabelas de origem.
  • Voltou a executar e a monitorizar o pipeline.
  • Reviu os resultados finais.

Descrição geral

Eis os passos importantes para criar esta solução:

  1. Selecionar a coluna de limite de tamanho.

    Selecione uma coluna por cada tabela no arquivo de dados de origem, que pode ser utilizada para identificar os registos novos ou atualizados para cada execução. Normalmente, os dados nesta coluna selecionada (por exemplo, last_modify_time ou ID) continuam a aumentar quando as linhas são criadas ou atualizadas. O valor máximo nesta coluna é utilizado como limite de tamanho.

  2. Preparar um arquivo de dados para armazenar o valor de limite de tamanho.

    Neste tutorial, vai armazenar o valor de marca d'água numa base de dados SQL.

  3. Criar um pipeline com as seguintes atividades:

    a. Criar uma atividade ForEach que itera através de uma lista de nomes de tabelas de origem que é transmitida como um parâmetro para o pipeline. Para cada tabela de origem, este invoca as seguintes atividades para efetuar o carregamento de diferenças para essa tabela.

    b. Criar duas atividades de pesquisa. Utilize a primeira atividade Pesquisa para obter o último valor de limite de tamanho. Utilize a segunda para obter o valor de limite de tamanho novo. Estes valores de limite de tamanho são transmitidos para a atividade Copy.

    c. Criar uma atividade Cópia que copia linhas do arquivo de dados de origem com o valor da coluna de limite de tamanho superior ao valor de limite de tamanho antigo e inferior ao valor novo. Em seguida, copia os dados delta do arquivo de dados de origem para o armazenamento de Blobs do Azure como um ficheiro novo.

    d. Crie uma atividade StoredProcedure, que atualiza o valor de marca d'água do pipeline que vai ser executado da próxima vez.

    Eis o diagrama de nível elevado da solução:

    Carregar dados de forma incremental

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

  • SQL Server. Você usa um banco de dados do SQL Server como o armazenamento de dados de origem neste tutorial.
  • Base de Dados SQL do Azure. Você usa um banco de dados no Banco de Dados SQL do Azure como o armazenamento de dados do coletor. Se você não tiver um banco de dados no Banco de Dados SQL, consulte Criar um banco de dados no Banco de Dados SQL do Azure para conhecer as etapas para criar um.

Criar tabelas de origem na base de dados do SQL Server

  1. Abra o SQL Server Management Studio e ligue-se à base de dados do SQL Server.

  2. No Explorador de Servidores, clique com botão direito do rato na base de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL na base de dados para criar tabelas com o nome customer_table e project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    INSERT INTO customer_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'John','9/1/2017 12:56:00 AM'),
    (2, 'Mike','9/2/2017 5:23:00 AM'),
    (3, 'Alice','9/3/2017 2:36:00 AM'),
    (4, 'Andy','9/4/2017 3:21:00 AM'),
    (5, 'Anny','9/5/2017 8:06:00 AM');
    
    INSERT INTO project_table
    (Project, Creationtime)
    VALUES
    ('project1','1/1/2015 0:00:00 AM'),
    ('project2','2/2/2016 1:23:00 AM'),
    ('project3','3/4/2017 5:16:00 AM');
    
    

Criar tabelas de destino em seu banco de dados

  1. Abra o SQL Server Management Studio e conecte-se ao seu banco de dados no Banco de Dados SQL do Azure.

  2. No Explorador de Servidores, clique com botão direito do rato na base de dados e escolha Nova Consulta.

  3. Execute o seguinte comando SQL na base de dados para criar tabelas com o nome customer_table e project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    

Crie outra tabela em seu banco de dados para armazenar o alto valor da marca d'água

  1. Execute o seguinte comando SQL em seu banco de dados para criar uma tabela nomeada watermarktable para armazenar o valor da marca d'água:

    create table watermarktable
    (
    
        TableName varchar(255),
        WatermarkValue datetime,
    );
    
  2. Inserir valores de marca d'água iniciais para ambas as tabelas de origem na tabela de marca d'água.

    
    INSERT INTO watermarktable
    VALUES 
    ('customer_table','1/1/2010 12:00:00 AM'),
    ('project_table','1/1/2010 12:00:00 AM');
    
    

Criar um procedimento armazenado em seu banco de dados

Execute o seguinte comando para criar um procedimento armazenado em seu banco de dados. Este procedimento armazenado atualiza o valor de limite de tamanho após cada execução de pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime 
WHERE [TableName] = @TableName

END

Criar tipos de dados e procedimentos armazenados adicionais em seu banco de dados

Execute a consulta a seguir para criar dois procedimentos armazenados e dois tipos de dados em seu banco de dados. São utilizados para intercalar os dados das tabelas de origem nas tabelas de destino.

Para facilitar o início da jornada, usamos diretamente esses Procedimentos Armazenados passando os dados delta por meio de uma variável de tabela e, em seguida, mesclando-os no armazenamento de destino. Tenha cuidado para não esperar que um número "grande" de linhas delta (mais de 100) seja armazenado na variável de tabela.

Se você precisar mesclar um grande número de linhas delta no repositório de destino, sugerimos que você use a atividade de cópia para copiar todos os dados delta em uma tabela temporária de "preparação" no repositório de destino primeiro e, em seguida, crie seu próprio procedimento armazenado sem usar a variável de tabela para mesclá-los da tabela de "preparação" para a tabela "final".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Criar uma fábrica de dados

  1. Abra o browser Microsoft Edge ou Google Chrome. Atualmente, a IU do Data Factory é suportada apenas nos browsers Microsoft Edge e Google Chrome.

  2. No menu à esquerda, selecione Criar um recurso>Integration>Data Factory:

    Seleção do Data Factory no

  3. Na página Nova fábrica de dados, introduza ADFMultiIncCopyTutorialDF em nome.

    O nome do Azure Data Factory deve ser globalmente exclusivo. Se vir um ponto de exclamação vermelho com o seguinte erro, altere o nome da fábrica de dados (por exemplo, oseunomeADFIncCopyTutorialDF) e tente criá-la novamente. Veja o artigo Data Factory – Naming Rules (Data Factory – Regras de Nomenclatura) para obter as regras de nomenclatura dos artefactos do Data Factory.

    Data factory name "ADFIncCopyTutorialDF" is not available

  4. Selecione a sua subscrição do Azure na qual pretende criar a fábrica de dados.

  5. No Grupo de Recursos, siga um destes passos:

  6. Selecione V2 para a versão.

  7. Selecione a localização da fábrica de dados. Só aparecem na lista pendente as localizações que são suportadas. Os arquivos de dados (Armazenamento do Azure, Base de Dados SQL do Azure, etc.) e as computações (HDInsight, etc.) utilizados pela fábrica de dados podem estar noutras regiões.

  8. Clique em Criar.

  9. Depois de concluída a criação, vai ver a página Data Factory, conforme mostrado na imagem.

    Home page para o Azure Data Factory, com o bloco Open Azure Data Factory Studio.

  10. Selecione Abrir no bloco Abrir o Estúdio do Azure Data Factory para iniciar a interface do usuário (UI) do Azure Data Factory em uma guia separada.

Criar um integration runtime autoalojado

Como está a mover dados de um arquivo de dados numa rede privada (no local) para um arquivo de dados do Azure, instale um runtime de integração autoalojado (IR) no seu ambiente no local. O IR autoalojado move os dados entre a sua rede privada e o Azure.

  1. Na home page da interface do usuário do Azure Data Factory, selecione a guia Gerenciar no painel mais à esquerda.

    O botão Gerenciar da página inicial

  2. Selecione Tempos de execução de integração no painel esquerdo e, em seguida, selecione +Novo.

    Criar um integration runtime

  3. Na janela Configuração do Integration Runtime, selecione Executar movimentação de dados e enviar atividades para cálculos externos e clique em Continuar.

  4. Selecione Auto-Hospedado e clique em Continuar.

  5. Digite MySelfHostedIR para Nome e clique em Criar.

  6. Clique em Clique aqui para iniciar a configuração rápida neste computador, na secção Opção 1: Configuração Rápida.

    Clicar na ligação de Configuração Rápida

  7. Na janela Configuração Rápida do Integration Runtime (Autoalojado), clique em Fechar.

    Configuração do runtime de integração - êxito

  8. No browser, na janela Configuração do Runtime de Integração, clique em Concluir.

  9. Confirme que vê MySelfHostedIR na lista de runtimes de integração.

Criar serviços ligados

Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta seção, você cria serviços vinculados ao seu banco de dados do SQL Server e ao seu banco de dados no Banco de Dados SQL do Azure.

Criar o serviço ligado do SQL Server

Nesta etapa, você vincula seu banco de dados do SQL Server ao data factory.

  1. Na janela Ligações, mude do separador Runtimes de Integração para o separador Serviços Ligados e clique em + Novo.

    Novo serviço vinculado.

  2. Na janela Novo Serviço Ligado, selecione SQL Server e clique em Continuar.

  3. Na janela Novo Serviço Ligado, siga os passos abaixo:

    1. Introduza SqlServerLinkedService em Nome.
    2. Selecione MySelfHostedIR em Ligar através do runtime de integração. Este é um passo importante. O runtime de integração predefinido não consegue ligar a um arquivo de dados no local. Utilize o runtime de integração autoalojado que criou anteriormente.
    3. Em Nome do servidor, introduza o nome do computador que tem a base de dados do SQL Server.
    4. Em Nome da base de dados, introduza o nome da base de dados no SQL Server que tem a origem de dados. Criou uma tabela e inseriu dados nesta base de dados como parte dos pré-requisitos.
    5. Em Tipo de autenticação, selecione o tipo de autenticação que pretende utilizar para ligar à base de dados.
    6. Em Nome do utilizador, introduza o nome do utilizador que tem acesso à base de dados do SQL Server. Se precisar de utilizar um caráter de barra invertida (\) no nome da sua conta de utilizador ou no nome do seu servidor, utilize o caráter de escape (\). Um exemplo é mydomain\\myuser.
    7. Em Palavra-passe,introduza a palavra-passe do utilizador.
    8. Para testar se o Data Factory consegue ligar à base de dados do SQL Server, clique em Testar ligação. Corrija os erros até que a ligação seja bem-sucedida.
    9. Para salvar o serviço vinculado, clique em Concluir.

Criar o serviço ligado da Base de Dados SQL do Azure

Neste último passo, vai criar um serviço ligado para ligar a sua base de dados do SQL Server à fábrica de dados. Nesta etapa, você vincula seu banco de dados de destino/coletor ao data factory.

  1. Na janela Ligações, mude do separador Runtimes de Integração para o separador Serviços Ligados e clique em + Novo.

  2. Na janela Novo Serviço Ligado, selecione Base de Dados SQL do Azure e clique em Continuar.

  3. Na janela Novo Serviço Ligado, siga os passos abaixo:

    1. Introduza AzureSqlDatabaseLinkedService em Nome.
    2. Em Nome do servidor, selecione o nome do servidor na lista suspensa.
    3. Em Nome do banco de dados, selecione o banco de dados no qual você criou customer_table e project_table como parte dos pré-requisitos.
    4. Em Nome de usuário, insira o nome do usuário que tem acesso ao banco de dados.
    5. Em Palavra-passe,introduza a palavra-passe do utilizador.
    6. Para testar se o Data Factory consegue ligar à base de dados do SQL Server, clique em Testar ligação. Corrija os erros até que a ligação seja bem-sucedida.
    7. Para salvar o serviço vinculado, clique em Concluir.
  4. Verifique se vê dois serviços ligados na lista.

    Dois serviços ligados

Criar conjuntos de dados

Neste passo, vai criar conjuntos de dados para representar a origem de dados, o destino de dados e o local para armazenar o limite de tamanho.

Criar um conjunto de dados de origem

  1. No painel esquerdo, clique em + (mais) e clique em Conjunto de Dados.

  2. Na janela Novo Conjunto de Dados, selecione SQL Server, clique em Continuar.

  3. Verá um novo separador aberto no browser para configurar o conjunto de dados. Você também vê um conjunto de dados na exibição em árvore. No separador Geral da janela Propriedades ao fundo, introduza SourceDataset em Nome.

  4. Mude para o separador Ligação, na janela Propriedades e clique emSqlServerLinkedService em Serviço ligado. Não selecione uma tabela aqui. A atividade Cópia no pipeline utiliza uma consulta SQL para carregar os dados em vez de carregar a tabela inteira.

    Conjunto de dados de origem - ligação

Criar um conjunto de dados de sink

  1. No painel esquerdo, clique em + (mais) e clique em Conjunto de Dados.

  2. Na janela Novo Conjunto de Dados, selecione Banco de Dados SQL do Azure e clique em Continuar.

  3. Verá um novo separador aberto no browser para configurar o conjunto de dados. Você também vê um conjunto de dados na exibição em árvore. No separador Geral da janela Propriedades ao fundo, introduza SinkDataset em Nome.

  4. Mude para o separador Parâmetros da janela Propriedades, e siga os seguintes abaixo:

    1. Clique em Novo na secção Criar/atualizar parâmetros.

    2. Introduza SinkTableName em nome e Cadeia em tipo. Este conjunto de dados assume SinkTableName como um parâmetro. O parâmetro SinkTableName é definido pelo pipeline dinamicamente durante o runtime. A atividade ForEach no pipeline itera através de uma lista de nomes de tabelas e transmite o nome da tabela para este conjunto de dados em cada iteração.

      Conjunto de Dados de Sink - propriedades

  5. Alterne para a guia Conexão na janela Propriedades e selecione AzureSqlDatabaseLinkedService para serviço vinculado. Na propriedade Tabela, clique em Adicionar conteúdo dinâmico.

  6. Na janela Adicionar conteúdo dinâmico, selecione SinkTableName na seção Parâmetros.

  7. Depois de clicar em Concluir, você verá "@dataset(). SinkTableName" como o nome da tabela.

    Conjunto de Dados de Sink - ligação

Criar um conjunto de dados para uma marca d'água

Neste passo, vai criar um conjunto de dados para armazenar um valor de limite superior de tamanho.

  1. No painel esquerdo, clique em + (mais) e clique em Conjunto de Dados.

  2. Na janela Novo Conjunto de Dados, selecione Banco de Dados SQL do Azure e clique em Continuar.

  3. No separador Geral da janela Propriedades ao fundo, introduza WatermarkDataset em Nome.

  4. Mude para o separador Ligação e siga os passos abaixo:

    1. Selecione AzureSqlDatabaseLinkedService em Serviço ligado.

    2. Selecione [dbo].[watermarktable] em Tabela.

      Conjunto de Dados de Marca d'água - ligação

Criar um pipeline

O pipeline aceita uma lista de nomes de tabela como parâmetro. A atividade ForEach itera através da lista de nomes de tabelas e realiza as seguintes operações:

  1. Utilize a atividade Pesquisa para obter o valor do limite de tamanho antigo (o valor inicial ou o que foi utilizado na última iteração).

  2. Utilize a atividade Pesquisa para obter o valor do limite de tamanho novo (o valor máximo da coluna de limites de tamanho na tabela de origem).

  3. Utilize a atividade Cópia para copiar dados entre estes dois valores de limite de tamanho da base de dados de origem para a base de dados de destino.

  4. Utilize a atividade StoredProcedure para atualizar o valor de limite de tamanho antigo a ser utilizado no primeiro passo da iteração seguinte.

Criar o pipeline

  1. No painel do lado esquerdo, clique em + (mais) e clique em Pipeline.

  2. No painel Geral, em Propriedades, especifique IncrementalCopyPipeline para Name. Em seguida, feche o painel clicando no ícone Propriedades no canto superior direito.

  3. Na guia Parâmetros, execute as seguintes etapas:

    1. Clique em + Novo.
    2. Introduza tableList no parâmetro Nome.
    3. Selecione Array para o tipo de parâmetro.
  4. Na caixa de ferramentas Atividades, expanda Iteração e Condições e arraste e largue a atividade ForEach na superfície de estruturador do pipeline. No separador Geral da janela Propriedades, introduza IterateSQLTables.

  5. Mude para o separador Definições e introduza @pipeline().parameters.tableList em itens. A atividade ForEach itera através da lista de tabelas e realiza a operação de cópia incremental.

    Atividade ForEach - definições

  6. Selecione a atividade ForEach no pipeline, se ainda não estiver selecionada. Clique no botão Editar (Ícone de lápis).

  7. Na caixa de ferramentas Atividades, expanda Geral, arraste e largue a atividade Lookup na superfície de estruturador do pipeline e introduza LookupOldWaterMarkActivity em Nome.

  8. Mude para o separador Definições da janela Propriedades e siga os passos seguintes:

    1. Selecione WatermarkDataset em Conjunto de Dados de Origem.

    2. Selecione Consulta em Utilize Consulta.

    3. Introduza a seguinte consulta SQL em consulta.

      select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'
      

      Primeira Atividade Pesquisar - definições

  9. Arraste e largue a atividade Pesquisar da caixa de ferramentas Atividades e introduza LookupNewWaterMarkActivity em Nome.

  10. Mudar para o separador Definições.

    1. Selecione SourceDataset em Conjunto de Dados de Origem.

    2. Selecione Consulta em Utilize Consulta.

    3. Introduza a seguinte consulta SQL em consulta.

      select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}
      

      Segunda Atividade Pesquisar - definições

  11. Arraste e largue a atividade Copiar da caixa de ferramentas Atividades e introduza IncrementalCopyActivity em Nome.

  12. Ligue as atividades Pesquisar à atividade Copiar, uma a uma. Para ligar, comece a arrastar a caixa verde anexada à atividade Pesquisar e largue-a na atividade Copiar. Largue o botão do rato quando a cor do limite da atividade Copiar mudar para azul.

    Ligar atividades Pesquisar à atividade Copiar

  13. Selecione a atividade Copiar no pipeline. Mude para o separador Origem na janela Propriedades.

    1. Selecione SourceDataset em Conjunto de Dados de Origem.

    2. Selecione Consulta em Utilize Consulta.

    3. Introduza a seguinte consulta SQL em consulta.

      select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'        
      

      Atividade Copy - definições da origem

  14. Mude para o separador Sink e selecione SinkDataset em Conjunto de Dados de Sink.

  15. Efetue os seguintes passos:

    1. Nas propriedades do conjunto de dados, para o parâmetro SinkTableName, digite @{item().TABLE_NAME}.

    2. Para a propriedade Nome do Procedimento Armazenado, digite @{item().StoredProcedureNameForMergeOperation}.

    3. Para a propriedade Tipo de tabela, digite @{item().TableType}.

    4. Para Nome do parâmetro Tipo de tabela, digite @{item().TABLE_NAME}.

      Atividade Copiar - parâmetros

  16. Arraste e largue a atividade Stored Procedure da caixa de ferramentas Atividades para a superfície de desenho do pipeline. Ligue a atividade Copiar à atividade Procedimento Armazenado.

  17. Selecione a atividade Procedimento Armazenado no pipeline e introduza StoredProceduretoWriteWatermarkActivity em Nome, no separador Geral da janela Propriedades.

  18. Mude para o separador Conta do SQL e selecione AzureSqlDatabaseLinkedService em Serviço Ligado.

    Atividade Stored Procedure - Conta do SQL

  19. Mude para o separador Procedimento Armazenado e siga os passos abaixo:

    1. Para Nome do procedimento armazenado , selecione [dbo].[usp_write_watermark].

    2. Selecione Parâmetro de importação.

    3. Especifique os seguintes valores para os parâmetros:

      Nome Tipo valor
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

      Atividade de procedimento armazenado - definições do procedimento armazenado

  20. Selecione Publicar tudo para publicar as entidades criadas no serviço Data Factory.

  21. Aguarde até ver a mensagem Publicação com êxito. Para ver as notificações, clique na ligação Mostrar Notificações. Clique em X para fechar a janela de notificações.

Executar o pipeline

  1. Na barra de ferramentas do pipeline, clique em Adicionar gatilho e clique em Disparar Agora.

  2. Na janela Execução de Pipeline, introduza o seguinte valor no parâmetro tableList e clique em Concluir.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

    Argumentos de Execução de Pipeline

Monitorizar o pipeline

  1. Mude para o separador Monitorizar, no lado esquerdo. Irá ver o estado da execução do pipeline acionada pelo acionador manual. Você pode usar links na coluna NOME DO PIPELINE para exibir detalhes da atividade e executar novamente o pipeline.

  2. Para ver as execuções de atividade associadas à execução do pipeline, selecione o link na coluna NOME DO PIPELINE. Para obter detalhes sobre a atividade executada, selecione o link Detalhes (ícone de óculos) na coluna NOME DA ATIVIDADE.

  3. Selecione Todas as execuções de pipeline na parte superior para voltar à visualização Execuções de pipeline. Para atualizar a vista, selecione Atualizar.

Rever os resultados

No SQL Server Management Studio, execute as seguintes consultas na base de dados SQL de destino para verificar que os dados foram copiados das tabelas de origem para as tabelas de destino.

Consulta

select * from customer_table

Saída

===========================================
PersonID	Name	LastModifytime
===========================================
1	        John	2017-09-01 00:56:00.000
2	        Mike	2017-09-02 05:23:00.000
3	        Alice	2017-09-03 02:36:00.000
4	        Andy	2017-09-04 03:21:00.000
5	        Anny	2017-09-05 08:06:00.000

Consulta

select * from project_table

Saída

===================================
Project	    Creationtime
===================================
project1	2015-01-01 00:00:00.000
project2	2016-02-02 01:23:00.000
project3	2017-03-04 05:16:00.000

Consulta

select * from watermarktable

Saída

======================================
TableName	    WatermarkValue
======================================
customer_table	2017-09-05 08:06:00.000
project_table	2017-03-04 05:16:00.000

Tenha em atenção que os valores de limite de tamanho de ambas as tabelas foram atualizados.

Adicione mais dados às tabelas de origem

Execute a seguinte consulta na base de dados do SQL Server de origem para atualizar uma linha existente em customer_table. Insira uma linha nova em project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Volte a executar o pipeline

  1. Na janela do browser, mude para o separador Editar no lado esquerdo.

  2. Na barra de ferramentas do pipeline, clique em Adicionar gatilho e clique em Disparar Agora.

  3. Na janela Execução de Pipeline, introduza o seguinte valor no parâmetro tableList e clique em Concluir.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

Monitorizar o pipeline novamente

  1. Mude para o separador Monitorizar, no lado esquerdo. Irá ver o estado da execução do pipeline acionada pelo acionador manual. Você pode usar links na coluna NOME DO PIPELINE para exibir detalhes da atividade e executar novamente o pipeline.

  2. Para ver as execuções de atividade associadas à execução do pipeline, selecione o link na coluna NOME DO PIPELINE. Para obter detalhes sobre a atividade executada, selecione o link Detalhes (ícone de óculos) na coluna NOME DA ATIVIDADE.

  3. Selecione Todas as execuções de pipeline na parte superior para voltar à visualização Execuções de pipeline. Para atualizar a vista, selecione Atualizar.

Rever os resultados finais

No SQL Server Management Studio, execute as seguintes consultas no banco de dados SQL de destino para verificar se os dados atualizados/novos foram copiados das tabelas de origem para as tabelas de destino.

Consulta

select * from customer_table

Saída

===========================================
PersonID	Name	LastModifytime
===========================================
1	        John	2017-09-01 00:56:00.000
2	        Mike	2017-09-02 05:23:00.000
3	        NewName	2017-09-08 00:00:00.000
4	        Andy	2017-09-04 03:21:00.000
5	        Anny	2017-09-05 08:06:00.000

Repare nos valores novos de Name e LastModifytime para PersonID relativamente ao número 3.

Consulta

select * from project_table

Saída

===================================
Project	    Creationtime
===================================
project1	2015-01-01 00:00:00.000
project2	2016-02-02 01:23:00.000
project3	2017-03-04 05:16:00.000
NewProject	2017-10-01 00:00:00.000

Repare que a entrada NewProject foi adicionada a project_table.

Consulta

select * from watermarktable

Saída

======================================
TableName	    WatermarkValue
======================================
customer_table	2017-09-08 00:00:00.000
project_table	2017-10-01 00:00:00.000

Tenha em atenção que os valores de limite de tamanho de ambas as tabelas foram atualizados.

Neste tutorial, executou os passos seguintes:

  • Prepare os arquivos de dados de origem e de destino.
  • Criar uma fábrica de dados.
  • Criou um integration runtime autoalojado (IR).
  • Instalar o integration runtime.
  • Criar serviços ligados.
  • Crie conjuntos de dados de origem, de sink e de marca d'água.
  • Criar, executar e monitorizar um pipeline.
  • Reveja os resultados.
  • Adicionou ou atualizou os dados nas tabelas de origem.
  • Voltou a executar e a monitorizar o pipeline.
  • Reviu os resultados finais.

Avance para o tutorial seguinte para saber como transformar dados através de um cluster do Spark no Azure: