Carregue dados incrementalmente da Instância Gerenciada SQL do Azure para o Armazenamento do Azure usando a captura de dados de alteração (CDC)
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Neste tutorial, você cria uma fábrica de dados do Azure com um pipeline que carrega dados delta com base em informações de captura de dados de alteração (CDC) no banco de dados da Instância Gerenciada SQL do Azure de origem para um armazenamento de blob do Azure.
Vai executar os seguintes passos neste tutorial:
- Preparar o arquivo de dados de origem
- Criar uma fábrica de dados.
- Criar serviços ligados.
- Criar conjuntos de dados de origem e sink.
- Criar, depurar e executar o pipeline para verificar se há dados alterados
- Modificar dados na tabela de origem
- Conclua, execute e monitore o pipeline de cópia incremental completo
Descrição geral
A tecnologia Change Data Capture suportada por armazenamentos de dados, como as Instâncias Gerenciadas SQL (MI) do Azure e o SQL Server, pode ser usada para identificar dados alterados. Este tutorial descreve como usar o Azure Data Factory com a tecnologia SQL Change Data Capture para carregar incrementalmente dados delta da Instância Gerenciada SQL do Azure no Armazenamento de Blobs do Azure. Para obter informações mais concretas sobre a tecnologia SQL Change Data Capture, consulte Alterar captura de dados no SQL Server.
Fluxo de trabalho ponto a ponto
Aqui estão as etapas típicas do fluxo de trabalho de ponta a ponta para carregar dados incrementalmente usando a tecnologia Change Data Capture.
Nota
O SQL MI do Azure e o SQL Server dão suporte à tecnologia Change Data Capture. Este tutorial usa a Instância Gerenciada SQL do Azure como o armazenamento de dados de origem. Também pode utilizar um SQL Server local.
Solução de alto nível
Neste tutorial, você cria um pipeline que executa as seguintes operações:
- Crie uma atividade de pesquisa para contar o número de registros alterados na tabela CDC do Banco de dados SQL e passá-la para uma atividade de Condição SE.
- Crie uma Condição If para verificar se há registros alterados e, em caso afirmativo, invoque a atividade de cópia.
- Crie uma atividade de cópia para copiar os dados inseridos/atualizados/excluídos entre a tabela CDC para o Armazenamento de Blobs do Azure.
Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
- Instância Gerenciada SQL do Azure. A base de dados é utilizada como o arquivo de dados de origem. Se você não tiver uma Instância Gerenciada SQL do Azure, consulte o artigo Criar uma Instância Gerenciada do Banco de Dados SQL do Azure para conhecer as etapas para criar uma.
- Conta do Armazenamento do Azure. O armazenamento de blobs é utilizado como arquivo de dados de sink. Se não tiver uma conta de armazenamento do Azure, veja o artigo Criar uma conta de armazenamento para obter os passos para criar uma. Crie um contêiner chamado raw.
Criar uma tabela de fonte de dados no Banco de Dados SQL do Azure
Inicie o SQL Server Management Studio e conecte-se ao seu servidor de Instâncias Gerenciadas SQL do Azure.
No Explorador de Servidores, clique com botão direito do rato em base de dados e escolha Nova Consulta.
Execute o seguinte comando SQL em seu banco de dados de Instâncias Gerenciadas SQL do Azure para criar uma tabela nomeada
customers
como armazenamento da fonte de dados.create table customers ( customer_id int, first_name varchar(50), last_name varchar(50), email varchar(100), city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") );
Habilite o mecanismo Change Data Capture em seu banco de dados e na tabela de origem (clientes) executando a seguinte consulta SQL:
Nota
- Substitua <o nome> do esquema de origem pelo esquema do seu Azure SQL MI que tem a tabela de clientes.
- A captura de dados de alteração não faz nada como parte das transações que alteram a tabela que está sendo rastreada. Em vez disso, as operações de inserção, atualização e exclusão são gravadas no log de transações. Os dados depositados em tabelas de alteração crescerão de forma incontrolável se você não remover os dados de forma periódica e sistemática. Para obter mais informações, consulte Habilitar a captura de dados de alteração para um banco de dados
EXEC sys.sp_cdc_enable_db EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 1
Insira dados na tabela de clientes executando o seguinte comando:
insert into customers (customer_id, first_name, last_name, email, city) values (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'), (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'), (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
Nota
Nenhuma alteração histórica na tabela é capturada antes da captura de dados de alteração ser habilitada.
Criar uma fábrica de dados
Siga as etapas no artigo Guia de início rápido: criar uma fábrica de dados usando o portal do Azure para criar uma fábrica de dados se você ainda não tiver uma com a qual trabalhar.
Criar serviços ligados
Os serviços ligados são criados numa fábrica de dados para ligar os seus arquivos de dados e serviços de computação a essa fábrica de dados. Nesta seção, você cria serviços vinculados à sua conta de Armazenamento do Azure e ao Azure SQL MI.
Criar o serviço ligado do Armazenamento do Azure.
Neste passo, vai ligar a sua Conta de Armazenamento do Azure à fábrica de dados.
Clique em Ligações e em + Nova.
Na janela Novo Serviço Ligado, selecione Armazenamento de Blobs do Azure e clique em Continuar.
Na janela Novo Serviço Ligado, siga os passos abaixo:
- Introduza AzureStorageLinkedService em Nome.
- Selecione a sua conta de Armazenamento do Azure em Nome da conta de armazenamento.
- Clique em Guardar.
Crie o serviço vinculado Banco de Dados SQL MI do Azure.
Nesta etapa, você vincula seu banco de dados SQL MI do Azure ao data factory.
Nota
Para aqueles que usam SQL MI, consulte aqui para obter informações sobre o acesso via ponto de extremidade público versus privado. Se estiver usando um ponto de extremidade privado, seria necessário executar esse pipeline usando um tempo de execução de integração auto-hospedado. O mesmo se aplicaria àqueles que executam o SQL Server on-prem, em cenários de VM ou VNet.
Clique em Ligações e em + Nova.
Na janela Novo Serviço Vinculado, selecione Instância Gerenciada do Banco de Dados SQL do Azure e clique em Continuar.
Na janela Novo Serviço Ligado, siga os passos abaixo:
- Insira AzureSqlMI1 para o campo Nome .
- Selecione seu servidor SQL para o campo Nome do servidor.
- Selecione seu banco de dados SQL para o campo Nome do banco de dados .
- Introduza o nome do utilizador no campo Nome de utilizador.
- Introduza a palavra-passe do utilizador no campo Palavra-passe.
- Clique em Testar ligação para testar a ligação.
- Clique em Guardar para guardar o serviço ligado.
Criar conjuntos de dados
Nesta etapa, você cria conjuntos de dados para representar a fonte de dados e o destino dos dados.
Criar um conjunto de dados para representar os dados de origem
Neste passo, vai criar um conjunto de dados para representar os dados de origem.
Na vista de árvore, clique em + (mais) e em Conjunto de Dados.
Selecione Instância Gerenciada do Banco de Dados SQL do Azure e clique em Continuar.
Na guia Definir propriedades, defina o nome do conjunto de dados e as informações de conexão:
- Selecione AzureSqlMI1 para serviço vinculado.
- Selecione [dbo].[ dbo_customers_CT] para Nome da tabela. Nota: esta tabela foi criada automaticamente quando o CDC foi ativado na tabela de clientes. Os dados alterados nunca são consultados diretamente desta tabela, mas são extraídos através das funções CDC.
Crie um conjunto de dados para representar os dados copiados para o arquivo de dados de sink.
Neste passo, cria um conjunto de dados para representar os dados que são copiados do arquivo de dados de origem. Você criou o contêiner do data lake em seu Armazenamento de Blobs do Azure como parte dos pré-requisitos. Crie o contentor se ainda não existir ou defina-o com o nome de um contentor existente. Neste tutorial, o nome do arquivo de saída é gerado dinamicamente usando o tempo de disparo, que será configurado posteriormente.
Na vista de árvore, clique em + (mais) e em Conjunto de Dados.
Selecione Armazenamento de Blobs do Azure e clique em Continuar.
Selecione DelimitedText e clique em Continuar.
Na guia Definir Propriedades, defina o nome do conjunto de dados e as informações de conexão:
- Selecione AzureStorageLinkedService em Serviço ligado.
- Insira raw para a parte do contêiner do filePath.
- Ativar Primeira linha como cabeçalho
- Clique em Ok
Criar um pipeline para copiar os dados alterados
Nesta etapa, você cria um pipeline, que primeiro verifica o número de registros alterados presentes na tabela de alterações usando uma atividade de pesquisa. Uma atividade de condição SE verifica se o número de registros alterados é maior que zero e executa uma atividade de cópia para copiar os dados inseridos/atualizados/excluídos do Banco de Dados SQL do Azure para o Armazenamento de Blobs do Azure. Por fim, um gatilho de janela de tombamento é configurado e os horários de início e término serão passados para as atividades como os parâmetros da janela de início e fim.
Na interface do usuário do Data Factory, alterne para a guia Editar . Clique em + (mais) no painel esquerdo e clique em Pipeline.
Verá um separador novo para configurar o pipeline. Também verá o pipeline na vista de árvore. Na janela Propriedades, altere o nome do pipeline para IncrementalCopyPipeline.
Expanda Geral na caixa de ferramentas Atividades e arraste e largue a atividade Lookup na superfície de desenho do pipeline. Defina o nome da atividade como GetChangeCount. Essa atividade obtém o número de registros na tabela de alterações para uma determinada janela de tempo.
Alterne para as Configurações na janela Propriedades:
Especifique o nome do conjunto de dados SQL MI para o campo Conjunto de dados de origem.
Selecione a opção Consulta e insira o seguinte na caixa de consulta:
DECLARE @from_lsn binary(10), @to_lsn binary(10); SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE()); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
- Ativar apenas a primeira linha
Clique no botão Visualizar dados para garantir que uma saída válida seja obtida pela atividade de pesquisa
Expanda Iteração & condicionais na caixa de ferramentas Atividades e arraste e solte a atividade Se Condição na superfície do designer de pipeline. Defina o nome da atividade como HasChangedRows.
Alterne para Atividades na janela Propriedades:
- Insira a seguinte expressão
@greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
- Clique no ícone de lápis para editar a condição True.
- Expanda Geral na caixa de ferramentas Atividades e arraste e solte uma atividade Esperar na superfície do designer de pipeline. Esta é uma atividade temporária para depurar a condição If e será alterada posteriormente no tutorial.
- Clique no breadcrumb IncrementalCopyPipeline para retornar ao pipeline principal.
Execute o pipeline no modo de depuração para verificar se o pipeline é executado com êxito.
Em seguida, retorne à etapa Condição verdadeira e exclua a atividade Esperar . Na caixa de ferramentas Atividades, expanda Mover & transformar e arraste e solte uma atividade Copiar na superfície do designer de pipeline. Defina o nome da atividade como IncrementalCopyActivity.
Mude para o separador Origem, na janela Propriedades, e siga os passos abaixo:
Especifique o nome do conjunto de dados SQL MI para o campo Conjunto de dados de origem.
Selecione Consulta em Utilize Consulta.
Insira o seguinte para Consulta.
DECLARE @from_lsn binary(10), @to_lsn binary(10); SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE()); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
Clique em Visualizar para verificar se a consulta retorna as linhas alteradas corretamente.
Alterne para a guia Coletor e especifique o conjunto de dados do Armazenamento do Azure para o campo Conjunto de Dados do Coletor.
Clique novamente na tela principal do pipeline e conecte a atividade Pesquisa à atividade Se Condição, uma a uma. Arraste o botão verde anexado à atividade Pesquisa para a atividade Se Condição .
Clique em Validar, na barra de ferramentas. Confirme que não há erros de validação. Clique em >> para fechar a janela Relatório de Validação do Pipeline.
Clique em Depurar para testar o pipeline e verificar se um arquivo foi gerado no local de armazenamento.
Publique entidades (serviços vinculados, conjuntos de dados e pipelines) no serviço Data Factory clicando no botão Publicar tudo . Aguarde até ver a mensagem Publicação com êxito.
Configurar o gatilho da janela de tombamento e os parâmetros da janela CDC
Nesta etapa, você cria um gatilho de janela de tombamento para executar o trabalho em uma agenda frequente. Você usará as variáveis de sistema WindowStart e WindowEnd do gatilho da janela de tombamento e as passará como parâmetros para seu pipeline a serem usados na consulta CDC.
Navegue até a guia Parâmetros do pipeline IncrementalCopyPipeline e, usando o botão + Novo , adicione dois parâmetros (triggerStartTime e triggerEndTime) ao pipeline, que representarão a hora de início e de término da janela de tombamento. Para fins de depuração, adicione valores padrão no formato AAAA-MM-DD HH24:MI:SS.FFF, mas certifique-se de que o triggerStartTime não seja anterior à ativação do CDC na tabela, caso contrário, isso resultará em um erro.
Clique na guia de configurações da atividade Pesquisa e configure a consulta para usar os parâmetros de início e fim. Copie o seguinte para a consulta:
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
Navegue até a atividade Copiar no caso verdadeiro da atividade Se condição e clique na guia Origem. Copie o seguinte para a consulta:
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
Clique na guia Coletor da atividade Copiar e clique em Abrir para editar as propriedades do conjunto de dados. Clique na guia Parâmetros e adicione um novo parâmetro chamado triggerStart
Em seguida, configure as propriedades do conjunto de dados para armazenar os dados em um subdiretório cliente/incremental com partições baseadas em data.
Clique na guia Conexão das propriedades do conjunto de dados e adicione conteúdo dinâmico para as seções Diretório e Arquivo.
Digite a seguinte expressão na seção Diretório clicando no link de conteúdo dinâmico abaixo da caixa de texto:
@concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
Insira a seguinte expressão na seção Arquivo . Isso criará nomes de arquivo com base na data e hora de início do gatilho, sufixados com a extensão csv:
@concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
Navegue de volta para as configurações do coletor na atividade Copiar clicando na guia IncrementalCopyPipeline.
Expanda as propriedades do conjunto de dados e insira conteúdo dinâmico no valor do parâmetro triggerStart com a seguinte expressão:
@pipeline().parameters.triggerStartTime
Clique em Depurar para testar o pipeline e garantir que a estrutura de pastas e o arquivo de saída sejam gerados conforme o esperado. Transfira e abra o ficheiro para verificar o conteúdo.
Verifique se os parâmetros estão sendo injetados na consulta revisando os parâmetros de entrada da execução do pipeline.
Publique entidades (serviços vinculados, conjuntos de dados e pipelines) no serviço Data Factory clicando no botão Publicar tudo . Aguarde até ver a mensagem Publicação com êxito.
Por fim, configure um gatilho de janela de tombamento para executar o pipeline em um intervalo regular e defina parâmetros de tempo de início e fim.
- Clique no botão Adicionar gatilho e selecione Novo/Editar
- Insira um nome de gatilho e especifique uma hora de início, que é igual à hora de término da janela de depuração acima.
Na próxima tela, especifique os seguintes valores para os parâmetros de início e fim, respectivamente.
@formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff') @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
Nota
O gatilho só será executado depois de publicado. Além disso, o comportamento esperado da janela de tombamento é executar todos os intervalos históricos desde a data de início até agora. Mais informações sobre gatilhos de janela de queda podem ser encontradas aqui.
Usando o SQL Server Management Studio , faça algumas alterações adicionais na tabela do cliente executando o seguinte SQL:
insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading'); insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth'); insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth'); update customers set first_name='Elon' where customer_id=6; delete from customers where customer_id=5;
Clique no botão Publicar tudo . Aguarde até ver a mensagem Publicação com êxito.
Após alguns minutos, o pipeline será acionado e um novo arquivo será carregado no Armazenamento do Azure
Monitorizar o pipeline da cópia incremental
Clique no separador Monitorizar, no lado esquerdo. Verá a execução do pipeline na lista e o respetivo estado. Para atualizar a lista, clique em Atualizar. Passe o mouse perto do nome do pipeline para acessar a ação Repetir e o relatório de consumo.
Para exibir as execuções de atividade associadas à execução do pipeline, clique no nome do pipeline. Se os dados alterados foram detetados, haverá três atividades, incluindo a atividade de cópia, caso contrário, haverá apenas duas entradas na lista. Para voltar para a exibição de execuções de pipeline, clique no link Todos os pipelines na parte superior.
Rever os resultados
Vai ver um segundo ficheiro na pasta customers/incremental/YYYY/MM/DD
do contentor raw
.
Conteúdos relacionados
Avance para o tutorial a seguir para saber como copiar arquivos novos e alterados somente com base em sua LastModifiedDate: