Udostępnij za pośrednictwem


Przyrostowe ładowanie danych z wielu tabel w programie SQL Server do bazy danych w usłudze Azure SQL Database przy użyciu witryny Azure Portal

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

W tym samouczku utworzysz usługę Azure Data Factory z potokiem, który ładuje dane różnicowe z wielu tabel w bazie danych programu SQL Server do bazy danych w usłudze Azure SQL Database.

Ten samouczek obejmuje następujące procedury:

  • Przygotowanie źródłowych i docelowych magazynów danych
  • Tworzenie fabryki danych.
  • Utwórz własne środowisko Integration Runtime.
  • Instalowanie środowiska Integration Runtime.
  • Tworzenie połączonych usług.
  • Tworzenie zestawów danych źródła, ujścia i limitu.
  • Tworzenie, uruchamianie i monitorowanie potoku
  • Przejrzyj wyniki.
  • Dodawanie lub aktualizowanie danych w tabelach źródłowych.
  • Ponowne uruchamianie i monitorowanie potoku.
  • Przegląd wyników końcowych.

Omówienie

Poniżej przedstawiono ważne czynności związane z tworzeniem tego rozwiązania:

  1. Wybierz kolumnę limitu.

    Wybierz jedną kolumnę dla każdej tabeli w magazynie danych źródłowych, która może służyć do identyfikowania nowych lub zaktualizowanych rekordów dla każdego uruchomienia. Zazwyczaj dane w tej wybranej kolumnie (na przykład last_modify_time lub ID) rosną wraz z tworzeniem i aktualizacją wierszy. Maksymalna wartość w tej kolumnie jest używana jako limit.

  2. Przygotuj magazyn danych do przechowywania wartości limitu.

    W tym samouczku wartość limitu jest przechowywana w bazie danych SQL.

  3. Utwórz potok z następującymi działaniami:

    a. Utwórz działanie ForEach służące do przeprowadzania iteracji po liście nazw tabel źródłowych przekazywanych jako parametr do potoku. Dla każdej tabeli źródłowej wywołuje ono następujące działania służące do wykonywania ładowania przyrostowego dla tej tabeli.

    b. Utwórz dwa działania lookup. Użyj pierwszego działania Lookup do pobrania ostatniej wartości limitu. Użyj drugiego działania Lookup do pobrania nowej wartości limitu. Te wartości limitu są przekazywane do działania Copy.

    c. Utwórz działanie Copy, które kopiuje wiersze z magazynu danych źródłowych o wartości kolumny limitu większej niż poprzednia wartość limitu i mniejszej niż nowa wartość limitu. Następnie kopiuje dane różnicowe ze źródłowego magazynu danych do usługi Azure Blob Storage jako nowy plik.

    d. Utwórz działanie StoredProcedure, które aktualizuje wartość limitu dla potoku przy następnym uruchomieniu.

    Diagram ogólny rozwiązania wygląda następująco:

    Przyrostowe ładowanie danych

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

Wymagania wstępne

  • SQL Server. Baza danych programu SQL Server jest używana jako źródłowy magazyn danych w tym samouczku.
  • Usługa Azure SQL Database. Baza danych w usłudze Azure SQL Database jest używana jako magazyn danych ujścia. Jeśli nie masz bazy danych w usłudze SQL Database, zobacz Tworzenie bazy danych w usłudze Azure SQL Database , aby uzyskać instrukcje tworzenia bazy danych.

Tworzenie tabel źródłowych w bazie danych SQL Server

  1. Otwórz program SQL Server Management Studio i połącz się z bazą danych programu SQL Server.

  2. W Eksploratorze serwera kliknij prawym przyciskiem myszy bazę danych, a następnie wybierz pozycję Nowe zapytanie.

  3. Uruchom następujące polecenie SQL względem bazy danych w celu utworzenia tabel o nazwach customer_table i project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    INSERT INTO customer_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'John','9/1/2017 12:56:00 AM'),
    (2, 'Mike','9/2/2017 5:23:00 AM'),
    (3, 'Alice','9/3/2017 2:36:00 AM'),
    (4, 'Andy','9/4/2017 3:21:00 AM'),
    (5, 'Anny','9/5/2017 8:06:00 AM');
    
    INSERT INTO project_table
    (Project, Creationtime)
    VALUES
    ('project1','1/1/2015 0:00:00 AM'),
    ('project2','2/2/2016 1:23:00 AM'),
    ('project3','3/4/2017 5:16:00 AM');
    
    

Tworzenie tabel docelowych w bazie danych

  1. Otwórz program SQL Server Management Studio i połącz się z bazą danych w usłudze Azure SQL Database.

  2. W Eksploratorze serwera kliknij prawym przyciskiem myszy bazę danych, a następnie wybierz pozycję Nowe zapytanie.

  3. Uruchom następujące polecenie SQL względem bazy danych w celu utworzenia tabel o nazwach customer_table i project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    

Utwórz inną tabelę w bazie danych, aby przechowywać wartość wysokiego limitu

  1. Uruchom następujące polecenie SQL względem bazy danych, aby utworzyć tabelę o nazwie watermarktable do przechowywania wartości limitu:

    create table watermarktable
    (
    
        TableName varchar(255),
        WatermarkValue datetime,
    );
    
  2. Wstaw początkowe wartości limitu dla obu tabel źródłowych w tabeli wartości limitu.

    
    INSERT INTO watermarktable
    VALUES 
    ('customer_table','1/1/2010 12:00:00 AM'),
    ('project_table','1/1/2010 12:00:00 AM');
    
    

Tworzenie procedury składowanej w bazie danych

Uruchom następujące polecenie, aby utworzyć procedurę składowaną w bazie danych. Ta procedura składowana służy do aktualizowania wartość limitu po każdym uruchomieniu potoku.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime 
WHERE [TableName] = @TableName

END

Tworzenie typów danych i dodatkowych procedur składowanych w bazie danych

Uruchom następujące zapytanie, aby utworzyć dwie procedury składowane i dwa typy danych w bazie danych. Służą one do scalania danych z tabel źródłowych w tabelach docelowych.

Aby ułatwić rozpoczęcie podróży, użyjemy bezpośrednio tych procedur składowanych przekazujących dane różnicowe za pośrednictwem zmiennej tabeli, a następnie scalimy je z magazynem docelowym. Należy zachować ostrożność, że nie oczekuje się, że w zmiennej tabeli będzie przechowywana "duża" liczba wierszy różnicowych (ponad 100).

Jeśli musisz scalić dużą liczbę wierszy różnicowych z magazynem docelowym, zalecamy użycie działania kopiowania w celu skopiowania wszystkich danych różnicowych do tymczasowej tabeli przejściowej w magazynie docelowym, a następnie skompilowanie własnej procedury składowanej bez użycia zmiennej tabeli tabeli w celu scalenia ich z tabeli "przejściowej" do tabeli "końcowej".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Tworzenie fabryki danych

  1. Uruchom przeglądarkę internetową Microsoft Edge lub Google Chrome. Obecnie interfejs użytkownika usługi Data Factory jest obsługiwany tylko przez przeglądarki internetowe Microsoft Edge i Google Chrome.

  2. W menu po lewej stronie wybierz pozycję Utwórz zasób>Integration>Data Factory:

    Wybór usługi Data Factory w

  3. Na stronie Nowa fabryka danych wprowadź wartość ADFMultiIncCopyTutorialDF w polu nazwa.

    Nazwa usługi Azure Data Factory musi być globalnie unikatowa. Jeśli pojawi się czerwony wykrzyknik z poniższym błędem, zmień nazwę fabryki danych (np. twojanazwaADFIncCopyTutorialDF) i spróbuj utworzyć ją ponownie. Artykuł Data Factory — Naming Rules (Usługa Data Factory — reguły nazewnictwa) zawiera reguły nazewnictwa artefaktów usługi Data Factory.

    Data factory name "ADFIncCopyTutorialDF" is not available

  4. Wybierz subskrypcję Azure, w której chcesz utworzyć fabrykę danych.

  5. Dla opcji Grupa zasobów wykonaj jedną z następujących czynności:

    • Wybierz pozycję Użyj istniejącej, a następnie wybierz istniejącą grupę zasobów z listy rozwijanej.
    • Wybierz pozycję Utwórz nową, a następnie wprowadź nazwę grupy zasobów.
      Informacje na temat grup zasobów znajdują się w artykule Using resource groups to manage your Azure resources (Używanie grup zasobów do zarządzania zasobami platformy Azure).
  6. Wybierz opcję V2 w obszarze Wersja.

  7. Na liście lokalizacja wybierz lokalizację fabryki danych. Na liście rozwijanej są wyświetlane tylko obsługiwane lokalizacje. Magazyny danych (Azure Storage, Azure SQL Database itp.) i jednostki obliczeniowe (HDInsight itp.) używane przez fabrykę danych mogą mieścić się w innych regionach.

  8. Kliknij pozycję Utwórz.

  9. Po zakończeniu tworzenia zostanie wyświetlona strona Fabryka danych, jak pokazano na poniższej ilustracji.

    Strona główna usługi Azure Data Factory z kafelkiem Otwórz usługę Azure Data Factory Studio.

  10. Wybierz pozycję Otwórz na kafelku Otwórz usługę Azure Data Factory Studio , aby uruchomić interfejs użytkownika usługi Azure Data Factory na osobnej karcie.

Create self-hosted integration runtime (Tworzenie środowiska Integration Runtime (Self-hosted)

Podczas przenoszenia danych z magazynu danych w sieci prywatnej (lokalnej) do magazynu danych platformy Azure zainstaluj własne środowisko Integration Runtime (IR) w środowisku lokalnym. Własne środowisko IR przenosi dane między siecią prywatną a platformą Azure.

  1. Na stronie głównej interfejsu użytkownika usługi Azure Data Factory wybierz kartę Zarządzanie w okienku po lewej stronie.

    Przycisk Zarządzaj stroną główną

  2. Wybierz pozycję Środowiska Integration Runtime w okienku po lewej stronie, a następnie wybierz pozycję +Nowy.

    Tworzenie środowiska Integration Runtime

  3. W oknie Konfiguracja środowiska Integration Runtime wybierz pozycję Wykonaj przenoszenie danych i wysyłanie działań do obliczeń zewnętrznych, a następnie kliknij przycisk Kontynuuj.

  4. Wybierz pozycję Self-Hosted (Self-Hosted), a następnie kliknij przycisk Kontynuuj.

  5. Wprowadź wartość MySelfHostedIR w polu Nazwa, a następnie kliknij pozycję Utwórz.

  6. Kliknij pozycję Click here to launch the express setup for this computer (Kliknij tutaj, aby uruchomić instalację ekspresową dla tego komputera) w sekcji Opcja 1: Instalacja ekspresowa.

    Kliknij link instalacji ekspresowej

  7. W oknie Instalacja ekspresowa środowiska Integration Runtime (Self-hosted) kliknij przycisk Zamknij.

    Instalacja środowiska Integration Runtime — pomyślna

  8. W przeglądarce internetowej w oknie instalatora środowiska Integration Runtime kliknij przycisk Zakończ.

  9. Upewnij się, że na liście środowisk Integration Runtime jest widoczna pozycja MySelfHostedIR.

Tworzenie połączonych usług

Połączone usługi tworzy się w fabryce danych w celu połączenia magazynów danych i usług obliczeniowych z fabryką danych. W tej sekcji utworzysz połączone usługi z bazą danych programu SQL Server i bazą danych w usłudze Azure SQL Database.

Tworzenie usługi połączonej z serwerem SQL Server

W tym kroku połączysz bazę danych programu SQL Server z fabryką danych.

  1. W oknie Połączenia przejdź z karty Środowiska Integration Runtime do karty Połączone usługi, a następnie kliknij pozycję + Nowa.

    Nowa połączona usługa.

  2. W oknie Nowa połączona usługa wybierz pozycję SQL Server, a następnie kliknij pozycję Kontynuuj.

  3. W oknie Nowa połączona usługa wykonaj następujące czynności:

    1. Wprowadź jako nazwę wartość SqlServerLinkedService.
    2. Wybierz pozycję MySelfHostedIR w polu Połącz za pośrednictwem środowiska Integration Runtime. Jest to ważny krok. Domyślne środowisko Integration Runtime nie może połączyć się z lokalnym magazynem danych. Użyj własnego środowiska Integration Runtime utworzonego wcześniej.
    3. W polu Nazwa serwera wprowadź nazwę komputera, na którym znajduje się baza danych programu SQL Server.
    4. W polu Nazwa bazy danych wprowadź nazwę bazy danych programu SQL Server, która zawiera dane źródłowe. Tworzenie tabeli i wstawienie danych do tej bazy danych zostało przeprowadzone w ramach wymagań wstępnych.
    5. W polu Typ uwierzytelniania wybierz typ uwierzytelniania, którego chcesz używać podczas łączenia się z bazą danych.
    6. W polu Nazwa użytkownika wprowadź nazwę użytkownika mającego dostęp do bazy danych programu SQL Server. Jeśli musisz użyć znaku ukośnika (\) w nazwie konta użytkownika lub nazwie serwera, użyj znaku ucieczki (\). Może to być na przykład mydomain\\myuser.
    7. W polu Hasło wprowadź hasło użytkownika.
    8. Aby sprawdzić, czy fabryka danych może połączyć się z bazą danych programu SQL Server, kliknij pozycję Testuj połączenie. Usuń wszelkie błędy, tak aby połączenie zostało nawiązane pomyślnie.
    9. Aby zapisać połączoną usługę, kliknij przycisk Zakończ.

Tworzenie połączonej usługi Azure SQL Database

W ostatnim kroku utworzysz połączoną usługę w celu połączenia źródłowej bazy danych programu SQL Server z fabryką danych. W tym kroku połączysz docelową/ujścia bazę danych z fabryką danych.

  1. W oknie Połączenia przejdź z karty Środowiska Integration Runtime do karty Połączone usługi, a następnie kliknij pozycję + Nowa.

  2. W oknie Nowa połączona usługa wybierz pozycję Azure SQL Database, a następnie kliknij pozycję Kontynuuj.

  3. W oknie Nowa połączona usługa wykonaj następujące czynności:

    1. Wprowadź wartość AzureSqlDatabaseLinkedService w polu Nazwa.
    2. W polu Nazwa serwera wybierz nazwę serwera z listy rozwijanej.
    3. W polu Nazwa bazy danych wybierz bazę danych, w której utworzono customer_table i project_table w ramach wymagań wstępnych.
    4. W polu Nazwa użytkownika wprowadź nazwę użytkownika, który ma dostęp do bazy danych.
    5. W polu Hasło wprowadź hasło użytkownika.
    6. Aby sprawdzić, czy fabryka danych może połączyć się z bazą danych programu SQL Server, kliknij pozycję Testuj połączenie. Usuń wszelkie błędy, tak aby połączenie zostało nawiązane pomyślnie.
    7. Aby zapisać połączoną usługę, kliknij przycisk Zakończ.
  4. Upewnij się, że lista zawiera dwie połączone usługi.

    Dwie połączone usługi

Tworzenie zestawów danych

W tym kroku utworzysz zestawy danych reprezentujące źródło danych, docelową lokalizację danych i lokalizację, w której będzie przechowywana wartość limitu.

Tworzenie zestawu danych źródłowych

  1. W lewym okienku kliknij pozycję + (plus), a następnie kliknij pozycję Zestaw danych.

  2. W oknie Nowy zestaw danych wybierz pozycję SQL Server, kliknij przycisk Kontynuuj.

  3. W przeglądarce sieci Web zostanie otwarta nowa karta służąca do konfigurowania zestawu danych. Zestaw danych jest również widoczny w widoku drzewa. U dołu karty Ogólne w oknie właściwości wprowadź wartość SourceDataset w polu Nazwa.

  4. Przejdź do karty Połączenie w oknie właściwości, a następnie wybierz pozycję SqlServerLinkedService w polu Połączona usługa. Nie należy wybierać tabeli w tym miejscu. Działanie Copy w potoku korzysta z zapytania SQL do załadowania danych, a nie całej tabeli.

    Źródłowy zestaw danych — połączenie

Tworzenie ujścia zestawu danych

  1. W lewym okienku kliknij pozycję + (plus), a następnie kliknij pozycję Zestaw danych.

  2. W oknie Nowy zestaw danych wybierz pozycję Azure SQL Database, a następnie kliknij przycisk Kontynuuj.

  3. W przeglądarce sieci Web zostanie otwarta nowa karta służąca do konfigurowania zestawu danych. Zestaw danych jest również widoczny w widoku drzewa. U dołu karty Ogólne w oknie właściwości wprowadź wartość SinkDataset w polu Nazwa.

  4. Przejdź do karty Parametry w oknie Właściwości i wykonaj następujące czynności:

    1. Kliknij pozycję Nowy w sekcji Parametry tworzenia/aktualizacji.

    2. Wprowadź wartość SinkTableName w polu nazwa i wartość Ciąg w polu typ. Ten zestaw danych otrzymuje wartość SinkTableName jako parametr. Parametr SinkTableName jest ustawiany dynamicznie przez potok w czasie wykonywania. Działanie ForEach w potoku przeprowadza iterację po liście nazw i przekazuje nazwę tabeli do tego zestawu danych w każdej iteracji.

      Zestaw danych będący ujściem — właściwości

  5. Przejdź do karty Połączenie w okno Właściwości i wybierz pozycję AzureSqlDatabaseLinkedService dla pozycji Połączona usługa. W obszarze właściwości Tabela kliknij pozycję Dodaj zawartość dynamiczną.

  6. W oknie Dodawanie zawartości dynamicznej wybierz pozycję SinkTableName w sekcji Parametry.

  7. Po kliknięciu przycisku Zakończ zostanie wyświetlony komunikat "@dataset(). SinkTableName" jako nazwa tabeli.

    Zestaw danych będący ujściem — połączenie

Tworzenie zestawu danych dla limitu

W tym kroku utworzysz zestaw danych do przechowywania wartości górnego limitu.

  1. W lewym okienku kliknij pozycję + (plus), a następnie kliknij pozycję Zestaw danych.

  2. W oknie Nowy zestaw danych wybierz pozycję Azure SQL Database, a następnie kliknij przycisk Kontynuuj.

  3. U dołu karty Ogólne w oknie właściwości wprowadź wartość WatermarkDataset w polu Nazwa.

  4. Przejdź do karty Połączenie i wykonaj następujące czynności:

    1. Wybierz wartość AzureSqlDatabaseLinkedService w polu Połączona usługa.

    2. Wybierz element [dbo].[watermarktable] dla pozycji Tabela.

      Zestaw danych limitu — połączenie

Tworzenie potoku

Potok przyjmuje listę nazw tabel jako parametr. Działanie ForEach służy do przeprowadzania iteracji po liście nazw tabel i wykonywania następujących operacji:

  1. Użyj działania Lookup do pobrania starej wartość limitu (wartości początkowej lub wartości użytej w ostatniej iteracji).

  2. Użyj działania Lookup do pobrania nowej wartości limitu (maksymalnej wartości kolumny limitu w tabeli źródłowej).

  3. Użyj działania Copy do skopiowania danych między tymi dwiema wartościami limitu ze źródłowej bazy danych do docelowej bazy danych.

  4. Użyj działania StoredProcedure do zaktualizowania starej wartości limitu, która zostanie użyta w pierwszym kroku następnej iteracji.

Tworzenie potoku

  1. W lewym okienku kliknij pozycję + (plus), a następnie kliknij pozycję Potok.

  2. W panelu Ogólne w obszarze Właściwości określ wartość IncrementalCopyPipeline w polu Nazwa. Następnie zwiń panel, klikając ikonę Właściwości w prawym górnym rogu.

  3. Na karcie Parametry wykonaj następujące czynności:

    1. Kliknij pozycję + Nowy.
    2. Wprowadź ciąg tableList jako nazwę parametru.
    3. Wybierz pozycję Tablica dla typu parametru.
  4. W przyborniku Działania rozwiń pozycję Iteracja i warunki, a następnie przeciągnij i upuść działanie ForEach na powierzchni projektanta potoku. Na karcie Ogólne w oknie Właściwości wprowadź wartość IterateSQLTables.

  5. Przejdź do karty Ustawienia, a następnie wprowadź wartość @pipeline().parameters.tableList w polu Elementy. Działanie ForEach przeprowadza iterację po liście tabel i wykonuje operację kopiowania przyrostowego.

    Działanie ForEach — ustawienia

  6. Wybierz działanie ForEach w potoku, jeśli jeszcze nie zostało wybrane. Kliknij przycisk Edytuj (ikonę ołówka).

  7. W przyborniku Działania rozwiń pozycję SQL Database, przeciągnij i upuść działanie Lookup (Wyszukiwanie) na powierzchni projektanta potoku, a następnie wprowadź wartość LookupOldWaterMarkActivity w polu Nazwa.

  8. Przejdź do karty Ustawienia w oknie Właściwości i wykonaj następujące czynności:

    1. Wybierz pozycję WatermarkDataset w polu Zestaw danych będący źródłem.

    2. Wybierz pozycję Zapytanie w polu Użyj zapytania.

    3. W obszarze Zapytanie wprowadź następujące zapytanie SQL.

      select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'
      

      Pierwsze działanie Lookup — ustawienia

  9. Przeciągnij działanie Lookup z przybornika Działania i wprowadź wartość LookupNewWaterMarkActivity w polu Nazwa.

  10. Przejdź do karty Ustawienia.

    1. Wybierz pozycję SourceDataset w obszarze Zestaw danych będący źródłem.

    2. Wybierz pozycję Zapytanie w polu Użyj zapytania.

    3. W obszarze Zapytanie wprowadź następujące zapytanie SQL.

      select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}
      

      Drugie działanie Lookup — ustawienia

  11. Przeciągnij działanie Copy (Kopiuj) z przybornika Działania i wprowadź wartość IncrementalCopyActivity w polu Nazwa.

  12. Kolejno połącz działania Lookup z działaniem Copy. Aby utworzyć połączenie, zacznij przeciąganie w zielonym polu połączonym z działaniem Lookup i upuść je na działaniu Copy. Po zmianie koloru obramowania działania Copy na niebieski zwolnij przycisk myszy.

    Łączenie działań Lookup z działaniem Copy

  13. Wybierz działanie Copy w potoku. Przejdź do karty Źródło w oknie Właściwości.

    1. Wybierz pozycję SourceDataset w obszarze Zestaw danych będący źródłem.

    2. Wybierz pozycję Zapytanie w polu Użyj zapytania.

    3. W obszarze Zapytanie wprowadź następujące zapytanie SQL.

      select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'        
      

      Działanie Copy (Kopiowanie) — ustawienia źródła

  14. Przejdź do karty Ujście i wybierz pozycję SinkDataset w polu Zestaw danych będący ujściem.

  15. Wykonaj poniższe kroki:

    1. We właściwościach zestawu danych dla parametru SinkTableName wprowadź wartość @{item().TABLE_NAME}.

    2. W polu Właściwość Nazwa procedury składowanej wprowadź wartość @{item().StoredProcedureNameForMergeOperation}.

    3. W polu Właściwość typ tabeli wprowadź wartość @{item().TableType}.

    4. W polu Nazwa parametru typu tabeli wprowadź wartość @{item().TABLE_NAME}.

      Działanie Copy — parametry

  16. Przeciągnij działanie Stored Procedure (Procedura składowana) z przybornika Działania do powierzchni projektanta potoku. Połącz działanie Copy z działaniem Stored Procedure.

  17. Wybierz działanie Stored Procedure w potoku, a następnie wprowadź wartość StoredProceduretoWriteWatermarkActivity w polu Nazwa na karcie Ogólne w oknie Właściwości.

  18. Przejdź do karty Konto SQL i wybierz wartość AzureSqlDatabaseLinkedService w polu Połączona usługa.

    Działanie Stored Procedure (Procedura składowana) — konto SQL

  19. Przejdź do karty Procedura składowana i wykonaj następujące czynności:

    1. W polu Nazwa procedury składowanej wybierz wartość [dbo].[usp_write_watermark].

    2. Wybierz pozycję Importuj parametr.

    3. Określ wartości następujących parametrów:

      Nazwisko Typ Wartość
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

      Działanie procedury składowanej — ustawienia procedury składowanej

  20. Wybierz pozycję Opublikuj wszystko , aby opublikować jednostki utworzone w usłudze Data Factory.

  21. Poczekaj na wyświetlenie komunikatu Pomyślnie opublikowano. Aby wyświetlić powiadomienia, kliknij link Pokaż powiadomienia. Zamknij okno powiadomień, klikając przycisk X.

Uruchamianie potoku

  1. Na pasku narzędzi potoku kliknij pozycję Dodaj wyzwalacz, a następnie kliknij pozycję Wyzwól teraz.

  2. W oknie Uruchamianie potoku wprowadź następującą wartość dla parametru tableList, a następnie kliknij pozycję Zakończ.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

    Argumenty uruchomienia potoku

Monitor the pipeline (Monitorowanie potoku)

  1. Przejdź do karty Monitorowanie po lewej stronie. Zostanie wyświetlone uruchomienie potoku, które zostało wyzwolone za pomocą wyzwalacza ręcznego. Możesz użyć linków w kolumnie NAZWA POTOKu, aby wyświetlić szczegóły działania i ponownie uruchomić potok.

  2. Aby wyświetlić uruchomienia działań skojarzone z uruchomieniem potoku, wybierz link w kolumnie NAZWA POTOKU. Aby uzyskać szczegółowe informacje na temat przebiegów działań, wybierz link Szczegóły (ikona okularów) w kolumnie NAZWA DZIAŁANIA.

  3. Wybierz pozycję Wszystkie uruchomienia potoku u góry, aby wrócić do widoku Uruchomienia potoku. Aby odświeżyć widok, wybierz pozycję Odśwież.

Sprawdzanie wyników

W programu SQL Server Management Studio uruchom następujące zapytania względem docelowej bazy danych Azure SQL Database, aby sprawdzić, czy dane zostały skopiowane z tabel źródłowych do tabel docelowych:

Zapytanie

select * from customer_table

Wyjście

===========================================
PersonID	Name	LastModifytime
===========================================
1	        John	2017-09-01 00:56:00.000
2	        Mike	2017-09-02 05:23:00.000
3	        Alice	2017-09-03 02:36:00.000
4	        Andy	2017-09-04 03:21:00.000
5	        Anny	2017-09-05 08:06:00.000

Zapytanie

select * from project_table

Wyjście

===================================
Project	    Creationtime
===================================
project1	2015-01-01 00:00:00.000
project2	2016-02-02 01:23:00.000
project3	2017-03-04 05:16:00.000

Zapytanie

select * from watermarktable

Wyjście

======================================
TableName	    WatermarkValue
======================================
customer_table	2017-09-05 08:06:00.000
project_table	2017-03-04 05:16:00.000

Należy zauważyć, że wartości limitu dla obu tabel zostały zaktualizowane.

Dodawanie większej ilości danych do tabel źródłowych

Uruchom następujące zapytanie względem źródłowej bazy danych programu SQL Server, aby zaktualizować istniejący wiersz w tabeli customer_table. Wstaw nowy wiersz do tabeli project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Ponowne uruchamianie potoku

  1. W oknie przeglądarki sieci Web przejdź do karty Edycja po lewej stronie.

  2. Na pasku narzędzi potoku kliknij pozycję Dodaj wyzwalacz, a następnie kliknij pozycję Wyzwól teraz.

  3. W oknie Uruchamianie potoku wprowadź następującą wartość dla parametru tableList, a następnie kliknij pozycję Zakończ.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

Ponowne monitorowanie potoku

  1. Przejdź do karty Monitorowanie po lewej stronie. Zostanie wyświetlone uruchomienie potoku, które zostało wyzwolone za pomocą wyzwalacza ręcznego. Możesz użyć linków w kolumnie NAZWA POTOKu, aby wyświetlić szczegóły działania i ponownie uruchomić potok.

  2. Aby wyświetlić uruchomienia działań skojarzone z uruchomieniem potoku, wybierz link w kolumnie NAZWA POTOKU. Aby uzyskać szczegółowe informacje na temat przebiegów działań, wybierz link Szczegóły (ikona okularów) w kolumnie NAZWA DZIAŁANIA.

  3. Wybierz pozycję Wszystkie uruchomienia potoku u góry, aby wrócić do widoku Uruchomienia potoku. Aby odświeżyć widok, wybierz pozycję Odśwież.

Przegląd wyników końcowych

W programie SQL Server Management Studio uruchom następujące zapytania względem docelowej bazy danych SQL, aby sprawdzić, czy zaktualizowane/nowe dane zostały skopiowane z tabel źródłowych do tabel docelowych.

Zapytanie

select * from customer_table

Wyjście

===========================================
PersonID	Name	LastModifytime
===========================================
1	        John	2017-09-01 00:56:00.000
2	        Mike	2017-09-02 05:23:00.000
3	        NewName	2017-09-08 00:00:00.000
4	        Andy	2017-09-04 03:21:00.000
5	        Anny	2017-09-05 08:06:00.000

Zwróć uwagę na nowe wartości właściwości Name i LastModifytime dla identyfikatora PersonID numeru 3.

Zapytanie

select * from project_table

Wyjście

===================================
Project	    Creationtime
===================================
project1	2015-01-01 00:00:00.000
project2	2016-02-02 01:23:00.000
project3	2017-03-04 05:16:00.000
NewProject	2017-10-01 00:00:00.000

Zwróć uwagę, że do tabeli project_table dodano pozycję NewProject.

Zapytanie

select * from watermarktable

Wyjście

======================================
TableName	    WatermarkValue
======================================
customer_table	2017-09-08 00:00:00.000
project_table	2017-10-01 00:00:00.000

Należy zauważyć, że wartości limitu dla obu tabel zostały zaktualizowane.

W ramach tego samouczka wykonano następujące procedury:

  • Przygotowanie źródłowych i docelowych magazynów danych
  • Tworzenie fabryki danych.
  • Tworzenie własnego środowiska Integration Runtime.
  • Instalowanie środowiska Integration Runtime.
  • Tworzenie połączonych usług.
  • Tworzenie zestawów danych źródła, ujścia i limitu.
  • Tworzenie, uruchamianie i monitorowanie potoku
  • Przejrzyj wyniki.
  • Dodawanie lub aktualizowanie danych w tabelach źródłowych.
  • Ponowne uruchamianie i monitorowanie potoku.
  • Przegląd wyników końcowych.

Przejdź do następującego samouczka, aby dowiedzieć się więcej o przekształcaniu danych za pomocą klastra Spark na platformie Azure: