Dela via


Läs in data stegvis från Azure SQL Managed Instance till Azure Storage med hjälp av CDC (Change Data Capture)

GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics

Dricks

Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!

I den här självstudien skapar du en Azure-datafabrik med en pipeline som läser in deltadata baserat på CDC-information (Change Data Capture) i azure SQL Managed Instance-källdatabasen till en Azure Blob Storage.

I de här självstudierna går du igenom följande steg:

  • Förbereda källdatalagret
  • Skapa en datafabrik.
  • Skapa länkade tjänster.
  • Skapa datauppsättningar för källa och mottagare.
  • Skapa, felsöka och kör pipelinen för att söka efter ändrade data
  • Ändra data i källtabellen
  • Slutför, kör och övervakar den fullständiga inkrementella kopieringspipelinen

Översikt

Tekniken Change Data Capture som stöds av datalager som Azure SQL Managed Instances (MI) och SQL Server kan användas för att identifiera ändrade data. I den här självstudien beskrivs hur du använder Azure Data Factory med SQL Change Data Capture-teknik för inkrementell inläsning av deltadata från Azure SQL Managed Instance till Azure Blob Storage. Mer konkret information om SQL Change Data Capture-teknik finns i Ändra datainsamling i SQL Server.

Arbetsflödet slutpunkt till slutpunkt

Här följer de vanliga stegen för arbetsflöden från slutpunkt till slutpunkt för inkrementell inläsning med hjälp av tekniken För ändringsdatainsamling.

Kommentar

Både Azure SQL MI och SQL Server stöder Change Data Capture-tekniken. I den här självstudien används Azure SQL Managed Instance som källdatalager. Du kan också använda en lokal SQL Server.

Lösning på hög nivå

I den här självstudien skapar du en pipeline som utför följande åtgärder:

  1. Skapa en uppslagsaktivitet för att räkna antalet ändrade poster i tabellen SQL Database CDC och skicka den till en IF Condition-aktivitet.
  2. Skapa ett if-villkor för att kontrollera om det finns ändrade poster och i så fall anropa kopieringsaktiviteten.
  3. Skapa en kopieringsaktivitet för att kopiera infogade/uppdaterade/borttagna data mellan CDC-tabellen till Azure Blob Storage.

Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.

Förutsättningar

  • Azure SQL Managed Instance. Du använder databasen som källa för datalagringen. Om du inte har någon Hanterad Azure SQL-instans kan du läsa artikeln Skapa en hanterad Azure SQL Database-instans för steg för att skapa en.
  • Azure Storage-konto. Du kan använda blob-lagringen som mottagare för datalagringen. Om du inte har ett Azure Storage-konto finns det anvisningar om hur du skapar ett i artikeln Skapa ett lagringskonto . Skapa en container med namnet raw.

Skapa en datakällatabell i Azure SQL Database

  1. Starta SQL Server Management Studio och anslut till din Azure SQL Managed Instances-server.

  2. I Server Explorer högerklickar du på databasen och väljer Ny fråga.

  3. Kör följande SQL-kommando mot din Azure SQL Managed Instances-databas för att skapa en tabell med namnet customers som datakälllager.

    create table customers 
    (
    customer_id int, 
    first_name varchar(50), 
    last_name varchar(50), 
    email varchar(100), 
    city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") 
     );
    
  4. Aktivera mekanismen för ändringsdatainsamling i databasen och källtabellen (kunder) genom att köra följande SQL-fråga:

    Kommentar

    • Ersätt <källschemanamnet> med schemat för din Azure SQL MI som har tabellen kunder.
    • Ändringsdatainsamling gör ingenting som en del av de transaktioner som ändrar tabellen som spåras. I stället skrivs åtgärderna insert, update och delete till transaktionsloggen. Data som deponeras i ändringstabeller växer ohanterligt om du inte regelbundet och systematiskt rensar data. Mer information finns i Aktivera ändringsdatainsamling för en databas
    EXEC sys.sp_cdc_enable_db 
    
    EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'customers', 
    @role_name = NULL,
    @supports_net_changes = 1
    
  5. Infoga data i kundtabellen genom att köra följande kommando:

     insert into customers 
         (customer_id, first_name, last_name, email, city) 
     values 
         (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
         (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
        (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
    

    Kommentar

    Inga historiska ändringar i tabellen samlas in innan ändringsdatainsamling aktiveras.

Skapa en datafabrik

Följ stegen i artikeln Snabbstart: Skapa en datafabrik med hjälp av Azure Portal för att skapa en datafabrik om du inte redan har en att arbeta med.

Skapa länkade tjänster

Du kan skapa länkade tjänster i en datafabrik för att länka ditt datalager och beräkna datafabrik-tjänster. I det här avsnittet skapar du länkade tjänster till ditt Azure Storage-konto och Azure SQL MI.

Skapa en länkad Azure-lagringstjänst.

I det här steget länkar du ditt Azure-lagringskonto till datafabriken.

  1. Klicka på Anslutningar och sedan på + Ny.

    Knapp för ny anslutning

  2. I fönstret New Linked Service (Ny länkad tjänst) väljer du Azure Blob Storage och klickar på Fortsätt.

    Välj Azure Blob Storage

  3. Utför följande steg i fönstret New Linked Service (Ny länkad tjänst):

    1. Ange AzureStorageLinkedService som namn.
    2. Välj ditt Azure Storage-konto i Lagringskontonamn.
    3. Klicka på Spara.

    Inställningar för Azure Storage-konto

Skapa en länkad Azure SQL MI Database-tjänst.

I det här steget länkar du din Azure SQL MI-databas till datafabriken.

Kommentar

För dem som använder SQL MI, se här för information om åtkomst via offentlig eller privat slutpunkt. Om du använder en privat slutpunkt måste du köra den här pipelinen med hjälp av en lokalt installerad integrationskörning. Samma sak gäller för dem som kör SQL Server lokalt, i scenarier med virtuella datorer eller virtuella nätverk.

  1. Klicka på Anslutningar och sedan på + Ny.

  2. I fönstret Ny länkad tjänst väljer du Azure SQL Database Managed Instance och klickar på Fortsätt.

  3. Utför följande steg i fönstret New Linked Service (Ny länkad tjänst):

    1. Ange AzureSqlMI1 som namnfält.
    2. Välj sql-servern för fältet Servernamn .
    3. Välj sql-databasen för fältet Databasnamn .
    4. Ange användarens namn i fältet Användarnamn.
    5. Ange användarens lösenord i fältet Lösenord.
    6. Testa anslutningen genom att klicka på Testa anslutning.
    7. Klicka på Spara för att spara den länkade tjänsten.

    Länkade tjänstinställningar för Azure SQL MI Database

Skapa datauppsättningar

I det här steget skapar du datauppsättningar som representerar datakällan och datamålet.

Skapa en datauppsättning för att representera källdata

I det här steget skapar du en datamängd för att representera källdata.

  1. I trädvyn klickar du på + (plus) och sedan på Datauppsättning.

    Menyn Ny datauppsättning

  2. Välj Azure SQL Database Managed Instance och klicka på Fortsätt.

    Typ av källdatauppsättning – Azure SQL Database

  3. På fliken Ange egenskaper anger du datauppsättningens namn och anslutningsinformation:

    1. Välj AzureSqlMI1 för länkad tjänst.
    2. Välj [dbo].[ dbo_customers_CT] som Tabellnamn. Obs! Den här tabellen skapades automatiskt när CDC aktiverades i kundtabellen. Ändrade data efterfrågas aldrig direkt från den här tabellen, utan extraheras i stället via CDC-funktionerna.

    Källanslutning

Skapa en datauppsättning för att representera data som kopierats till datalagret för mottagare.

I det här steget skapar du en datamängd för att representera data som kopieras från källdatalagret. Du skapade data lake-containern i Azure Blob Storage som en del av förutsättningarna. Skapa containern om den inte finns (eller) ställ in den för namnet på en befintlig. I den här självstudien genereras utdatafilens namn dynamiskt med hjälp av utlösartiden, som kommer att konfigureras senare.

  1. I trädvyn klickar du på + (plus) och sedan på Datauppsättning.

    Menyn Ny datauppsättning

  2. Välj Azure Blob Storage och klicka på Fortsätt.

    Typ av datauppsättning för mottagare – Azure Blob Storage

  3. Välj AvgränsadText och klicka på Fortsätt.

    Format för datauppsättning för mottagare – Avgränsadtext

  4. På fliken Ange egenskaper anger du datauppsättningens namn och anslutningsinformation:

    1. Välj AzureStorageLinkedService för Länkad tjänst.
    2. Ange raw för containerdelen av filePath.
    3. Aktivera första raden som rubrik
    4. Klicka på Ok

    Datauppsättning för mottagare – anslutning

Skapa en pipeline för att kopiera ändrade data

I det här steget skapar du en pipeline som först kontrollerar antalet ändrade poster som finns i ändringstabellen med hjälp av en uppslagsaktivitet. En IF-villkorsaktivitet kontrollerar om antalet ändrade poster är större än noll och kör en kopieringsaktivitet för att kopiera infogade/uppdaterade/borttagna data från Azure SQL Database till Azure Blob Storage. Slutligen konfigureras en utlösare för rullande fönster och start- och sluttiderna skickas till aktiviteterna som start- och slutfönsterparametrar.

  1. I användargränssnittet för Data Factory växlar du till fliken Redigera . Klicka på + (plus) i det vänstra fönstret och klicka på Pipeline.

    Meny för ny pipeline

  2. En ny flik öppnas för inställningar för pipelinen. Du kan också se pipelinen i trädvyn. I fönstret Egenskaper ändrar du pipelinenamnet till IncrementalCopyPipeline.

    Pipelinenamn

  3. Visa verktygslådan Allmänt i verktygslådan Aktiviteter och dra och släpp sökningen på pipelinedesignytan. Ange namnet på aktiviteten till GetChangeCount. Den här aktiviteten hämtar antalet poster i ändringstabellen för ett visst tidsfönster.

    Namn för sökningsaktivitet

  4. Växla till Inställningar i fönstret Egenskaper :

    1. Ange SQL MI-datauppsättningens namn för fältet Källdatauppsättning .

    2. Välj alternativet Fråga och ange följande i frågerutan:

    DECLARE  @from_lsn binary(10), @to_lsn binary(10);  
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers');  
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',  GETDATE());
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    
    1. Aktivera endast första raden

    Inställningar för sökningsaktivitet

  5. Klicka på knappen Förhandsgranska data för att se till att ett giltigt utdata hämtas av uppslagsaktiviteten

    Uppslagsaktivitet – förhandsversion

  6. Expandera Iteration och villkor i verktygslådan Aktiviteter och dra och släpp aktiviteten If Condition till pipelinedesignerns yta. Ange namnet på aktiviteten till HasChangedRows.

    If Condition Activity – namn

  7. Växla till aktiviteterna i fönstret Egenskaper:

    1. Ange följande uttryck
    @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
    
    1. Klicka på pennikonen för att redigera villkoret Sant.

    If Condition Activity – inställningar

    1. Expandera Allmänt i verktygslådan Aktiviteter och dra och släpp en vänta-aktivitet till pipelinedesignytan. Det här är en tillfällig aktivitet för att felsöka villkoret If och ändras senare i självstudien.

    If Condition True – vänta

    1. Klicka på sökvägsrutan IncrementalCopyPipeline för att återgå till huvudpipelinen.
  8. Kör pipelinen i felsökningsläge för att verifiera att pipelinen körs.

    Pipeline – felsöka

  9. Gå sedan tillbaka till steget Sant villkor och ta bort aktiviteten Vänta . I verktygslådan Aktiviteter expanderar du Flytta och transformerar och drar och släpper en kopieringsaktivitet till pipelinedesignytan. Ange IncrementalCopyActivity som namn på aktiviteten.

    Kopiera aktivitet – namn

  10. Växla till fliken Källa i fönstret Egenskaper och utför följande steg:

  11. Ange SQL MI-datauppsättningens namn för fältet Källdatauppsättning .

  12. Välj Fråga för Använd fråga.

  13. Ange följande för Fråga.

    DECLARE @from_lsn binary(10), @to_lsn binary(10); 
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); 
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    

Kopiera aktivitet – källinställningar

  1. Klicka på förhandsgranska för att kontrollera att frågan returnerar de ändrade raderna korrekt.

    Skärmbild som visar förhandsgranskning för att verifiera frågan.

  2. Växla till fliken Mottagare och ange Azure Storage-datauppsättningen för fältet Mottagardatauppsättning .

    Skärmbild som visar fliken Mottagare.

  3. Klicka tillbaka till huvudarbetsytan för pipelinen och anslut uppslagsaktiviteten till if condition-aktiviteten en i taget. Dra den gröna knappen som är kopplad till uppslagsaktiviteten till if condition-aktiviteten .

    Ansluta sökning- och kopieringsaktiviteter

  4. Klicka på Verifiera i verktygsfältet. Kontrollera att det inte finns några verifieringsfel. Stäng fönstret med verifieringsrapporten för pipeline genom att klicka på >>.

    Verifieringsknapp

  5. Klicka på Felsök för att testa pipelinen och kontrollera att en fil genereras på lagringsplatsen.

    Inkrementell pipeline debug-2

  6. Publicera entiteter (länkade tjänster, datauppsättningar och pipelines) till Data Factory-tjänsten genom att klicka på knappen Publicera alla . Vänta tills du ser meddelandet Publiceringen är klar.

    Knappen Publicera

Konfigurera parametrarna för utlösare för rullande fönster och CDC-fönster

I det här steget skapar du en utlösare för rullande fönster för att köra jobbet enligt ett vanligt schema. Du använder systemvariablerna WindowStart och WindowEnd i utlösaren för rullande fönster och skickar dem som parametrar till din pipeline som ska användas i CDC-frågan.

  1. Gå till fliken Parametrar i pipelinen IncrementalCopyPipeline och lägg till två parametrar (triggerStartTime och triggerEndTime) i pipelinen med knappen + Ny, vilket representerar start- och sluttiden för rullande fönster. I felsökningssyfte lägger du till standardvärden i formatet YYYY-MM-DD HH24:MI:SS.FFF , men se till att triggerStartTime inte är innan CDC aktiveras i tabellen, annars resulterar det i ett fel.

    Menyn Utlös nu

  2. Klicka på inställningsfliken för uppslagsaktiviteten och konfigurera frågan så att den använder start- och slutparametrarna. Kopiera följande till frågan:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  3. Gå till kopieringsaktiviteten i true-fallet för if condition-aktiviteten och klicka på fliken Källa. Kopiera följande till frågan:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  4. Klicka på fliken Mottagare i aktiviteten Kopiera och klicka på Öppna för att redigera datauppsättningsegenskaperna. Klicka på fliken Parametrar och lägg till en ny parameter med namnet triggerStart

    Skärmbild som visar hur du lägger till en ny parameter på fliken Parametrar.

  5. Konfigurera sedan datauppsättningsegenskaperna för att lagra data i en kund/inkrementell underkatalog med datumbaserade partitioner.

    1. Klicka på fliken Anslutning för datamängdsegenskaperna och lägg till dynamiskt innehåll för både katalog- och filavsnitten.

    2. Ange följande uttryck i avsnittet Katalog genom att klicka på länken för dynamiskt innehåll under textrutan:

      @concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
      
    3. Ange följande uttryck i avsnittet Arkiv . Detta skapar filnamn baserat på utlösarens startdatum och tid, suffix med csv-tillägget:

      @concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
      

      Konfiguration av mottagardatauppsättning-3

    4. Gå tillbaka till mottagarinställningarna i Kopieringsaktivitet genom att klicka på fliken IncrementalCopyPipeline.

    5. Expandera egenskaperna för datamängden och ange dynamiskt innehåll i parametervärdet triggerStart med följande uttryck:

      @pipeline().parameters.triggerStartTime
      

    Konfiguration av datauppsättning för mottagare-4

  6. Klicka på Felsök för att testa pipelinen och se till att mappstrukturen och utdatafilen genereras som förväntat. Ladda ned och öppna filen för att verifiera innehållet.

    Inkrementell kopieringsfelsökning-3

  7. Kontrollera att parametrarna matas in i frågan genom att granska indataparametrarna för pipelinekörningen.

    Inkrementell kopieringsfelsökning-4

  8. Publicera entiteter (länkade tjänster, datauppsättningar och pipelines) till Data Factory-tjänsten genom att klicka på knappen Publicera alla . Vänta tills du ser meddelandet Publiceringen är klar.

  9. Konfigurera slutligen en utlösare för rullande fönster för att köra pipelinen med ett regelbundet intervall och ange parametrar för start- och sluttid.

    1. Klicka på knappen Lägg till utlösare och välj Ny/Redigera

    Lägg till ny utlösare

    1. Ange ett utlösarnamn och ange en starttid som är lika med sluttiden för felsökningsfönstret ovan.

    Utlösare för rullande fönster

    1. På nästa skärm anger du följande värden för start- respektive slutparametrarna.

      @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
      @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
      

      Utlösare för rullande fönster-2

Kommentar

Utlösaren körs bara när den har publicerats. Dessutom är det förväntade beteendet för rullande fönster att köra alla historiska intervall från startdatumet fram till nu. Mer information om utlösare för rullande fönster finns här.

  1. Använd SQL Server Management Studio för att göra några ytterligare ändringar i kundtabellen genom att köra följande SQL:

    insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
    insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
    insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
    update customers set first_name='Elon' where customer_id=6;
    delete from customers where customer_id=5;
    
  2. Klicka på knappen Publicera alla . Vänta tills du ser meddelandet Publiceringen är klar.

  3. Efter några minuter har pipelinen utlösts och en ny fil har lästs in i Azure Storage

Övervaka den inkrementella kopieringspipelinen

  1. Klicka på fliken Övervaka till vänster. Du ser pipelinekörningen samt dess status i listan. Om du vill uppdatera listan klickar du på Uppdatera. Hovra nära namnet på pipelinen för att få åtkomst till åtgärden Kör om och förbrukningsrapporten.

    Pipelinekörningar

  2. Om du vill visa aktivitetskörningar som är associerade med pipelinekörningen klickar du på pipelinenamnet. Om ändrade data har identifierats kommer det att finnas tre aktiviteter, inklusive kopieringsaktiviteten, annars finns det bara två poster i listan. Om du vill växla tillbaka till pipelinekörningsvyn klickar du på länken Alla pipelines högst upp.

    Aktivitetskörningar

Granska resultaten

Du ser den andra filen i mappen customers/incremental/YYYY/MM/DD i containern raw.

Utdatafil från inkrementell säkerhetskopia

Fortsätt till följande självstudie och lär dig mer om hur du kopierar nya och ändrade filer endast baserat på deras LastModifiedDate: