Przyrostowe ładowanie danych z magazynu danych do Lakehouse
Z tego samouczka dowiesz się, jak przyrostowo ładować dane z usługi Data Warehouse do usługi Lakehouse.
Omówienie
Oto diagram wysokiego poziomu rozwiązań:
Poniżej przedstawiono ważne czynności związane z tworzeniem tego rozwiązania:
Wybierz kolumnę limitu. Wybierz jedną kolumnę w tabeli danych źródłowych, która może służyć do podziału nowych lub zaktualizowanych rekordów w każdym uruchomieniu. Zazwyczaj dane w tej wybranej kolumnie (na przykład last_modify_time lub ID) rosną wraz z tworzeniem i aktualizacją wierszy. Maksymalna wartość w tej kolumnie jest używana jako limit.
Przygotuj tabelę do przechowywania ostatniej wartości znacznika w hurtowni danych.
Utwórz potok z następującym przepływem pracy:
Przepływ danych w tym rozwiązaniu obejmuje następujące działania:
- Utwórz dwa działania wyszukiwania. Użyj pierwszej operacji wyszukiwania, aby pobrać ostatnią wartość znacznika. Użyj drugiego działania wyszukiwania do pobrania nowej wartości znacznika. Te wartości limitu są przekazywane do działania kopiowania.
- Utwórz działanie kopiowania, które kopiuje wiersze z tabeli danych źródłowych z wartością kolumny limitu większej niż stara wartość limitu i mniejsza niż nowa wartość limitu. Następnie kopiuje dane z magazynu danych do usługi Lakehouse jako nowy plik.
- Utwórz aktywność procedury składowanej, która aktualizuje ostatnią wartość punktu odniesienia dla następnego uruchomienia potoku.
Wymagania wstępne
- Magazyn danych. Używasz hurtowni danych jako źródłowego magazynu danych. Jeśli go nie masz, zobacz Tworzenie magazynu danych, aby zapoznać się z krokami do jego utworzenia.
- Lakehouse. Używasz Lakehouse jako docelowego magazynu danych. Jeśli go nie masz, zobacz Jak utworzyć Lakehouse, aby dowiedzieć się, jak to zrobić. Utwórz folder o nazwie IncrementalCopy do przechowywania skopiowanych danych.
Przygotowywanie źródła
Przygotuj poniższe tabele i procedury przechowywane w źródłowym magazynie danych przed skonfigurowaniem potoku kopiowania przyrostowego.
1. Tworzenie tabeli źródła danych w magazynie danych
Uruchom następujące polecenie SQL w usłudze Data Warehouse, aby utworzyć tabelę o nazwie data_source_table jako tabelę źródła danych. W tym samouczku używasz go jako danych przykładowych do wykonania kopii przyrostowej.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Dane w tabeli źródła danych są pokazane poniżej:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
W tym samouczku użyjesz funkcji LastModifytime jako kolumny limitu.
2. Utwórz inną tabelę w magazynie danych, aby przechowywać ostatnią wartość watermark.
Uruchom następujące polecenie SQL w usłudze Data Warehouse, aby utworzyć tabelę o nazwie watermarktable do przechowywania ostatniej wartości znacznika wodnego:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Ustaw wartość domyślną ostatniego znacznika z użyciem nazwy tabeli źródłowej danych. W tym samouczku nazwa tabeli to data_source_table, a wartość domyślna to
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Przejrzyj dane w tabeli watermarktable.
Select * from watermarktable
Wyjście:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Utwórz procedurę składowaną w magazynie danych
Uruchom następujące polecenie, aby utworzyć procedurę składowaną w magazynie danych. Ta procedura składowana służy do aktualizowania ostatniej wartości znacznika wodnego po ostatnim uruchomieniu potoku.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Skonfiguruj potok do kopiowania przyrostowego
Krok 1: Utwórz potok
Przejdź do usługi Power BI.
Wybierz ikonę usługi Power BI w lewym dolnym rogu ekranu, a następnie wybierz pozycję Fabryka danych , aby otworzyć stronę główną usługi Data Factory.
Przejdź do obszaru roboczego usługi Microsoft Fabric.
Wybierz Potok danych, a następnie wprowadź nazwę potoku danych, aby stworzyć nowy.
Krok 2. Dodaj działanie wyszukiwania dla ostatniego znaku wodnego
W tym kroku utworzysz działanie przeszukiwania, aby uzyskać ostatnią wartość znacznika. Wartość domyślna 1/1/2010 12:00:00 AM
, która została wcześniej ustawiona, została uzyskana.
Wybierz pozycję Dodaj aktywność potoku i wybierz Wyszukaj z listy rozwijanej.
Na karcie Ogólne zmień nazwę tego działania na LookupOldWaterMarkActivity.
Na karcie Ustawienia wykonaj następującą konfigurację:
- Typ magazynu danych: wybierz pozycję Obszar roboczy.
- Typ magazynu danych obszaru roboczego: wybierz pozycję Magazyn danych.
- Data Warehouse: wybierz magazyn danych.
- Użyj zapytania: wybierz pozycję Tabela.
- Tabela: wybierz pozycję dbo.watermarktable.
- Pierwszy wiersz tylko: wybrany.
Krok 3. Dodawanie działania wyszukiwania dla nowego znaku wodnego
W tym kroku utworzysz działanie wyszukiwania, aby uzyskać nową wartość znaku wodnego. Używasz zapytania, aby uzyskać nowy znak wodny z tabeli danych źródłowych. Maksymalna wartość w kolumnie LastModifytime w tabeli data_source_table została uzyskana.
Na górnym pasku wybierz pozycję Wyszukiwanie na karcie Aktywności, aby dodać drugie działanie wyszukiwania.
Na karcie Ogólne zmień nazwę tego działania na LookupNewWaterMarkActivity.
Na karcie Ustawienia wykonaj następującą konfigurację:
Typ magazynu danych: wybierz pozycję Obszar roboczy.
Typ magazynu danych obszaru roboczego: wybierz pozycję Magazyn danych.
Data Warehouse: wybierz magazyn danych.
Użyj zapytania: wybierz Zapytanie.
Zapytanie: Wprowadź następujące zapytanie, aby wybrać maksymalny czas ostatniej modyfikacji jako nowy punkt odniesienia:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Pierwszy wiersz tylko: wybrany.
Krok 4. Dodawanie działania kopiowania w celu kopiowania danych przyrostowych
W tym kroku dodasz działanie kopiowania, aby skopiować dane przyrostowe między ostatnim znakiem wodnym a nowym znakiem wodnym z usługi Data Warehouse do usługi Lakehouse.
Wybierz pozycję Działania na górnym pasku i wybierz pozycję Kopiuj dane -> Dodaj do kanwy , aby uzyskać działanie kopiowania.
Na karcie Ogólne zmień nazwę tego działania na IncrementalCopyActivity.
Połącz oba działania wyszukiwania z działaniem kopiowania, przeciągając zielony przycisk (Po powodzeniu) połączony z działaniami wyszukiwania do działania kopiowania. Zwolnij przycisk myszy, gdy kolor obramowania działania kopiowania zmieni się na zielony.
Na karcie Źródło wykonaj następującą konfigurację:
Typ magazynu danych: wybierz pozycję Obszar roboczy.
Typ magazynu danych obszaru roboczego: wybierz pozycję Magazyn danych.
Data Warehouse: wybierz magazyn danych.
Użyj zapytania: Wybierz Zapytanie.
Zapytanie: wprowadź następujące zapytanie, aby skopiować dane przyrostowe między ostatnim znakiem wodnym a nowym znakiem wodnym.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Na karcie Miejsce docelowe wykonaj następującą konfigurację:
- Typ magazynu danych: wybierz pozycję Obszar roboczy.
- Typ magazynu danych obszaru roboczego: wybierz Lakehouse.
- Lakehouse: Wybierz swoje Lakehouse.
- Folder główny: wybierz pozycję Pliki.
-
Ścieżka pliku: określ folder, który chcesz przechowywać skopiowane dane. Wybierz pozycję Przeglądaj , aby wybrać folder. W polu nazwa pliku otwórz pozycję Dodaj dynamiczną zawartość i wprowadź
@CONCAT('Incremental-', pipeline().RunId, '.txt')
w otwartym oknie, aby utworzyć nazwy plików dla skopiowanego pliku danych w usłudze Lakehouse. - Format pliku: wybierz typ formatu danych.
Krok 5. Dodaj działanie procedury składowanej
W tym kroku dodasz działanie procedury składowanej, aby zaktualizować ostatnią wartość limitu dla następnego uruchomienia potoku.
Wybierz pozycję Działania na górnym pasku i wybierz pozycję Procedura składowana, aby dodać działanie procedury składowanej.
Na karcie Ogólne zmień nazwę tego działania na StoredProceduretoWriteWatermarkActivity.
Połącz zielone (po powodzeniu) dane wyjściowe działania kopiowania z działaniem procedury składowanej.
Na karcie Ustawienia wykonaj następującą konfigurację:
Typ magazynu danych: wybierz pozycję Obszar roboczy.
Data Warehouse: wybierz magazyn danych.
Nazwa procedury składowanej: określ procedurę składowaną utworzoną w magazynie danych: [dbo].[ usp_write_watermark].
Rozwiń Parametry procedury składowanej. Aby określić wartości parametrów procedury składowanej, wybierz pozycję Importuj i wprowadź następujące wartości dla parametrów:
Nazwisko Typ Wartość CzasOstatniejModyfikacji Data i Czas @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} NazwaTabeli String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Krok 6. Uruchom potok i monitoruj wynik
Na górnym pasku wybierz pozycję Uruchom na karcie Narzędzia główne . Następnie wybierz pozycję Zapisz i uruchom. Potok zostaje uruchomiony i można monitorować potok na karcie Wyjście.
Przejdź do swojego Lakehouse, gdzie znajdziesz plik danych znajdujący się w określonym folderze i możesz wybrać plik, aby wyświetlić podgląd skopiowanych danych.
Dodawanie większej liczby danych w celu wyświetlenia wyników kopiowania przyrostowego
Po zakończeniu pierwszego uruchomienia potoku spróbujmy dodać więcej danych do tabeli źródłowej magazynu danych, aby sprawdzić, czy potok jest w stanie skopiować dodatkowe dane.
Krok 1. Dodawanie większej ilości danych do źródła
Wstaw nowe dane do magazynu danych, uruchamiając następujące zapytanie:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Zaktualizowane dane dla data_source_table są następujące:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Krok 2. Wyzwalanie innego uruchomienia potoku i monitorowanie wyniku
Wróć do strony potoku. Na górnym pasku wybierz ponownie Uruchom na karcie Strona główna. Potok zostaje uruchomiony, a jego przebieg można monitorować w sekcji Dane wyjściowe.
Przejdź do usługi Lakehouse, znajdziesz nowy skopiowany plik danych znajduje się w określonym folderze i możesz wybrać plik, aby wyświetlić podgląd skopiowanych danych. Zobaczysz, że dane przyrostowe są wyświetlane w tym pliku.
Powiązana zawartość
Następnie przejdź dalej, aby dowiedzieć się więcej na temat kopiowania z usługi Azure Blob Storage do usługi Lakehouse.