Copie dados incrementalmente do Banco de Dados SQL do Azure para o Armazenamento de Blob usando o controle de alterações no portal do Azure
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Uma solução de integração de dados, que carrega dados incrementalmente após os carregamentos de dados iniciais é um cenário bastante utilizado. Os dados alterados dentro de um período em seu armazenamento de dados de origem podem ser facilmente fatiados (por exemplo, LastModifyTime
CreationTime
, ). Mas, em alguns casos, não há uma maneira explícita de identificar os dados delta da última vez que você processou os dados. Você pode usar a tecnologia de controle de alterações suportada por armazenamentos de dados, como o Banco de Dados SQL do Azure e o SQL Server, para identificar os dados delta.
Este tutorial descreve como usar o Azure Data Factory com o controle de alterações para carregar incrementalmente dados delta do Banco de Dados SQL do Azure no Armazenamento de Blobs do Azure. Para obter mais informações sobre o controle de alterações, consulte Controle de alterações no SQL Server.
Vai executar os seguintes passos neste tutorial:
- Prepare o armazenamento de dados de origem.
- Criar uma fábrica de dados.
- Criar serviços ligados.
- Crie uma origem, sink e conjuntos de dados de registo de alterações.
- Crie, execute e monitore o pipeline de cópia completa.
- Adicione ou atualize dados na tabela de origem.
- Crie, execute e monitore o pipeline de cópia incremental.
Solução de alto nível
Neste tutorial, você cria dois pipelines que executam as seguintes operações.
Nota
Este tutorial utiliza a Base de Dados SQL do Azure como o arquivo de dados de origem. Você também pode usar o SQL Server.
Carregamento inicial de dados históricos: você cria um pipeline com uma atividade de cópia que copia os dados inteiros do armazenamento de dados de origem (Banco de Dados SQL do Azure) para o armazenamento de dados de destino (Armazenamento de Blob do Azure):
- Habilite a tecnologia de controle de alterações no banco de dados de origem no Banco de Dados SQL do Azure.
- Obtenha o valor inicial de no banco de dados como a linha de
SYS_CHANGE_VERSION
base para capturar dados alterados. - Carregue dados completos do banco de dados de origem no Armazenamento de Blobs do Azure.
Carregamento incremental de dados delta em um cronograma: você cria um pipeline com as seguintes atividades e o executa periodicamente:
Crie duas atividades de pesquisa para obter os valores antigos e novos
SYS_CHANGE_VERSION
do Banco de Dados SQL do Azure.Crie uma atividade de cópia para copiar os dados inseridos, atualizados ou excluídos (os dados delta) entre os dois
SYS_CHANGE_VERSION
valores do Banco de Dados SQL do Azure para o Armazenamento de Blobs do Azure.Carregue os dados delta unindo as chaves primárias das linhas alteradas (entre dois
SYS_CHANGE_VERSION
valores) com os dados na tabela desys.change_tracking_tables
origem e, em seguida, mova os dados delta para o destino.Crie uma atividade de procedimento armazenado para atualizar o valor da próxima execução de
SYS_CHANGE_VERSION
pipeline.
Pré-requisitos
- Subscrição do Azure. Se não tiver uma, crie uma conta gratuita antes de começar.
- Base de Dados SQL do Azure. Você usa um banco de dados no Banco de Dados SQL do Azure como o armazenamento de dados de origem . Se você não tiver um, consulte Criar um banco de dados no Banco de Dados SQL do Azure para conhecer as etapas para criá-lo.
- Conta de armazenamento do Azure. Você usa o armazenamento de Blob como o armazenamento de dados do coletor . Se você não tiver uma conta de armazenamento do Azure, consulte Criar uma conta de armazenamento para conhecer as etapas para criar uma. Crie um contentor com o nome adftutorial.
Nota
Recomendamos que utilize o módulo Azure Az do PowerShell para interagir com o Azure. Para começar, consulte Instalar o Azure PowerShell. Para saber como migrar para o módulo do Az PowerShell, veja Migrar o Azure PowerShell do AzureRM para o Az.
Criar uma tabela de fonte de dados no Banco de Dados SQL do Azure
Abra o SQL Server Management Studio e conecte-se ao Banco de Dados SQL.
No Gerenciador de Servidores, clique com o botão direito do mouse no banco de dados e selecione Nova Consulta.
Execute o seguinte comando SQL em seu banco de dados para criar uma tabela nomeada
data_source_table
como o armazenamento de dados de origem:create table data_source_table ( PersonID int NOT NULL, Name varchar(255), Age int PRIMARY KEY (PersonID) ); INSERT INTO data_source_table (PersonID, Name, Age) VALUES (1, 'aaaa', 21), (2, 'bbbb', 24), (3, 'cccc', 20), (4, 'dddd', 26), (5, 'eeee', 22);
Habilite o controle de alterações em seu banco de dados e na tabela de origem (
data_source_table
) executando a seguinte consulta SQL.Nota
- Substitua
<your database name>
pelo nome do banco de dados no Banco de Dados SQL do Azure que temdata_source_table
. - Os dados alterados são mantidos por dois dias no exemplo atual. Se carregar os dados alterados a cada três ou mais dias, alguns dados alterados não são incluídos. Você precisa alterar o valor de para um número maior ou garantir que seu período para carregar os dados alterados seja dentro de
CHANGE_RETENTION
dois dias. Para obter mais informações, consulte Habilitar o controle de alterações para um banco de dados.
ALTER DATABASE <your database name> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) ALTER TABLE data_source_table ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)
- Substitua
Crie uma nova tabela e armazene chamado
ChangeTracking_version
com um valor padrão executando a seguinte consulta:create table table_store_ChangeTracking_version ( TableName varchar(255), SYS_CHANGE_VERSION BIGINT, ); DECLARE @ChangeTracking_version BIGINT SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION(); INSERT INTO table_store_ChangeTracking_version VALUES ('data_source_table', @ChangeTracking_version)
Nota
Se os dados não forem alterados depois de habilitar o controle de alterações para o Banco de dados SQL, o valor da versão de controle de alterações será
0
.Execute a consulta a seguir para criar um procedimento armazenado em seu banco de dados. O pipeline invoca esse procedimento armazenado para atualizar a versão de controle de alterações na tabela que você criou na etapa anterior.
CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50) AS BEGIN UPDATE table_store_ChangeTracking_version SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion WHERE [TableName] = @TableName END
Criar uma fábrica de dados
Abra o browser Microsoft Edge ou Google Chrome. Atualmente, apenas esses navegadores suportam a interface do usuário (UI) do Data Factory.
No portal do Azure, no menu à esquerda, selecione Criar um recurso.
Selecione Integration>Data Factory.
Na página Novo data factory, insira ADFTutorialDataFactory para o nome.
O nome do data factory deve ser globalmente exclusivo. Se você receber um erro dizendo que o nome escolhido não está disponível, altere o nome (por exemplo, para seunomeADFTutorialDataFactory) e tente criar o data factory novamente. Para obter mais informações, consulte Regras de nomenclatura do Azure Data Factory.
Selecione a subscrição do Azure na qual quer criar a fábrica de dados.
Em Grupo de Recursos, efetue um destes passos:
- Selecione Usar existente e, em seguida, selecione um grupo de recursos existente na lista suspensa.
- Selecione Criar novo e insira o nome de um grupo de recursos.
Para saber mais sobre os grupos de recursos, veja Utilizar grupos de recursos para gerir os recursos do Azure.
Em Versão, selecione V2.
Em Região, selecione a região para o data factory.
A lista suspensa exibe apenas os locais suportados. Os armazenamentos de dados (por exemplo, Armazenamento do Azure e Banco de Dados SQL do Azure) e cálculos (por exemplo, Azure HDInsight) que uma fábrica de dados usa podem estar em outras regiões.
Selecione Next: Configuração do Git. Configure o repositório seguindo as instruções em Método de configuração 4: Durante a criação de fábrica ou marque a caixa de seleção Configurar Git mais tarde .
Selecione Rever + criar.
Selecione Criar.
No painel, o bloco Implantando o Data Factory mostra o status.
Após a conclusão da criação, a página Data Factory será exibida. Selecione o bloco Iniciar estúdio para abrir a interface do usuário do Azure Data Factory em uma guia separada.
Criar serviços ligados
Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta seção, você cria serviços vinculados à sua conta de armazenamento do Azure e ao seu banco de dados no Banco de Dados SQL do Azure.
Criar um serviço ligado do Armazenamento do Azure
Para vincular sua conta de armazenamento ao data factory:
- Na interface do usuário do Data Factory, na guia Gerenciar , em Conexões, selecione Serviços vinculados. Em seguida, selecione + Novo ou o botão Criar serviço vinculado.
- Na janela Novo Serviço Vinculado, selecione Armazenamento de Blob do Azure e selecione Continuar.
- Insira as seguintes informações:
- Em Nome, introduza AzureStorageLinkedService.
- Para Connect via integration runtime, selecione o integration runtime.
- Em Tipo de autenticação, selecione um método de autenticação.
- Para Nome da conta de armazenamento, selecione sua conta de armazenamento do Azure.
- Selecione Criar.
Criar um serviço ligado da Base de Dados SQL do Azure
Para vincular seu banco de dados ao data factory:
Na interface do usuário do Data Factory, na guia Gerenciar , em Conexões, selecione Serviços vinculados. Em seguida, selecione + Novo.
Na janela Novo Serviço Vinculado, selecione Banco de Dados SQL do Azure e selecione Continuar.
Introduza as informações seguintes:
- Em Name, insira AzureSqlDatabaseLinkedService.
- Em Nome do servidor, selecione o servidor.
- Em Nome do banco de dados, selecione seu banco de dados.
- Em Tipo de autenticação, selecione um método de autenticação. Este tutorial usa a autenticação SQL para demonstração.
- Em Nome de usuário, digite o nome do usuário.
- Em Senha, digite uma senha para o usuário. Ou forneça as informações para o Azure Key Vault - serviço vinculado AKV, nome secreto e versão secreta.
Selecione Testar ligação para testar a ligação.
Selecione Criar para criar o serviço vinculado.
Criar conjuntos de dados
Nesta seção, você cria conjuntos de dados para representar a fonte de dados e o destino dos dados, juntamente com o local para armazenar os SYS_CHANGE_VERSION
valores.
Criar um conjunto de dados para representar os dados de origem
Na interface do usuário do Data Factory, na guia Autor , selecione o sinal de adição (+). Em seguida, selecione Conjunto de dados ou selecione as reticências para ações do conjunto de dados.
Selecione Banco de Dados SQL do Azure e, em seguida, selecione Continuar.
Na janela Definir Propriedades, execute as seguintes etapas:
- Em Name, insira SourceDataset.
- Para Serviço vinculado, selecione AzureSqlDatabaseLinkedService.
- Em Nome da tabela, selecione dbo.data_source_table.
- Em Importar esquema, selecione a opção Da conexão/armazenamento .
- Selecione OK.
Criar um conjunto de dados para representar dados copiados para o armazenamento de dados do coletor
No procedimento a seguir, você cria um conjunto de dados para representar os dados copiados do armazenamento de dados de origem. Você criou o contêiner adftutorial no Armazenamento de Blobs do Azure como parte dos pré-requisitos. Crie o contentor se ainda não existir ou defina-o como o nome de um contentor existente. Neste tutorial, o nome do arquivo de saída é gerado dinamicamente a partir da expressão @CONCAT('Incremental-', pipeline().RunId, '.txt')
.
Na interface do usuário do Data Factory, na guia Autor , selecione +. Em seguida, selecione Conjunto de dados ou selecione as reticências para ações do conjunto de dados.
Selecione Armazenamento de Blobs do Azure e, em seguida, selecione Continuar.
Selecione o formato do tipo de dados como DelimitedText e, em seguida, selecione Continuar.
Na janela Definir propriedades, execute as seguintes etapas:
- Em Name, insira SinkDataset.
- Para Serviço vinculado, selecione AzureBlobStorageLinkedService.
- Em Caminho do arquivo, digite adftutorial/incchgtracking.
- Selecione OK.
Depois que o conjunto de dados aparecer na exibição em árvore, vá para a guia Conexão e selecione a caixa de texto Nome do arquivo . Quando a opção Adicionar conteúdo dinâmico for exibida, selecione-a.
A janela Construtor de expressões Pipeline é exibida. Cole
@concat('Incremental-',pipeline().RunId,'.csv')
na caixa de texto.Selecione OK.
Criar um conjunto de dados para representar os dados do controlo de alterações
No procedimento a seguir, você cria um conjunto de dados para armazenar a versão de controle de alterações. Você criou a table_store_ChangeTracking_version
tabela como parte dos pré-requisitos.
- Na IU do Data Factory, no separador Autor, selecione e, em seguida, selecione Conjunto de Dados. +
- Selecione Banco de Dados SQL do Azure e, em seguida, selecione Continuar.
- Na janela Definir Propriedades, execute as seguintes etapas:
- Em Name, insira ChangeTrackingDataset.
- Para Serviço vinculado, selecione AzureSqlDatabaseLinkedService.
- Em Nome da tabela, selecione dbo.table_store_ChangeTracking_version.
- Em Importar esquema, selecione a opção Da conexão/armazenamento .
- Selecione OK.
Criar um pipeline para a cópia completa
No procedimento a seguir, você cria um pipeline com uma atividade de cópia que copia todos os dados do armazenamento de dados de origem (Banco de Dados SQL do Azure) para o armazenamento de dados de destino (Armazenamento de Blob do Azure):
Na interface do usuário do Data Factory, na guia Autor, selecione +e, em seguida, selecione Pipeline>de pipeline.
Uma nova guia é exibida para configurar o pipeline. O pipeline também aparece na visualização em árvore. Na janela Propriedades, altere o nome do pipeline para FullCopyPipeline.
Na caixa de ferramentas Atividades, expanda Mover & transformar. Siga um dos seguintes passos:
- Arraste a atividade de cópia para a superfície do designer de pipeline.
- Na barra de pesquisa, em Atividades, procure a atividade de cópia de dados e defina o nome como FullCopyActivity.
Alterne para a guia Origem . Em Source Dataset, selecione SourceDataset.
Alterne para a guia Coletor . Em Sink Dataset, selecione SinkDataset.
Para validar a definição de pipeline, selecione Validar na barra de ferramentas. Confirme que não há nenhum erro de validação. Feche a saída de validação do pipeline.
Para publicar entidades (serviços vinculados, conjuntos de dados e pipelines), selecione Publicar tudo. Aguarde até ver a mensagem Publicação com êxito.
Para ver as notificações, selecione o botão Mostrar notificações .
Execute o pipeline da cópia completa
Na interface do usuário do Data Factory, na barra de ferramentas do pipeline, selecione Adicionar gatilho e, em seguida, selecione Gatilho agora.
Na janela Execução do pipeline, selecione OK.
Monitorize o pipeline da cópia completa
Na interface do usuário do Data Factory, selecione a guia Monitor . A execução do pipeline e seu status aparecem na lista. Para atualizar a lista, selecione Atualizar. Passe o cursor sobre a execução do pipeline para obter a opção Rerun ou Consumption .
Para exibir as execuções de atividade associadas à execução do pipeline, selecione o nome do pipeline na coluna Nome do pipeline . Há apenas uma atividade no pipeline, portanto, há apenas uma entrada na lista. Para voltar para a exibição de execuções de pipeline, selecione o link Todas as execuções de pipeline na parte superior.
Rever os resultados
A pasta incchgtracking do contêiner adftutorial inclui um arquivo chamado incremental-<GUID>.csv
.
O ficheiro deve conter os dados da sua base de dados:
PersonID,Name,Age
1,"aaaa",21
2,"bbbb",24
3,"cccc",20
4,"dddd",26
5,"eeee",22
5,eeee,PersonID,Name,Age
1,"aaaa",21
2,"bbbb",24
3,"cccc",20
4,"dddd",26
5,"eeee",22
Adicione mais dados à tabela de origem
Execute a seguinte consulta em seu banco de dados para adicionar uma linha e atualizar uma linha:
INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');
UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1
Criar um pipeline para a cópia do delta
No procedimento a seguir, você cria um pipeline com atividades e o executa periodicamente. Quando você executa o pipeline:
- As atividades de pesquisa obtêm os valores antigos e novos
SYS_CHANGE_VERSION
do Banco de Dados SQL do Azure e os passam para a atividade de cópia. - A atividade de cópia copia os dados inseridos, atualizados ou excluídos entre os dois
SYS_CHANGE_VERSION
valores do Banco de Dados SQL do Azure para o Armazenamento de Blobs do Azure. - A atividade de procedimento armazenado atualiza o valor de para a próxima execução de
SYS_CHANGE_VERSION
pipeline.
Na interface do usuário do Data Factory, alterne para a guia Autor . Selecione +e, em seguida, selecione Pipeline>de pipeline.
Uma nova guia é exibida para configurar o pipeline. O pipeline também aparece na visualização em árvore. Na janela Propriedades, altere o nome do pipeline para IncrementalCopyPipeline.
Expanda Geral na caixa de ferramentas Atividades. Arraste a atividade de pesquisa para a superfície do designer de pipeline ou pesquise na caixa Atividades de pesquisa. Defina o nome da atividade como LookupLastChangeTrackingVersionActivity. Essa atividade obtém a versão de controle de alterações usada na última operação de cópia armazenada na
table_store_ChangeTracking_version
tabela.Alterne para a guia Configurações na janela Propriedades . Para Source Dataset, selecione ChangeTrackingDataset.
Arraste a atividade de pesquisa da caixa de ferramentas Atividades para a superfície do designer de pipeline. Defina o nome da atividade como LookupCurrentChangeTrackingVersionActivity. Esta atividade obtém a versão atual do controlo de alterações.
Alterne para a guia Configurações na janela Propriedades e siga as seguintes etapas:
Para Source dataset, selecione SourceDataset.
Para Consulta Usar, selecione Consulta.
Em Query, insira a seguinte consulta SQL:
SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion
Na caixa de ferramentas Atividades, expanda Mover & transformar. Arraste a atividade de cópia de dados para a superfície do designer de pipeline. Defina o nome da atividade como IncrementalCopyActivity. Essa atividade copia os dados entre a última versão de controle de alterações e a versão atual de controle de alterações para o armazenamento de dados de destino.
Alterne para a guia Origem na janela Propriedades e siga as seguintes etapas:
Para Source dataset, selecione SourceDataset.
Para Consulta Usar, selecione Consulta.
Em Query, insira a seguinte consulta SQL:
SELECT data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) AS CT ON data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
Alterne para a guia Coletor . Em Sink Dataset, selecione SinkDataset.
Conecte ambas as atividades de pesquisa à atividade de cópia, uma a uma. Arraste o botão verde anexado à atividade de pesquisa para a atividade de cópia.
Arraste a atividade do procedimento armazenado da caixa de ferramentas Atividades para a superfície do designer de pipeline. Defina o nome da atividade como StoredProceduretoUpdateChangeTrackingActivity. Esta atividade atualiza a versão de controlo de alterações na
table_store_ChangeTracking_version
tabela.Mude para o separador Definições e, em seguida, siga os seguintes passos:
- Para Serviço vinculado, selecione AzureSqlDatabaseLinkedService.
- Para Nome do Procedimento armazenado, selecione Update_ChangeTracking_Version.
- Selecione Importar.
- Na seção Parâmetros de procedimento armazenado, especifique os seguintes valores para os parâmetros:
Nome Tipo valor CurrentTrackingVersion
Int64 @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
TableName
String @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}
Conecte a atividade de cópia à atividade de procedimento armazenado. Arraste o botão verde anexado à atividade de cópia para a atividade de procedimento armazenado.
Selecione Validar na barra de ferramentas. Confirme que não há erros de validação. Feche a janela Relatório de Validação de Pipeline.
Publique entidades (serviços vinculados, conjuntos de dados e pipelines) no serviço Data Factory selecionando o botão Publicar tudo . Aguarde até que a mensagem Publicação bem-sucedida seja exibida.
Executar o pipeline da cópia incremental
Selecione Adicionar gatilho na barra de ferramentas do pipeline e, em seguida, selecione Gatilho agora.
Na janela Pipeline Run, selecione OK.
Monitorizar o pipeline da cópia incremental
Selecione a guia Monitor . A execução do pipeline e seu status aparecem na lista. Para atualizar a lista, selecione Atualizar.
Para exibir as execuções de atividade associadas à execução do pipeline, selecione o link IncrementalCopyPipeline na coluna Nome do pipeline . As execuções de atividade aparecem em uma lista.
Rever os resultados
O segundo arquivo aparece na pasta incchgtracking do contêiner adftutorial .
O arquivo deve ter apenas os dados delta do seu banco de dados. O registro com U
é a linha atualizada no banco de dados e I
é a linha adicionada.
PersonID,Name,Age,SYS_CHANGE_VERSION,SYS_CHANGE_OPERATION
1,update,10,2,U
6,new,50,1,I
As três primeiras colunas são dados alterados de data_source_table
. As duas últimas colunas são os metadados da tabela para o sistema de controle de alterações. A quarta coluna é o valor de SYS_CHANGE_VERSION
cada linha alterada. A quinta coluna é a operação: U
= update, I
= insert. Para obter detalhes sobre as informações do registo de alterações, consulte CHANGETABLE.
==================================================================
PersonID Name Age SYS_CHANGE_VERSION SYS_CHANGE_OPERATION
==================================================================
1 update 10 2 U
6 new 50 1 I
Conteúdos relacionados
Avance para o tutorial a seguir para aprender a copiar apenas arquivos novos e alterados, com base em LastModifiedDate
: