Поделиться через


Добавочная загрузка данных из нескольких таблиц в SQL Server в базу данных в службе "База данных SQL Azure" с использованием портала Azure

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этом руководстве вы создадите Фабрика данных Azure с конвейером, который загружает разностные данные из нескольких таблиц в базе данных SQL Server в базу данных в База данных SQL Azure.

В этом руководстве вы выполните следующие шаги:

  • подготовите исходное и конечное хранилища данных;
  • Создали фабрику данных.
  • Создание локальной среды выполнения интеграции.
  • Установка среды выполнения интеграции.
  • Создали связанные службы.
  • Создали наборы данных источника, приемника и предела.
  • создадите и запустите конвейер, а также начнете его мониторинг;
  • Проверка результатов.
  • Добавление или обновление данных в исходных таблицах.
  • Повторный запуск конвейера и выполнение его мониторинга.
  • Просмотр окончательных результатов.

Обзор

Ниже приведены важные действия для создания этого решения.

  1. Выберите столбец для предела.

    Выберите один столбец из каждой таблицы в исходном хранилище данных, который можно использовать для идентификации новых или обновленных записей при каждом запуске. Как правило, данные в этом выбранном столбце (например, последнее_время_изменения или идентификатор) продолжают увеличиваться по мере создания или обновления строк. В качестве предела используется максимальное значение в этом столбце.

  2. Подготовьте хранилище данных для хранения значений предела.

    В этом руководстве вы сохраните значение предела в базе данных SQL.

  3. Создайте конвейер, следуя инструкциям ниже.

    a. Создайте действие ForEach, которое выполняет итерацию по списку имен исходной таблицы. Этот список передается в конвейер в качестве параметра. В каждой исходной таблице этот параметр вызывает следующие действия для загрузки разностных данных для этой таблицы.

    b. Создание двух действий поиска. Используйте первое действие поиска для получения последнего значения предела, а второе для получения нового значения предела. Эти значения передаются в действие копирования.

    c. Создайте действие копирования, копирующее строки из исходного хранилища данных со значениями столбцов предела, которые выше значений старого предела и меньше значений нового. Затем оно копирует разностные данные из исходного хранилища данных в хранилище BLOB-объектов Azure в качестве нового файла.

    d. Создайте действие хранимой процедуры, которое обновляет значение предела для конвейера при последующем выполнении.

    Ниже приведена общая схема решения.

    Пошаговая загрузка данных

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Необходимые компоненты

  • SQL Server. В этом учебнике используйте базу данных SQL Server в качестве исходного хранилища данных.
  • База данных SQL Azure. Базу данных в службе "База данных SQL Azure" следует использовать в качестве принимающего хранилища данных. Если у вас нет базы данных в службе "База данных SQL Azure", создайте ее с помощью этих инструкций.

Создание исходных таблиц в базе данных SQL Server

  1. Откройте SQL Server Management Studio и подключитесь к базе данных SQL Server.

  2. В обозревателе сервера щелкните правой кнопкой мыши базу данных и выберите Создать запрос.

  3. Выполните следующую команду SQL в базе данных, чтобы создать таблицы с именами customer_table и project_table.

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    INSERT INTO customer_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'John','9/1/2017 12:56:00 AM'),
    (2, 'Mike','9/2/2017 5:23:00 AM'),
    (3, 'Alice','9/3/2017 2:36:00 AM'),
    (4, 'Andy','9/4/2017 3:21:00 AM'),
    (5, 'Anny','9/5/2017 8:06:00 AM');
    
    INSERT INTO project_table
    (Project, Creationtime)
    VALUES
    ('project1','1/1/2015 0:00:00 AM'),
    ('project2','2/2/2016 1:23:00 AM'),
    ('project3','3/4/2017 5:16:00 AM');
    
    

Создание целевых таблиц в вашей базе данных

  1. Откройте SQL Server Management Studio и подключитесь к своей базе данных в службе "База данных SQL Azure".

  2. В обозревателе сервера щелкните правой кнопкой мыши базу данных и выберите Создать запрос.

  3. Выполните следующую команду SQL в базе данных, чтобы создать таблицы с именами customer_table и project_table.

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    

Создание дополнительной таблицы в своей базе данных для хранения значения верхнего предела

  1. Выполните указанную ниже команду SQL для базы данных, чтобы создать таблицу с именем watermarktable для хранения значения предела.

    create table watermarktable
    (
    
        TableName varchar(255),
        WatermarkValue datetime,
    );
    
  2. Вставьте исходные значения предела для обеих исходных таблиц в таблицу значений предела.

    
    INSERT INTO watermarktable
    VALUES 
    ('customer_table','1/1/2010 12:00:00 AM'),
    ('project_table','1/1/2010 12:00:00 AM');
    
    

Создание хранимой процедуры в вашей базе данных

Выполните указанную ниже команду, чтобы создать хранимую процедуру в своей базе данных. Эта хранимая процедура обновляет значение предела после каждого запуска конвейера.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime 
WHERE [TableName] = @TableName

END

Создание типов данных и дополнительных хранимых процедур в вашей базе данных

Выполните следующий запрос, чтобы создать две хранимые процедуры и два типа данных в своей базе данных. Они используются для объединения данных из исходных в целевые таблицы.

Чтобы упростить начало работы, мы непосредственно используем эти хранимые процедуры, передающие разностные данные в табличной переменной, а затем объединяем их в целевое хранилище. Учтите, что для хранения в табличной переменной не ожидается большое число строк разностных данных (более 100).

Если нужно объединить большое количество строк разностных данных в целевом хранилище, мы рекомендуем с помощью действия копирования сначала скопировать разностные данные во временную "промежуточную" таблицу в целевом хранилище, а затем создать собственную хранимую процедуру, не используя табличную переменную для их объединения из "промежуточной" таблицы в "итоговую".

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Создание фабрики данных

  1. Запустите веб-браузер Microsoft Edge или Google Chrome. Сейчас только эти браузеры поддерживают пользовательский интерфейс фабрики данных.

  2. В меню слева выберите Создать ресурс>Интеграция>Фабрика данных:

    Выбор фабрики данных в

  3. На странице Новая фабрика данных введите ADFMultiIncCopyTutorialDF в поле Имя.

    Имя фабрики данных Azure должно быть глобально уникальным. Если вы увидите красный восклицательный знак с указанным ниже текстом ошибки, введите другое имя фабрики данных (например, ваше_имя_ADFIncCopyTutorialDF) и попробуйте создать фабрику данных снова. Ознакомьтесь со статьей Фабрика данных Azure — правила именования, чтобы узнать правила именования для артефактов службы "Фабрика данных".

    Data factory name "ADFIncCopyTutorialDF" is not available

  4. Выберите подписку Azure, в рамках которой вы хотите создать фабрику данных.

  5. Для группы ресурсов выполните одно из следующих действий.

  6. Укажите V2 при выборе версии.

  7. Укажите расположение фабрики данных. В раскрывающемся списке отображаются только поддерживаемые расположения. Хранилища данных (служба хранилища Azure, база данных SQL Azure и т. д.) и вычисления (HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.

  8. Нажмите кнопку Создать.

  9. Когда завершится создание, откроется страница Фабрика данных, как показано на рисунке ниже.

    Домашняя страница Фабрики данных Azure с элементом Open Azure Data Factory Studio.

  10. Нажмите кнопку Открыть на элементе Open Azure Data Factory Studio (Открыть студию Фабрики данных Azure), чтобы запустить пользовательский интерфейс Фабрики данных Azure на отдельной вкладке.

Создание и настройка локальной среды выполнения интеграции

При перемещении данных из хранилища данных в частной сети (локальной) в хранилище данных Azure установите в своей локальной среде локальную среду выполнения интеграции (IR). В локальной среде выполнения интеграции данные перемещаются между частной сетью и Azure.

  1. На домашней странице в пользовательском интерфейсе Фабрики данных Azure выберите вкладку Управление в крайней области слева.

    Кнопка управления на домашней странице

  2. Выберите элемент Integration runtimes (Среды выполнения интеграции) на панели слева. Затем выберите команду +Создать.

    Создание среды выполнения интеграции

  3. В окне Integration Runtime Setup (Настройка среды выполнения интеграции) выберите вариант Perform data movement and dispatch activities to external computes (Выполнить перемещение данных и передать действия на внешние вычислительные ресурсы), затем нажмите кнопку Продолжить.

  4. Выберите параметр Независимый и щелкните Продолжить.

  5. Введите MySelfHostedIR для параметра Имя и щелкните Создать.

  6. Щелкните элемент Click here to launch the express setup for this computer (Щелкните здесь, чтобы запустить экспресс-установку для этого компьютера) в разделе Option 1: Express setup (Вариант 1. Экспресс-установка).

    Ссылка экспресс-установки

  7. В окне Экспресс-установка Integration Runtime (Self-hosted) щелкните Закрыть.

    Настройка среды выполнения интеграции — успешное завершение

  8. В окне веб-браузера Integration Runtime Setup (Настройка среды выполнения интеграции) нажмите кнопку Готово.

  9. Проверьте, чтобы в списке сред выполнения интеграции использовалась среда MySelfHostedIR.

Создание связанных служб

Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В рамках этого раздела вы создадите связанные службы для базы данных SQL Server и базы данных в службе "База данных SQL Azure".

Создание связанной службы SQL Server

На этом шаге вы свяжете базу данных SQL Server с фабрикой данных.

  1. В окне подключений перейдите из вкладки сред выполнения интеграции на вкладку Связанные службы, а затем щелкните + Создать.

    Новая связанная служба.

  2. В окне New Linked Service (Новая связанная служба) выберите SQL Server, а затем нажмите кнопку Продолжить.

  3. В окне New Linked Service (Новая связанная служба) выполните следующие действия:

    1. Введите SqlServerLinkedService в поле Имя.
    2. Выберите MySelfHostedIR для параметра Connect via integration runtime (Подключиться через среду выполнения интеграции). Это важный шаг. Среда выполнения интеграции по умолчанию не может подключиться к локальному хранилищу данных. Используйте ранее созданную локальную среду выполнения интеграции.
    3. В поле Имя сервера введите имя компьютера, на котором размещена база данных SQL Server.
    4. В поле Имя базы данных введите имя базы данных на сервере SQL Server, где содержатся исходные данные. Для работы с этим руководством вы ранее создали таблицу и вставили данные в эту базу данных.
    5. В поле Тип проверки подлинности выберите соответствующий тип, который будет использоваться для подключения к базе данных.
    6. В поле Имя пользователя введите имя пользователя, у которого есть доступ к базе данных SQL Server. Если в имени учетной записи пользователя или имени сервера необходимо использовать символ косой черты (\), добавьте escape-символ (\). Например, mydomain\\myuser.
    7. В поле Пароль введите пароль для этого пользователя.
    8. Чтобы проверить, может ли фабрика данных подключиться к базе данных SQL Server, нажмите кнопку Проверить соединение. Устраните все ошибки для успешного подключения.
    9. Чтобы сохранить связанную службу, щелкните Готово.

Создание связанной службы Базы данных SQL Azure

На последнем шаге вы создадите связанную службу, которая свяжет исходную базу данных SQL Server с фабрикой данных. На этом шаге вы свяжете базу данных-приемник или целевую базу данных с фабрикой данных.

  1. В окне подключений перейдите из вкладки сред выполнения интеграции на вкладку Связанные службы, а затем щелкните + Создать.

  2. В окне New Linked Service (Новая связанная служба) выберите Azure SQL Database (База данных SQL Microsoft Azure) и щелкните Continue (Продолжить).

  3. В окне New Linked Service (Новая связанная служба) выполните следующие действия:

    1. Введите AzureSqlDatabaseLinkedService в поле Имя.
    2. Для поля Имя сервера выберите в раскрывающемся списке имя сервера.
    3. В поле Имя базы данных выберите базу данных, в которой в рамках этого руководства были созданы таблицы customer_table и project_table.
    4. В поле Имя пользователя введите имя пользователя с доступом к базе данных.
    5. В поле Пароль введите пароль для этого пользователя.
    6. Чтобы проверить, может ли фабрика данных подключиться к базе данных SQL Server, нажмите кнопку Проверить соединение. Устраните все ошибки для успешного подключения.
    7. Чтобы сохранить связанную службу, щелкните Готово.
  4. Вы должны увидеть в списке две связанные службы.

    Две связанные службы

Создайте наборы данных.

На этом шаге вы создадите наборы данных для представления источника данных, назначение данных и место для хранения предела.

Создание исходного набора данных

  1. На левой панели нажмите кнопку + (плюс) и выберите элемент Набор данных.

  2. В окне Новый набор данных выберите SQL Server, а затем нажмите кнопку Продолжить.

  3. Для настройки набора данных в веб-браузере откроется новая вкладка. Кроме того, этот набор данных появится в представлении в виде дерева. На вкладке Общие окна свойств ниже введите SourceDataset в поле Имя.

  4. Щелкните вкладку Подключение в окне "Свойства", а затем выберите SqlServerLinkedService для связанной службы. Поле таблицы ниже оставьте пустым. Действие копирования в конвейере использует SQL-запрос для загрузки данных, вместо того чтобы загружать всю таблицу.

    Раздел исходного набора данных, вкладка

Создание набора данных приемника

  1. На левой панели нажмите кнопку + (плюс) и выберите элемент Набор данных.

  2. В окне Новый набор данных выберите База данных SQL Microsoft Azure и щелкните Продолжить.

  3. Для настройки набора данных в веб-браузере откроется новая вкладка. Кроме того, этот набор данных появится в представлении в виде дерева. На вкладке Общие окна свойств ниже введите SinkDataset (Набор данных приемника) в поле Имя.

  4. Щелкните вкладку Параметры в окне "Свойства" и выполните следующее:

    1. Нажмите кнопку + Создать в разделе создания и обновления параметров.

    2. Введите SinkTableName в поле Имя, а для параметра Тип выберите значение Строка. В этом наборе данных имя целевой таблицы используется как параметр. Параметр SinkTableName динамически задается конвейером в среде выполнения. Действие ForEach в конвейере выполняет итерацию по списку имен таблиц и передает имя таблицы для этого набора данных в каждой итерации.

      Свойства целевого набора данных

  5. Щелкните вкладку Подключение в окне "Свойства", а затем выберите AzureSqlDatabaseLinkedService в списке Связанная служба. Для свойства таблицы щелкните Добавить динамическое содержимое.

  6. В окне Добавить динамическое содержимое в разделе Параметры выберите SinkTableName.

  7. Нажав кнопку Готово, вы увидите "dataset().SinkTableName" в качестве имени таблицы.

    Вкладка

Создание набора данных для предела

На этом шаге вы создадите набор данных для хранения значения верхнего предела.

  1. На левой панели нажмите кнопку + (плюс) и выберите элемент Набор данных.

  2. В окне Новый набор данных выберите База данных SQL Microsoft Azure и щелкните Продолжить.

  3. На вкладке Общие окна свойств ниже введите WatermarkDataset в поле Имя.

  4. Перейдите на вкладку Подключения и выполните следующие действия:

    1. Выберите AzureSqlDatabaseLinkedService в списке Связанная служба.

    2. Выберите [dbo].[watermarktable] в списке Таблица.

      Вкладка

Создание конвейера

Этот конвейер принимает список имен таблиц в качестве параметра. Действие ForEach выполняет итерацию по списку имен таблиц, а затем выполняет следующие операции:

  1. Использует действие поиска, чтобы получить старое значение предела (начальное значение или значение, используемое в последней итерации).

  2. Использует действие поиска, чтобы получить новое значение предела (максимальное значение в столбце предела в исходной таблице).

  3. Использует действие копирования, чтобы скопировать данные между двумя значениями пределов из исходной в целевую базу данных.

  4. Использует действие хранимой процедуры, чтобы обновить старое значение предела для использования на первом шаге следующей итерации.

Создание конвейера

  1. На левой панели нажмите кнопку + (плюс) и выберите пункт Конвейер.

  2. На общей панели в разделе Свойства укажите значение IncrementalCopyPipeline для параметра Имя. Затем сверните панель, щелкнув значок Свойства в правом верхнем углу.

  3. На вкладке Параметры сделайте следующее:

    1. Щелкните + Создать.
    2. Введите tableList в качестве имени параметра.
    3. Для параметра типа выберите значение Массив.
  4. На панели Действия разверните элемент Итерация и условия, а затем перетащите действие ForEach в область конструктора конвейера. На вкладке Общие окна Свойства введите значение IterateSQLTables.

  5. Перейдите на вкладку Параметры и введите @pipeline().parameters.tableList в качестве значения для параметра Элементы. Действие ForEach выполняет итерацию по списку таблиц, а затем выполняет операцию добавочного копирования.

    Настройки действия ForEach

  6. Выберите в конвейере действие ForEach. Нажмите кнопку редактирования (значок карандаша).

  7. На панели Действия разверните элемент Общие, перетащите действие Поиск в область конструктора конвейера, а затем укажите LookupOldWaterMarkActivity в качестве имени.

  8. Перейдите на вкладку Настройки в окне Свойства и сделайте следующее:

    1. Выберите WatermarkDataset в поле Source Dataset (Исходный набор данных).

    2. Выберите Запрос в списке Use Query (Пользовательский запрос).

    3. Введите следующий запрос SQL в поле Запрос.

      select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'
      

      Настройки первого действия поиска

  9. Перетащите действие Поиск с панели элементов Действия и введите LookupNewWaterMarkActivity в поле Имя.

  10. Переключитесь на вкладку Параметры .

    1. Выберите SourceDataset в поле Source Dataset (Исходный набор данных).

    2. Выберите Запрос в списке Use Query (Пользовательский запрос).

    3. Введите следующий запрос SQL в поле Запрос.

      select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}
      

      Настройки второго действия поиска

  11. Перетащите действие Копирование с панели элементов Действия и введите IncrementalCopyActivity в поле Имя.

  12. Поочередно подключите действия поиска к действию копирования. Для этого нужно перетащить зеленые квадраты возле полей с действиями поиска к действию копирования. Когда цвет границы для действия копирования изменится на синий, отпустите кнопку мыши.

    Подключение действий поиска к действию копирования

  13. Выберите действие копирования в конвейере. Перейдите на вкладку Источник в окне Свойства.

    1. Выберите SourceDataset в поле Source Dataset (Исходный набор данных).

    2. Выберите Запрос в списке Use Query (Пользовательский запрос).

    3. Введите следующий запрос SQL в поле Запрос.

      select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'        
      

      Действие копирования — настройки источника

  14. Перейдите на вкладку Приемник и выберите SinkDataset в поле Sink Dataset (Целевой набор данных).

  15. Выполните следующие действия:

    1. В области Свойства набора данных для параметра SinkTableName введите @{item().TABLE_NAME}.

    2. В поле свойства Stored Procedure Name (Имя хранимой процедуры) введите @{item().StoredProcedureNameForMergeOperation}.

    3. В поле свойства Тип таблицы введите @{item().TableType}.

    4. Для параметра Table type parameter name (Имя параметра типа таблицы) введите @{item().TABLE_NAME}.

      Параметры действия копирования

  16. Перетащите действие Хранимая процедура с панели элементов Действия в область конструктора конвейера. Подключите действие копирования к действию Хранимая процедура.

  17. Выберите в конвейере действие Хранимая процедура и введите StoredProceduretoWriteWatermarkActivity в поле Имя на вкладке Общие окна Свойства.

  18. Перейдите на вкладку Учетная запись SQL и выберите AzureSqlDatabaseLinkedService в списке Связанная служба.

    Действие хранимой процедуры — учетная запись SQL

  19. Перейдите на вкладку Хранимая процедура и выполните здесь следующие действия:

    1. В качестве имени хранимой процедуры укажите [dbo].[usp_write_watermark].

    2. Выберите Параметр импорта.

    3. Укажите следующие значения параметров:

      Имя. Тип значение
      LastModifiedtime Дата/время @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName Строка @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

      Действие хранимой процедуры — параметры хранимой процедуры

  20. Чтобы опубликовать сущности, созданные в службе "Фабрика данных", выберите Опубликовать все.

  21. Дождитесь сообщения Опубликовано. Чтобы просмотреть уведомления, щелкните ссылку Показать уведомления. Чтобы закрыть окно уведомлений, щелкните значок X.

Запуск конвейера

  1. Щелкните Добавить триггер на панели инструментов конвейера, а затем — Trigger Now (Активировать сейчас).

  2. В окне запуска конвейера для параметра tableList введите значение ниже, а затем нажмите кнопку Готово.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

    Аргументы окна запуска конвейера

Мониторинг конвейера

  1. Перейдите на вкладку Мониторинг слева. Вы увидите выполнение конвейера, активированноевручную. Ссылки в столбце Имя конвейера позволят вам просмотреть подробные сведения о действиях и повторно выполнить конвейер.

  2. Чтобы просмотреть сведения о выполнениях действий, связанных с выполнением конвейера, щелкните ссылку в столбце Имя конвейера. Чтобы просмотреть сведения о выполнениях действий, щелкните ссылку Сведения (значок очков) в столбце Имя действия.

  3. Выберите Все запуски конвейеров в верхней части окна, чтобы вернуться к представлению "Выполнения конвейеров". Чтобы обновить список, нажмите кнопку Обновить.

Проверьте результаты.

В SQL Server Management Studio выполните следующие запросы в целевой базе данных SQL. Так вы проверите, что данные скопированы из исходных таблиц в целевые.

Запрос

select * from customer_table

Выходные данные

===========================================
PersonID	Name	LastModifytime
===========================================
1	        John	2017-09-01 00:56:00.000
2	        Mike	2017-09-02 05:23:00.000
3	        Alice	2017-09-03 02:36:00.000
4	        Andy	2017-09-04 03:21:00.000
5	        Anny	2017-09-05 08:06:00.000

Запрос

select * from project_table

Выходные данные

===================================
Project	    Creationtime
===================================
project1	2015-01-01 00:00:00.000
project2	2016-02-02 01:23:00.000
project3	2017-03-04 05:16:00.000

Запрос

select * from watermarktable

Выходные данные

======================================
TableName	    WatermarkValue
======================================
customer_table	2017-09-05 08:06:00.000
project_table	2017-03-04 05:16:00.000

Обратите внимание, что значения предела для обеих таблиц обновились.

Добавление данных в исходные таблицы

Выполните следующий запрос в исходной базе данных SQL Server, чтобы обновить имеющуюся строку в таблице customer_table. Вставьте новую строку в таблицу project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Повторный запуск конвейера

  1. В окне веб-браузера перейдите на вкладку редактирования в области слева.

  2. Щелкните Добавить триггер на панели инструментов конвейера, а затем — Trigger Now (Активировать сейчас).

  3. В окне запуска конвейера для параметра tableList введите значение ниже, а затем нажмите кнопку Готово.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

Повторный мониторинг конвейера

  1. Перейдите на вкладку Мониторинг слева. Вы увидите выполнение конвейера, активированноевручную. Ссылки в столбце Имя конвейера позволят вам просмотреть подробные сведения о действиях и повторно выполнить конвейер.

  2. Чтобы просмотреть сведения о выполнениях действий, связанных с выполнением конвейера, щелкните ссылку в столбце Имя конвейера. Чтобы просмотреть сведения о выполнениях действий, щелкните ссылку Сведения (значок очков) в столбце Имя действия.

  3. Выберите Все запуски конвейеров в верхней части окна, чтобы вернуться к представлению "Выполнения конвейеров". Чтобы обновить список, нажмите кнопку Обновить.

Просмотр окончательных результатов

В SQL Server Management Studio выполните следующие запросы в целевой базе данных SQL. Так вы проверите, что обновленные и новые данные скопированы из исходных таблиц в целевые.

Запрос

select * from customer_table

Выходные данные

===========================================
PersonID	Name	LastModifytime
===========================================
1	        John	2017-09-01 00:56:00.000
2	        Mike	2017-09-02 05:23:00.000
3	        NewName	2017-09-08 00:00:00.000
4	        Andy	2017-09-04 03:21:00.000
5	        Anny	2017-09-05 08:06:00.000

Обратите внимание на новые значения в столбцах Name и LastModifytime для строки под номером 3 в столбце PersonID.

Запрос

select * from project_table

Выходные данные

===================================
Project	    Creationtime
===================================
project1	2015-01-01 00:00:00.000
project2	2016-02-02 01:23:00.000
project3	2017-03-04 05:16:00.000
NewProject	2017-10-01 00:00:00.000

Обратите внимание, что запись NewProject добавлена в таблицу project_table.

Запрос

select * from watermarktable

Выходные данные

======================================
TableName	    WatermarkValue
======================================
customer_table	2017-09-08 00:00:00.000
project_table	2017-10-01 00:00:00.000

Обратите внимание, что значения предела для обеих таблиц обновились.

В этом руководстве вы выполнили следующие шаги:

  • подготовите исходное и конечное хранилища данных;
  • Создали фабрику данных.
  • Создание локальной среды выполнения интеграции (IR).
  • Установка среды выполнения интеграции.
  • Создали связанные службы.
  • Создали наборы данных источника, приемника и предела.
  • создадите и запустите конвейер, а также начнете его мониторинг;
  • Проверка результатов.
  • Добавление или обновление данных в исходных таблицах.
  • Повторный запуск конвейера и выполнение его мониторинга.
  • Просмотр окончательных результатов.

Перейдите к следующему руководству, чтобы узнать о преобразовании данных с помощью кластера Spark в Azure: