Поделиться через


Пошаговая загрузка данных из хранилища данных в Lakehouse

В этом руководстве вы узнаете, как добавочно загружать данные из хранилища данных в Lakehouse.

Обзор

Ниже приведена схема высокоуровневого решения:

Схема, показывающая логику добавочной загрузки данных.

Ниже приведены важные действия для создания этого решения.

  1. Выберите столбец водяного знака. Выберите один столбец в исходной таблице данных, который можно использовать для среза новых или обновленных записей для каждого запуска. Как правило, данные в этом выбранном столбце (например, последнее_время_изменения или идентификатор) продолжают увеличиваться по мере создания или обновления строк. В качестве предела используется максимальное значение в этом столбце.

  2. Подготовьте таблицу для хранения последнего значения водяного знака в хранилище данных.

  3. Создайте конвейер, используя следующий рабочий процесс:

    Конвейер в этом решении содержит следующие действия:

    • Создайте два действия поиска. Используйте первую операцию поиска для получения последнего значения водяного знака. Используйте вторую проверку для получения нового значения водяного знака. Эти значения водяных знаков передаются в процесс копирования.
    • Создайте действие копирования, которое копирует строки из исходной таблицы данных, где значение столбца водяного знака больше старого значения водяного знака и меньше нового значения водяного знака. Затем он копирует данные из хранилища данных в Lakehouse в виде нового файла.
    • Создайте задание хранимой процедуры, которое обновляет последнее значение отметки для следующего запуска конвейера.

Предварительные условия

  • Хранилище данных. Хранилище данных используется в качестве исходного хранилища данных. Если у вас его нет, см. статью "Создание хранилища данных", чтобы выполнить действия по его созданию.
  • Лейкхаус. Вы используете Lakehouse в качестве целевого хранилища данных. Если его у вас нет, см. Создание Lakehouse для получения сведений о том, как его создать. Создайте папку с именем IncrementalCopy для хранения скопированных данных.

Подготовка источника

Ниже приведены некоторые таблицы и хранимые процедуры, которые необходимо подготовить в исходном хранилище данных перед настройкой конвейера добавочного копирования.

1. Создание таблицы источника данных в хранилище данных

Выполните следующую команду SQL в хранилище данных, чтобы создать таблицу с именем data_source_table в качестве таблицы источника данных. В этом руководстве вы используете его как образец данных для инкрементального копирования.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Данные в таблице источника данных показаны ниже.

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

В этом руководстве в качестве столбца подложки используется LastModifytime .

2. Создайте другую таблицу в хранилище данных для хранения последнего значения водяного знака

  1. Выполните следующую команду SQL в хранилище данных, чтобы создать таблицу с именем watermarktable для хранения последнего значения водяного знака.

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Установите значение по умолчанию для последнего водяного знака с использованием имени таблицы исходных данных. В этом руководстве имя таблицы — data_source_table, а значение по умолчанию — 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Просмотрите данные в таблице watermarktable.

    Select * from watermarktable
    

    Выходные данные:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Создание хранимой процедуры в хранилище данных

Выполните следующую команду, чтобы создать хранимую процедуру в хранилище данных. Эта хранимая процедура используется для обновления последнего значения водяного знака после последнего запуска конвейера.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Настройка конвейера для добавочной копии

Шаг 1. Создание конвейера

  1. Перейдите в Power BI.

  2. Щелкните значок Power BI в нижней левой части экрана, а затем выберите фабрику данных, чтобы открыть домашнюю страницу фабрики данных.

  3. Перейдите в рабочую область Microsoft Fabric.

  4. Выберите конвейер данных и введите имя конвейера для создания нового конвейера.

    Снимок экрана: кнопка нового конвейера данных в созданной рабочей области.

    Снимок экрана с названием шага создания нового конвейера.

Шаг 2. Добавление операции поиска для последнего водяного знака

На этом шаге вы создадите действие поиска, чтобы получить последнее значение метки времени. Значение по умолчанию 1/1/2010 12:00:00 AM, установленное ранее, получено.

  1. Выберите "Добавить действие конвейера" и выберите "Поиск" из раскрывающегося списка.

  2. На вкладке "Общие " переименуйте это действие в LookupOldWaterMarkActivity.

  3. На вкладке "Параметры" выполните следующую конфигурацию:

    • Тип хранилища данных: выбор рабочей области.
    • Тип хранилища данных рабочей области: выбор хранилища данных.
    • Хранилище данных: выберите хранилище данных.
    • Использование запроса: выбор таблицы.
    • Таблица. Выберите dbo.watermarktable.
    • Только первая строка: выбрано.

    Снимок экрана: просмотр старого водяного знака.

Шаг 3. Добавление операции поиска для нового водяного знака

На этом шаге вы создадите активность поиска, чтобы получить новое значение водяного знака. Вы используете запрос для получения нового водяного знака из вашей исходной таблицы данных. Получено максимальное значение в столбце LastModifytime из таблицы data_source_table.

  1. На верхней панели выберите Поиск на вкладке Действия, чтобы добавить вторую активность поиска.

  2. На вкладке "Общие " переименуйте это действие в LookupNewWaterMarkActivity.

  3. На вкладке "Параметры" выполните следующую конфигурацию:

    • Тип хранилища данных: выбор рабочей области.

    • Тип хранилища данных рабочей области: выбор хранилища данных.

    • Хранилище данных: выберите хранилище данных.

    • Используйте запрос: выберите запрос.

    • Запрос: введите следующий запрос, чтобы выбрать максимальное время последнего изменения в качестве нового водяного знака:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Только первая строка: выбрано.

    Снимок экрана, показывающий новый водяной знак.

Шаг 4. Добавление действия копирования для копирования добавочных данных

На этом шаге вы добавите процесс копирования, чтобы копировать инкрементные данные между последней меткой и новой меткой из хранилища данных в Lakehouse.

  1. Выберите "Действия " на верхней панели и нажмите кнопку "Копировать данные ">Добавить на холст ", чтобы получить действие копирования.

  2. На вкладке "Общие" переименуйте это действие в IncrementalCopyActivity.

  3. Подключите оба действия подстановки к действию копирования, перетащив зеленую кнопку (при успешном выполнении), прикрепленную к действиям подстановки, к действию копирования. Отпустите кнопку мыши, когда цвет границы действия копирования изменится на зеленый.

    Снимок экрана с объединением действий поиска и копирования.

  4. На вкладке "Источник" выполните следующую конфигурацию:

    • Тип хранилища данных: выбор рабочей области.

    • Тип хранилища данных рабочей области: выбор хранилища данных.

    • Хранилище данных: выберите хранилище данных.

    • Используйте запрос: выберите запрос.

    • Запрос: Введите следующий запрос, чтобы скопировать инкрементальные данные между последней контрольной точкой и новой контрольной точкой.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Снимок экрана: конфигурация источника копирования.

  5. На вкладке "Назначение" выполните следующую конфигурацию:

    • Тип хранилища данных: выбор рабочей области.
    • Тип хранилища данных рабочей области: Выберите Lakehouse.
    • Lakehouse: Выберите ваш Lakehouse.
    • Корневая папка: выбор файлов.
    • Путь к данным: укажите папку, в которой вы хотите сохранить скопированные данные. Нажмите кнопку "Обзор", чтобы выбрать папку. Для имени файла откройте динамический контент и введите @CONCAT('Incremental-', pipeline().RunId, '.txt') в открывшемся окне, чтобы создать имена файлов для скопированного файла данных в Lakehouse.
    • Формат файла: выберите тип формата данных.

    Снимок экрана: конфигурация назначения копирования.

Шаг 5. Добавление действия хранимой процедуры

На этом шаге вы добавите действие хранимой процедуры, чтобы обновить последнее значение водяного знака для следующего запуска конвейера.

  1. Выберите действия на верхней панели и выберите хранимую процедуру, чтобы добавить действие хранимой процедуры .

  2. На вкладке "Общие " переименуйте это действие в StoredProceduretoWriteWatermarkActivity.

  3. Подключите зеленый (При успешном завершении) выходной сигнал действия копирования к действию хранимой процедуры.

  4. На вкладке "Параметры" выполните следующую конфигурацию:

    • Тип хранилища данных: выбор рабочей области.

    • Хранилище данных: выберите хранилище данных.

    • Имя хранимой процедуры: укажите хранимую процедуру, созданную в хранилище данных: [dbo].[ usp_write_watermark].

    • Разверните параметры хранимой процедуры. Чтобы указать значения параметров хранимой процедуры, выберите "Импорт" и введите следующие значения для параметров:

      Имя. Тип значение
      Время последнего изменения Дата/время @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      ИмяТаблицы Строка @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Снимок экрана: конфигурация действия хранимой процедуры.

Шаг 6.Запуск конвейера и мониторинг результата

На верхней панели выберите "Запустить " на вкладке "Главная ". Затем нажмите кнопку "Сохранить и запустить". Конвейер запускается, и вы можете отслеживать конвейер на вкладке "Выходные данные ".

Снимок экрана: результаты выполнения конвейера.

Перейдите в Lakehouse, найдите файл данных в указанной папке, и вы можете выбрать файл для предварительного просмотра скопированных данных.

Снимок экрана: данные Lakehouse для первого запуска конвейера.

Снимок экрана: предварительная версия данных Lakehouse для первого запуска конвейера.

Добавьте больше данных, чтобы увидеть результаты пошагового копирования

После завершения первого запуска конвейера давайте попытаемся добавить дополнительные данные в исходную таблицу хранилища данных, чтобы узнать, может ли этот конвейер скопировать добавочные данные.

Шаг 1. Добавление дополнительных данных в источник

Вставьте новые данные в хранилище данных, выполнив следующий запрос:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Обновленные данные для data_source_table :

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Шаг 2. Запуск другого конвейера и мониторинг результата

Вернитесь на страницу конвейера. На верхней панели снова нажмите кнопку "Запустить " на вкладке "Главная ". Конвейер запускается, и вы можете отслеживать конвейер в разделе "Выходные данные".

Перейдите в Lakehouse, найдите новый скопированный файл данных находится в указанной папке, и вы можете выбрать файл для предварительного просмотра скопированных данных. Вы видите, что ваши инкрементные данные отображаются в этом файле.

Снимок экрана: данные lakehouse для второго запуска конвейера.

Снимок экрана: предварительный просмотр данных Lakehouse для второго запуска конвейера.

Затем перейдите к дополнительным сведениям о копировании из Хранилище BLOB-объектов Azure в Lakehouse.