Поделиться через


Действие Потока данных в Фабрике данных Azure и Azure Synapse Analytics

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Используйте действие потока данных для преобразования и передачи данных посредством потоков данных для сопоставления. Если вы не знакомы с потоками данных, см. раздел Общие сведения о потоке данных для сопоставления.

Создание действия потока данных с помощью пользовательского интерфейса

Чтобы использовать действие потока данных в конвейере, выполните следующие шаги:

  1. Выполните поиск элемента Поток данных на панели конвейера «Действия» и перетащите действие Потока данных на холст конвейера.

  2. Выберите новое действие Поток данных на холсте, если оно еще не выбрано, и вкладка "Параметры" для изменения сведений.

    Изображение пользовательского интерфейса для действия потока данных.

  3. Ключ контрольной точки используется для установки контрольной точки, если поток данных используется для получения измененных данных. Его можно перезаписать. Действия потока данных используют значение GUID в качестве ключа контрольной точки вместо "имя конвейера + имя действия", чтобы всегда отслеживать состояние отслеживания измененных данных клиента, даже если есть какие-либо действия по переименованию. Все существующие действия потока данных используют старый ключ шаблона для обратной совместимости. Ниже приведен параметр ключа контрольной точки после публикации нового действия потока данных с включенной функцией получения измененных данных.

    Изображение пользовательского интерфейса для действия потока данных с ключом контрольной точки.

  4. Выберите существующий поток данных или создайте новый с помощью кнопки «Создать». Выберите другие параметры, необходимые для завершения настройки.

Синтаксис

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Свойства типа

Свойство Description Допустимые значения Обязательное поле
dataflow Ссылка на выполняемый поток данных DataFlowReference Да
integrationRuntime Вычислительная среда, в которой выполняется поток данных. Если это не указано, используется среда выполнения интеграции Azure автоматически. IntegrationRuntimeReference No
compute.coreCount Количество ядер, используемых в кластере Spark. Можно указать только в том случае, если используется среда выполнения интеграции Azure autoresolve 8, 16, 32, 48, 80, 144, 272 No
compute.computeType Тип вычисления, используемого в кластере Spark. Можно указать только в том случае, если используется среда выполнения интеграции Azure autoresolve "Общие" No
staging.linkedService Если вы используете источник или приемник Azure Synapse Analytics, укажите учетную запись хранения, используемую для промежуточного процесса Polybase.

Если в хранилище Azure настроена конечная точка службы виртуальной сети, то необходимо использовать проверку подлинности по управляемому удостоверению. См. раздел Влияние использования конечных точек службы виртуальной сети со службой хранилища Azure. Ознакомьтесь с необходимыми конфигурациями для BLOB-объектов Azure и Azure Data Lake Storage 2-го поколения соответственно.
LinkedServiceReference Только если поток данных считывает или записывает данные в Azure Synapse Analytics
staging.folderPath Если вы используете источник или приемник Azure Synapse Analytics, для промежуточного процесса Polybase используется путь к папке в учетной записи хранения BLOB-объектов Строка Только если поток данных считывает или записывает данные в Azure Synapse Analytics
traceLevel Установка уровня ведения журнала для выполнения действия потока данных Прекрасно, грубая, нет No

Выполнение действия потока данных

Вычисление динамического размера потока данных в среде выполнения

Свойства "Число ядер" и "Тип вычисления" можно динамически установить для регулировки размера входящих данных источника в среде выполнения. Используйте такие действия конвейера, как "Поиск" или "Получение метаданных", чтобы определить размер данных набора данных источника. Затем используйте команду "Добавить динамическое содержимое" в свойствах "Действие потока данных". Можно выбрать объем вычислительных ресурсов Small (Малый), Medium (Средний) или Large (Большой). Можно также выбрать Custom (Настраиваемый) и настроить типы вычислений и количество ядер вручную.

Динамический поток данных

Вот краткое видео учебник, объясняющее этот метод

Среда выполнения интеграции потока данных

Выберите среду выполнения Integration Runtime (IR), которая будет использоваться для выполнения действия потока данных. По умолчанию служба будет использовать среду выполнения интеграции Azure с автоматическим разрешением с четырьмя рабочими ядрами. Эта среда IR имеет тип вычислений общего назначения и выполняется в том же регионе, что и экземпляр службы. Для рабочих конвейеров настоятельно рекомендуется создать собственные среды выполнения интеграции Azure, определяющие определенные регионы, тип вычислений, количество ядер и TTL для выполнения действия потока данных.

Минимальный тип вычислений общего назначения с конфигурацией 8 + 8 (всего 16 виртуальных ядер) и 10-минутным сроком жизни — это рекомендуемый минимум для большинства производственных рабочих нагрузок. Задав небольшой срок жизни, Azure IR может поддерживать теплый кластер, который не будет нести несколько минут времени начала для холодного кластера. Дополнительные сведения см. в статье Среда выполнения интеграции Azure.

Azure Integration Runtime

Внимание

Выбор среды Integration Runtime в действии потока данных применяется только к запущенным выполнениям конвейера. Отладка конвейера с потоками данных выполняется в кластере, указанном в сеансе отладки.

PolyBase

Если вы используете Azure Synapse Analytics в качестве приемника или источника, необходимо выбрать промежуточное расположение для пакетной загрузки Polybase. Polybase обеспечивает групповую пакетную загрузку вместо построчной загрузки данных. Polybase радикально сокращает время загрузки в Azure Synapse Analytics.

Ключ контрольной точки

При использовании параметра отслеживания изменений для источников потока данных ADF поддерживает контрольную точку и управляет ею автоматически. Ключ контрольной точки по умолчанию — это хэш имени потока данных и имени конвейера. Если вы используете динамический шаблон для исходных таблиц или папок, вы можете переопределить этот хэш и задать собственное значение ключа контрольной точки здесь.

Уровень ведения журнала

Если вам не требуется каждое выполнение конвейера действий потока данных для полного журнала всех подробных журналов телеметрии, можно при необходимости задать уровень ведения журнала на "Базовый" или "Нет". При выполнении потоков данных в режиме "Подробный" (по умолчанию) служба запрашивает полное действие журнала на каждом отдельном уровне секции во время преобразования данных. Это может быть ресурсоемкой операцией, поэтому включать режим подробного ведения журнала следует только при устранении неполадок. Такой подход может повысить общую производительность потоков данных и конвейеров. Режим "Базовый" регистрирует только длительность преобразования, а "Нет" — только сводку по длительности.

Уровень ведения журнала

Свойства приемника

Функция группировки в потоках данных позволяет задать порядок выполнения приемников, а также группировать приемники вместе с одинаковым номером группы. Чтобы упростить управление группами, можно настроить службу для параллельного запуска приемников в одной группе. Можно задать продолжение работы группы приемников даже после того, как один из приемников обнаружит ошибку.

По умолчанию приемники потоков данных выполняются последовательно, то есть по очереди, пока не происходит сбой потока данных при обнаружении ошибки в приемнике. Кроме того, все приемники по умолчанию входят в одну группу, если только вы не перешли к свойствам потока данных и не установили разные приоритеты для приемников.

Свойства приемника

Только первая строка

Этот параметр доступен только для потоков данных, в которых включены приемники кэша для вывода в действие. Выходные данные из потока данных, которые вводятся непосредственно в ваш конвейер, ограничены 2 МБ. Установка параметра "Только первая строка" позволяет ограничить выходные данные потока данных при внедрении выходных данных действия потока данных непосредственно в конвейер.

Параметризация потоков данных

Параметризованные наборы данных

Если в потоке данных используются параметризованные наборы данных, задайте значения параметров на вкладке Параметры.

Параметры выполнения потока данных

Параметризованные потоки данных

Если поток данных является параметризованным, задайте динамические значения параметров потока данных на вкладке Параметры. Для назначения динамических или литеральных значений параметров можно использовать язык выражений конвейера или язык выражений потока данных. Дополнительные сведения см. в статье Параметры потока данных.

Параметризованные свойства вычислений.

Параметризовать число ядер или тип вычислений можно, если вы используете среду выполнения интеграции Azure autoresolve и укажите значения для compute.coreCount и compute.computeType.

Пример параметров выполнения потока данных

Отладка конвейера действия потока данных

Для выполнения конвейера отладки, выполняемого с действием потока данных, необходимо переключиться в режим отладки потока данных с помощью ползунка Отладки потока данных в верхней панели. Режим отладки позволяет запускать поток данных в активном кластере Spark. Дополнительные сведения см. в статье Режим отладки.

Снимок экрана: расположение кнопки

Конвейер отладки выполняется для активного кластера отладки, а не для среды выполнения интеграции, указанной в параметрах действия потока данных. При запуске режима отладки можно выбрать среду вычислений для отладки.

Мониторинг действия потока данных

В действии потока данных предусмотрена специальная процедура мониторинга, позволяющая просматривать сведения о секционировании, времени этапа и происхождении данных. Откройте панель мониторинга с помощью значка очков в разделе Действия. Дополнительные сведения см. в статье Мониторинг потоков данных.

Использование действия потока данных приводит к последующему действию

Действие потока данных выводит метрики, относящиеся к количеству строк, записанных в каждый приемник, и строк, считываемых из каждого источника. Эти результаты возвращаются в раздел output "Результат выполнения действия". Возвращаемые метрики имеют формат ниже JSON.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Например, чтобы получить количество строк, записанных в приемник с именем "sink1" в действии с именем "dataflowActivity", используйте @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Чтобы получить количество строк, прочитанных из источника с именем "source1", который использовался в этом приемнике, используйте @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Примечание.

Если приемник содержит нулевые строки, он не будет отображаться в метриках. Существование можно проверить с помощью функции contains. Например, проверяет, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') были ли строки записаны в приемник1.

Ознакомьтесь с поддерживаемыми действиями потока управления: