Поделиться через


Настройка конечных точек потока данных Центры событий Azure и Kafka

Внимание

На этой странице содержатся инструкции по управлению компонентами Операций Интернета вещей Azure с помощью манифестов развертывания Kubernetes, которые доступны в предварительной версии. Эта функция предоставляется с несколькими ограничениями и не должна использоваться для рабочих нагрузок.

Юридические условия, применимые к функциям Azure, которые находятся в состоянии бета-версии, предварительной версии или иным образом еще не выпущены в общедоступной версии, см. на странице Дополнительные условия использования предварительных версий в Microsoft Azure.

Чтобы настроить двунаправленное взаимодействие между операциями Интернета вещей Azure и брокерами Apache Kafka, можно настроить конечную точку потока данных. Эта конфигурация позволяет указать конечную точку, tls, проверку подлинности и другие параметры.

Необходимые компоненты

  • Экземпляр операций Интернета вещей Azure

Центры событий Azure

Центры событий Azure совместим с протоколом Kafka и может использоваться с потоками данных с некоторыми ограничениями.

Создание пространства имен Центры событий Azure и концентратора событий

Сначала создайте пространство имен с поддержкой Kafka Центры событий Azure

Затем создайте концентратор событий в пространстве имен. Каждый отдельный концентратор событий соответствует разделу Kafka. Вы можете создать несколько центров событий в одном пространстве имен, чтобы представить несколько разделов Kafka.

Назначение разрешения управляемому удостоверению

Чтобы настроить конечную точку потока данных для Центры событий Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем или назначаемое системой. Этот подход является безопасным и устраняет необходимость управления учетными данными вручную.

После создания пространства имен и концентратора событий Центры событий Azure необходимо назначить роль управляемому удостоверению Операций Интернета вещей Azure, которое предоставляет разрешение на отправку или получение сообщений в концентратор событий.

При использовании управляемого удостоверения, назначаемого системой, в портал Azure перейдите к экземпляру Операций Интернета вещей Azure и выберите "Обзор". Скопируйте имя расширения, указанного после расширения Azure IoT Operations Arc. Например, azure-iot-operations-xxxx7. Управляемое удостоверение, назначаемое системой, можно найти с помощью того же имени расширения Azure IoT Operations Arc.

Затем перейдите к управлению доступом к пространству >имен Центров событий (IAM)>Добавление назначения ролей.

  1. На вкладке "Роль" выберите соответствующую роль, например Azure Event Hubs Data Sender или Azure Event Hubs Data Receiver. Это дает управляемому удостоверению необходимые разрешения для отправки или получения сообщений для всех центров событий в пространстве имен. Дополнительные сведения см. в статье Аутентификация приложения с помощью идентификатора Microsoft Entra для доступа к ресурсам Центров событий.
  2. На вкладке "Члены" :
    1. При использовании управляемого удостоверения, назначаемого системой, для назначения доступа выберите "Пользователь", "Группа" или "Субъект-служба ", а затем выберите +Выберите участников и найдите имя расширения Azure IoT Operations Arc.
    2. При использовании управляемого удостоверения, назначаемого пользователем, для назначения доступа выберите параметр "Управляемое удостоверение ", а затем выберите "Выбрать участников " и выполните поиск управляемого удостоверения, назначаемого пользователем, для облачных подключений.

Создание конечной точки потока данных для Центры событий Azure

После настройки пространства имен Центры событий Azure и концентратора событий можно создать конечную точку потока данных для пространства имен с поддержкой Kaf Центры событий Azure ka.

  1. В интерфейсе операций выберите вкладку конечных точек потока данных.

  2. В разделе "Создание новой конечной точки потока данных" выберите Центры событий Azure> New.

    Снимок экрана: использование операций для создания конечной точки потока данных Центры событий Azure.

  3. Введите следующие параметры для конечной точки:

    Параметр Description
    Имя. Имя конечной точки потока данных.
    Хост Имя узла брокера Kafka в формате <NAMESPACE>.servicebus.windows.net:9093. Включите номер 9093 порта в параметр узла для Центров событий.
    Authentication method Метод, используемый для проверки подлинности. Рекомендуется выбрать управляемое удостоверение, назначаемое системой, или назначаемое пользователем управляемое удостоверение.
  4. Нажмите кнопку "Применить" , чтобы подготовить конечную точку.

Примечание.

Раздел Kafka или отдельный концентратор событий настраивается позже при создании потока данных. Раздел Kafka — это назначение для сообщений потока данных.

Использование строка подключения для проверки подлинности в Центрах событий

Внимание

Чтобы использовать портал операций для управления секретами, операции Интернета вещей Azure сначала должны быть включены с безопасными параметрами, настроив Azure Key Vault и включив удостоверения рабочей нагрузки. Дополнительные сведения см. в статье "Включение безопасных параметров в развертывании Операций Интернета вещей Azure".

На странице параметров конечной точки потока данных для операций перейдите на вкладку "Базовый", а затем выберите SASL метода>проверки подлинности.

Введите следующие параметры для конечной точки:

Параметр Description
Тип SASL Выберите Plain.
Синхронизированное имя секрета Введите имя секрета Kubernetes, содержащего строка подключения.
Справочник по имени пользователя или секрет маркера Ссылка на секрет имени пользователя или маркера, используемого для проверки подлинности SASL. Выберите его из списка Key Vault или создайте новый. Значение должно быть равно $ConnectionString.
Ссылка на пароль секрета токена Ссылка на пароль или секрет маркера, используемый для проверки подлинности SASL. Выберите его из списка Key Vault или создайте новый. Значение должно быть в формате Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>.

После нажатия кнопки "Добавить ссылку" при нажатии кнопки "Создать" введите следующие параметры:

Параметр Description
Имя секрета Имя секрета в Azure Key Vault. Выберите имя, которое легко помнить, чтобы выбрать секрет позже из списка.
Значение секрета Введите $ConnectionStringимя пользователя. Для пароля введите строка подключения в форматеEndpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>.
Задать дату активации Если этот параметр включен, дата, когда секрет становится активным.
Задать дату окончания срока действия Если этот параметр включен, дата истечения срока действия секрета.

Дополнительные сведения о секретах см. в статье "Создание секретов и управление ими в операциях Интернета вещей Azure".

Ограничения

Центры событий Azure не поддерживает все типы сжатия, поддерживаемые Kafka. В настоящее время в Центры событий Azure уровня "Премиум" и выделенных уровней поддерживается только сжатие GZIP. Использование других типов сжатия может привести к ошибкам.

Пользовательские брокеры Kafka

Чтобы настроить конечную точку потока данных для брокеров Kafka, отличных от Event-Hub, задайте узел, TLS, проверку подлинности и другие параметры по мере необходимости.

  1. В интерфейсе операций выберите вкладку конечных точек потока данных.

  2. В разделе "Создание новой конечной точки потока данных" выберите "Создать настраиваемый брокер>Kafka".

    Снимок экрана: использование операций для создания конечной точки потока данных Kafka.

  3. Введите следующие параметры для конечной точки:

    Параметр Description
    Имя. Имя конечной точки потока данных.
    Хост Имя узла брокера Kafka в формате <Kafka-broker-host>:xxxx. Включите номер порта в параметр узла.
    Authentication method Метод, используемый для проверки подлинности. Выберите SASL.
    Тип SASL Тип проверки подлинности SASL. Выберите "Обычный", "ScramSha256" или "ScramSha512". Обязательно, если используется SASL.
    Синхронизированное имя секрета Имя секрета. Обязательно, если используется SASL.
    Справочник по имени пользователя секрета маркера Ссылка на имя пользователя в секрете маркера SASL. Обязательно, если используется SASL.
  4. Нажмите кнопку "Применить" , чтобы подготовить конечную точку.

Примечание.

В настоящее время операции не поддерживают использование конечной точки потока данных Kafka в качестве источника. Поток данных можно создать с исходной конечной точкой потока данных Kafka с помощью Kubernetes или Bicep.

Чтобы настроить параметры конечной точки, используйте следующие разделы для получения дополнительных сведений.

Доступные методы проверки подлинности

Для конечных точек потока данных брокера Kafka доступны следующие методы проверки подлинности.

Управляемое удостоверение, назначаемое системой

Перед настройкой конечной точки потока данных назначьте роль управляемому удостоверению Операций Интернета вещей Azure, который предоставляет разрешение на подключение к брокеру Kafka:

  1. В портал Azure перейдите к экземпляру Операций Интернета вещей Azure и выберите "Обзор".
  2. Скопируйте имя расширения, указанного после расширения Azure IoT Operations Arc. Например, azure-iot-operations-xxxx7.
  3. Перейдите к облачному ресурсу, которому необходимо предоставить разрешения. Например, перейдите к элементу управления доступом к пространству >имен Центров событий (IAM)>Добавить назначение ролей.
  4. На вкладке "Роль" выберите соответствующую роль.
  5. На вкладке "Участники" , чтобы назначить доступ, выберите "Пользователь", "Группа" или "Субъект-служба ", а затем нажмите кнопку "Выбрать участников " и выполните поиск управляемого удостоверения Операций Интернета вещей Azure. Например, azure-iot-operations-xxxx7.

Затем настройте конечную точку потока данных с параметрами управляемого удостоверения, назначаемого системой.

На странице параметров конечной точки потока данных для операций выберите вкладку "Базовый", а затем выберите управляемое удостоверение, назначенное системой проверки подлинности>.

Эта конфигурация создает управляемое удостоверение с аудиторией по умолчанию, которая совпадает со значением узла пространства имен Центров событий в форме https://<NAMESPACE>.servicebus.windows.net. Однако если вам нужно переопределить аудиторию по умолчанию, можно задать audience для поля требуемое значение.

Не поддерживается в интерфейсе операций.

Управляемое удостоверение, назначаемое пользователем

Чтобы использовать назначаемое пользователем управляемое удостоверение для проверки подлинности, необходимо сначала развернуть операции Интернета вещей Azure с включенными безопасными параметрами. Затем необходимо настроить управляемое удостоверение, назначаемое пользователем, для облачных подключений. Дополнительные сведения см. в статье "Включение безопасных параметров в развертывании Операций Интернета вещей Azure".

Перед настройкой конечной точки потока данных назначьте роль управляемому удостоверению, назначаемого пользователем, который предоставляет разрешение на подключение к брокеру Kafka:

  1. В портал Azure перейдите к облачному ресурсу, которому необходимо предоставить разрешения. Например, перейдите к элементу управления доступом к пространству >имен сетки событий (IAM)>Add role assignment.
  2. На вкладке "Роль" выберите соответствующую роль.
  3. На вкладке "Участники" для назначения доступа выберите параметр "Управляемое удостоверение", а затем нажмите кнопку "Выбрать участников" и найдите управляемое удостоверение, назначаемое пользователем.

Затем настройте конечную точку потока данных с параметрами управляемого удостоверения, назначаемого пользователем.

На странице параметров конечной точки потока данных для операций выберите вкладку "Базовый" и выберите метод проверки подлинности>, назначенный пользователем.

Здесь область — это аудитория управляемого удостоверения. Значение по умолчанию совпадает со значением узла пространства имен Центров событий в форме https://<NAMESPACE>.servicebus.windows.net. Однако если необходимо переопределить аудиторию по умолчанию, можно задать для поля области требуемое значение с помощью Bicep или Kubernetes.

SASL

Чтобы использовать SASL для проверки подлинности, укажите метод проверки подлинности SASL и настройте тип SASL и ссылку на секрет с именем секрета, содержащего маркер SASL.

На странице параметров конечной точки потока данных для операций перейдите на вкладку "Базовый", а затем выберите SASL метода>проверки подлинности.

Введите следующие параметры для конечной точки:

Параметр Description
Тип SASL Тип используемой проверки подлинности SASL. Поддерживаются типы Plain, ScramSha256 и ScramSha512.
Синхронизированное имя секрета Имя секрета Kubernetes, содержащего маркер SASL.
Справочник по имени пользователя или секрет маркера Ссылка на секрет имени пользователя или маркера, используемого для проверки подлинности SASL.
Ссылка на пароль секрета токена Ссылка на пароль или секрет маркера, используемый для проверки подлинности SASL.

Поддерживаемые типы SASL:

  • Plain
  • ScramSha256
  • ScramSha512

Секрет должен находиться в том же пространстве имен, что и конечная точка потока данных Kafka. Секрет должен иметь маркер SASL в качестве пары "ключ-значение".

Анонимные

Чтобы использовать анонимную проверку подлинности, обновите раздел проверки подлинности параметров Kafka, чтобы использовать анонимный метод.

На странице параметров конечной точки потока данных для операций выберите вкладку "Базовый" и выберите метод>проверки подлинности None.

Расширенные настройки

Дополнительные параметры можно задать для конечной точки потока данных Kafka, таких как TLS, доверенный сертификат ЦС, параметры обмена сообщениями Kafka, пакетная обработка и CloudEvents. Эти параметры можно задать на вкладке "Расширенный портал потока данных" или в ресурсе конечной точки потока данных.

В интерфейсе операций выберите вкладку "Дополнительно " для конечной точки потока данных.

Снимок экрана: использование операций для задания дополнительных параметров конечной точки потока данных Kafka.

Параметры протокола TLS

Режим TLS

Чтобы включить или отключить TLS для конечной точки Kafka, обновите mode параметр в параметрах TLS.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем установите флажок рядом с режимом TLS.

Для режима TLS можно задать Enabled или Disabled. Если задан Enabledрежим, поток данных использует безопасное подключение к брокеру Kafka. Если задан Disabledрежим, поток данных использует небезопасное подключение к брокеру Kafka.

Сертификат доверенного ЦС

Настройте доверенный сертификат ЦС для конечной точки Kafka, чтобы установить безопасное подключение к брокеру Kafka. Этот параметр важен, если брокер Kafka использует самозаверяющий сертификат или сертификат, подписанный пользовательским ЦС, который по умолчанию не является доверенным.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле карты конфигурации сертификата доверенного ЦС, чтобы указать ConfigMap, содержащий сертификат доверенного ЦС.

Этот ConfigMap должен содержать сертификат ЦС в формате PEM. ConfigMap должен находиться в том же пространстве имен, что и ресурс потока данных Kafka. Например:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

Совет

При подключении к Центры событий Azure сертификат ЦС не требуется, так как служба Центров событий использует сертификат, подписанный общедоступным ЦС, доверенным по умолчанию.

Идентификатор группы потребителей

Идентификатор группы потребителей используется для идентификации группы потребителей, используемой потоком данных для чтения сообщений из раздела Kafka. Идентификатор группы потребителей должен быть уникальным в брокере Kafka.

Внимание

Если конечная точка Kafka используется в качестве источника, требуется идентификатор группы потребителей. В противном случае поток данных не может считывать сообщения из раздела Kafka, и вы получите сообщение об ошибке "Конечные точки источника типа Kafka должны иметь идентификатор consumerGroupId".

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле "Идентификатор группы потребителей", чтобы указать идентификатор группы потребителей.

Этот параметр действует только в том случае, если конечная точка используется в качестве источника (т. е. поток данных является потребителем).

Сжатие

Поле сжатия позволяет сжимать сообщения, отправленные в разделы Kafka. Сжатие помогает сократить пропускную способность сети и пространство хранилища, необходимые для передачи данных. Однако сжатие также добавляет некоторые издержки и задержку в процесс. Поддерживаемые типы сжатия перечислены в следующей таблице.

значение Описание
None Сжатие или пакетная обработка не применяется. Значение по умолчанию не является значением, если сжатие не указано.
Gzip Применяются сжатие и пакетная обработка GZIP. GZIP — это алгоритм сжатия общего назначения, который обеспечивает хороший баланс между коэффициентом сжатия и скоростью. В настоящее время в Центры событий Azure уровня "Премиум" и выделенных уровней поддерживается только сжатие GZIP.
Snappy Применяются сжатие snappy и пакетная обработка. Snappy — это алгоритм быстрого сжатия, который предлагает умеренное соотношение сжатия и скорость. Этот режим сжатия не поддерживается Центры событий Azure.
Lz4 Применяются сжатие LZ4 и пакетная обработка. LZ4 — это алгоритм быстрого сжатия, который обеспечивает низкое соотношение сжатия и высокую скорость. Этот режим сжатия не поддерживается Центры событий Azure.

Чтобы настроить сжатие, выполните приведенные действия.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле сжатия, чтобы указать тип сжатия .

Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.

Пакетная обработка

Помимо сжатия, можно также настроить пакетную обработку сообщений перед отправкой в разделы Kafka. Пакетная обработка позволяет группировать несколько сообщений вместе и сжимать их в виде одной единицы, что может повысить эффективность сжатия и снизить нагрузку на сеть.

Поле Description Обязательное поле
mode Может иметь значение Enabled или Disabled. Значением по умолчанию является Enabled то, что Kafka не имеет понятия о незабавленном обмене сообщениями. Если задано значение Disabled, пакетная обработка свернута для создания пакета с одним сообщением каждый раз. No
latencyMs Максимальный интервал времени в миллисекундах, который можно буферизировать перед отправкой сообщений. Если этот интервал достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько или сколько их размеров. Если значение не задано, значение по умолчанию равно 5. No
maxMessages Максимальное количество сообщений, которые можно буферизать перед отправкой. Если это число достигнуто, все буферные сообщения отправляются в виде пакета независимо от того, насколько большой или какой срок их буферизации. Если значение не задано, значение по умолчанию равно 100000. No
maxBytes Максимальный размер в байтах, который можно буферизать перед отправкой. Если этот размер достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько или сколько времени они буферизированы. Значение по умолчанию — 1000000 (1 МБ). No

Например, если задано значение задержки 1000, maxMessagesages значение 100 и maxBytes равным 1024, сообщения отправляются либо при наличии 100 сообщений в буфере, либо при наличии 1024 байтов в буфере или при истечении 1000 миллисекунд с момента последней отправки.

Чтобы настроить пакетную обработку, выполните приведенные действия.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле "Пакетная обработка ", чтобы включить пакетную обработку. Используйте поля "Задержка пакетной обработки", "Максимальное число байтов" и "Число сообщений", чтобы указать параметры пакетной обработки.

Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.

Стратегия обработки секций

Стратегия обработки секций определяет, как сообщения назначаются секциям Kafka при отправке их в разделы Kafka. Секции Kafka — это логические сегменты раздела Kafka, обеспечивающие параллельную обработку и отказоустойчивость. Каждое сообщение в разделе Kafka содержит секцию и смещение, которые используются для идентификации и упорядочивания сообщений.

Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.

По умолчанию поток данных назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Однако можно использовать различные стратегии для назначения сообщений секциям на основе некоторых критериев, таких как имя раздела MQTT или свойство сообщения MQTT. Это поможет повысить балансировку нагрузки, локализацию данных или упорядочивание сообщений.

значение Описание
Default Назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Это значение по умолчанию, если стратегия не указана.
Static Назначает сообщения фиксированному номеру секции, который является производным от идентификатора экземпляра потока данных. Это означает, что каждый экземпляр потока данных отправляет сообщения в другую секцию. Это может помочь добиться лучшей балансировки нагрузки и локальности данных.
Topic Использует имя раздела MQTT из источника потока данных в качестве ключа для секционирования. Это означает, что сообщения с тем же именем раздела MQTT отправляются в ту же секцию. Это может помочь добиться лучшего порядка сообщений и локальности данных.
Property Использует свойство сообщения MQTT из источника потока данных в качестве ключа для секционирования. Укажите имя свойства в partitionKeyProperty поле. Это означает, что сообщения с тем же значением свойства отправляются в одну секцию. Это поможет улучшить порядок сообщений и локализацию данных на основе пользовательского критерия.

Например, если вы задали стратегию Property обработки секций и свойству device-idключа секции, сообщения с тем же свойством отправляются в ту же device-id секцию.

Чтобы настроить стратегию обработки секций, выполните следующие действия.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле стратегии обработки секций, чтобы указать стратегию обработки секций. Используйте поле свойства ключа секции, чтобы указать свойство, используемое для секционирования, если стратегия задана.Property

Подтверждения Kafka

Подтверждения Kafka (acks) используются для управления устойчивостью и согласованностью сообщений, отправленных в разделы Kafka. Когда производитель отправляет сообщение в раздел Kafka, он может запрашивать различные уровни подтверждения от брокера Kafka, чтобы убедиться, что сообщение успешно записано в раздел и реплицируется в кластере Kafka.

Этот параметр действует только в том случае, если конечная точка используется в качестве назначения (т. е. поток данных является производителем).

значение Описание
None Поток данных не ожидает каких-либо подтверждений от брокера Kafka. Этот параметр является самым быстрым, но наименее устойчивым вариантом.
All Поток данных ожидает записи сообщения в раздел лидера и все секции последователей. Этот параметр является самым медленным, но наиболее устойчивым вариантом. Этот параметр также является параметром по умолчанию
One Поток данных ожидает записи сообщения в раздел лидера и по крайней мере одну секцию подписчика.
Zero Поток данных ожидает записи сообщения в раздел лидера, но не ожидает каких-либо подтверждений от подписчиков. Это быстрее, чем One менее устойчивый.

Например, если задать подтверждение AllKafka, поток данных ожидает записи сообщения в раздел лидера и все секции подписчиков перед отправкой следующего сообщения.

Чтобы настроить подтверждения Kafka, выполните следующие действия.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле подтверждения Kafka, чтобы указать уровень подтверждения Kafka.

Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.

Копирование свойств MQTT

По умолчанию параметр свойств MQTT копирования включен. Эти свойства пользователя включают такие значения, как subject хранение имени ресурса, отправляющего сообщение.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем установите флажок рядом с полем "Копировать свойства MQTT", чтобы включить или отключить копирование свойств MQTT.

В следующих разделах описывается преобразование свойств MQTT в заголовки пользователей Kafka и наоборот, когда параметр включен.

Конечная точка Kafka — это назначение

Когда конечная точка Kafka является назначением потока данных, все определенные свойства спецификации MQTT версии 5 преобразуются в заголовки пользователей Kafka. Например, сообщение MQTT версии 5 с перенаправлением типа контента в Kafka преобразуется в заголовок "Content Type":{specifiedValue}пользователя Kafka. Аналогичные правила применяются к другим встроенным свойствам MQTT, определенным в следующей таблице.

Свойство MQTT Переведенное поведение
Индикатор формата полезных данных Ключ: "Индикатор формата полезных данных"
Значение: "0" (полезные данные равно байтам) или "1" (полезные данные — UTF-8)
Раздел ответа Ключ: "Раздел ответа"
Значение: копия раздела ответа из исходного сообщения.
Интервал истечения срока действия сообщения Ключ: "Интервал истечения срока действия сообщения"
Значение: представление UTF-8 числа секунд до истечения срока действия сообщения. Дополнительные сведения см . в свойстве "Интервал истечения срока действия сообщения".
Данные корреляции: Ключ: "Данные корреляции"
Значение: копирование данных корреляции из исходного сообщения. В отличие от многих свойств MQTT версии 5, которые кодируются в кодировке UTF-8, данные корреляции могут быть произвольными данными.
Content Type: Ключ: "Тип контента"
Значение: копирование типа контента из исходного сообщения.

Пары значений ключей ключа свойства пользователя MQTT версии 5 напрямую преобразуются в заголовки пользователей Kafka. Если заголовок пользователя в сообщении имеет то же имя, что и встроенное свойство MQTT (например, заголовок пользователя с именем "Корреляционные данные"), то будет ли перенаправление значения свойства спецификации MQTT версии 5 или свойство пользователя не определено.

Потоки данных никогда не получают эти свойства от брокера MQTT. Таким образом, поток данных никогда не пересылает их:

  • Псевдоним раздела
  • Идентификаторы подписки
Свойство "Интервал истечения срока действия сообщения"

Интервал истечения срока действия сообщения указывает, сколько времени сообщение может оставаться в брокере MQTT до отмены.

Когда поток данных получает сообщение MQTT с указанным интервалом истечения срока действия сообщения, он:

  • Записывает время получения сообщения.
  • Перед отправкой сообщения в место назначения время вычитается из сообщения в очередь из исходного интервала истечения срока действия.
  • Если сообщение не истекло (операция выше равно > 0), сообщение отправляется в место назначения и содержит обновленное время истечения срока действия сообщения.
  • Если сообщение истекло (операция выше = <0), сообщение не создается целевым объектом.

Примеры:

  • Поток данных получает сообщение MQTT с интервалом истечения срока действия сообщения = 3600 секунд. Соответствующее назначение временно отключено, но может повторно подключиться. 1000 секунд перед отправкой этого сообщения MQTT в целевой объект. В этом случае сообщение назначения имеет значение 2600 (3600 – 1000) секунд.
  • Поток данных получает сообщение MQTT с интервалом истечения срока действия сообщения = 3600 секунд. Соответствующее назначение временно отключено, но может повторно подключиться. Однако в этом случае для повторного подключения требуется 4000 секунд. Срок действия сообщения истек, и поток данных не перенаправит это сообщение в место назначения.

Конечная точка Kafka — это источник потока данных

Примечание.

При использовании конечной точки Центров событий в качестве источника потока данных возникает известная проблема, из-за которой заголовок Kafka поврежден при переводе в MQTT. Это происходит только в том случае, если используется концентратор событий, хотя клиент Концентратора событий использует AMQP под крышкой. Например, "foo"="bar", преобразуется "foo", но значение становится "\xa1\x03bar".

Когда конечная точка Kafka является источником потока данных, заголовки пользователей Kafka преобразуются в свойства MQTT версии 5. В следующей таблице описывается преобразование заголовков пользователей Kafka в свойства MQTT версии 5.

Заголовок Kafka Переведенное поведение
Ключ Ключ: "Ключ"
Значение: копия ключа из исходного сообщения.
Метка времени Ключ: "Метка времени"
Значение: кодировка UTF-8 метки времени Kafka, которая является числом миллисекунда с эпохи Unix.

Пары ключей и значений заголовка пользователя Kafka, предоставленные в кодировке UTF-8, напрямую преобразуются в свойства ключа и значения MQTT.

UTF-8 / двоичные несоответствия

MQTT версии 5 поддерживает только свойства на основе UTF-8. Если поток данных получает сообщение Kafka, содержащее один или несколько заголовков, отличных от UTF-8, поток данных:

  • Удалите обижающее свойство или свойства.
  • Перенаправите остальное сообщение в соответствии с предыдущими правилами.

Приложения, для которых требуется двоичная передача в заголовках источника Kafka => свойства MQTT Target должны сначала кодировать их в кодировке UTF-8, например с помощью Base64.

>=64 КБ несоответствия свойств

Свойства MQTT версии 5 должны быть меньше 64 КБ. Если поток данных получает сообщение Kafka, содержащее один или несколько заголовков, равное >64 КБ, поток данных:

  • Удалите обижающее свойство или свойства.
  • Перенаправите остальное сообщение в соответствии с предыдущими правилами.
Преобразование свойств при использовании Центров событий и производителей, использующих AMQP

Если у вас есть сообщения, передаваемые клиентом, конечная точка источника потока данных Kafka выполняет одно из следующих действий:

  • Отправка сообщений в Центры событий с помощью клиентских библиотек, таких как Azure.Messaging.EventHubs
  • Использование AMQP напрямую

Существуют нюансы перевода свойств, которые следует учитывать.

Необходимо выполнить одно из следующих действий:

  • Избегайте отправки свойств
  • Если необходимо отправить свойства, отправьте значения, закодированные как UTF-8.

Когда центры событий преобразовывает свойства из AMQP в Kafka, он включает базовые типы, закодированные AMQP в своем сообщении. Дополнительные сведения о поведении см. в разделе "Обмен событиями между потребителями и производителями" с помощью различных протоколов.

В следующем примере кода, когда конечная точка потока данных получает значение "foo":"bar", оно получает свойство как <0xA1 0x03 "bar">.

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

Конечная точка потока данных не может перенаправить свойство <0xA1 0x03 "bar"> полезных данных в сообщение MQTT, так как данные не TF-8. Однако если указать строку UTF-8, конечная точка потока данных преобразует строку перед отправкой в MQTT. Если используется строка UTF-8, сообщение MQTT будет иметь "foo":"bar" в качестве свойств пользователя.

Переводятся только заголовки UTF-8. Например, учитывая следующий сценарий, в котором свойство задано как float:

Properties = 
{
  {"float-value", 11.9 },
}

Конечная точка потока данных удаляет пакеты, содержащие "float-value" поле.

Перенаправляются не все свойства данных событий, включая propertyEventData.correlationId. Дополнительные сведения см. в разделе "Свойства пользователя события",

Источники облачных

CloudEvents — это способ описания данных событий распространенным способом. Параметры CloudEvents используются для отправки или получения сообщений в формате CloudEvents. Вы можете использовать CloudEvents для архитектур на основе событий, где разные службы должны взаимодействовать друг с другом в одном или разных поставщиках облачных служб.

Параметры CloudEventAttributes илиPropagateCreateOrRemap.

На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле атрибутов событий Cloud Для указания параметра CloudEvents.

В следующих разделах описывается распространение или изменение свойств CloudEvent.

Параметр распространения

Свойства CloudEvent передаются для сообщений, содержащих необходимые свойства. Если сообщение не содержит необходимых свойств, сообщение передается как есть. Если необходимые свойства присутствуют, ce_ префикс добавляется в имя свойства CloudEvent.

имени Обязательное поле Пример значения Имя вывода Выходное значение
specversion Да 1.0 ce-specversion Передается как
type Да ms.aio.telemetry ce-type Передается как
source Да aio://mycluster/myoven ce-source Передается как
id Да A234-1234-1234 ce-id Передается как
subject No aio/myoven/telemetry/temperature ce-subject Передается как
time No 2018-04-05T17:31:00Z ce-time Передается как есть. Это не отдохнулось.
datacontenttype No application/json ce-datacontenttype Изменен на тип содержимого выходных данных после необязательного этапа преобразования.
dataschema No sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema Если в конфигурации преобразования выходных данных задана схема преобразования выходных данных, dataschema изменится на выходную схему.

Параметр CreateOrRemap

Свойства CloudEvent передаются для сообщений, содержащих необходимые свойства. Если сообщение не содержит обязательных свойств, создаются свойства.

имени Обязательное поле Имя вывода Созданное значение, если отсутствует
specversion Да ce-specversion 1.0
type Да ce-type ms.aio-dataflow.telemetry
source Да ce-source aio://<target-name>
id Да ce-id Созданный UUID в целевом клиенте
subject No ce-subject Выходной раздел, в котором отправляется сообщение
time No ce-time Создано как RFC 3339 в целевом клиенте
datacontenttype No ce-datacontenttype Изменен на тип содержимого выходных данных после необязательного этапа преобразования
dataschema No ce-dataschema Схема, определенная в реестре схем

Следующие шаги

Дополнительные сведения о потоках данных см. в статье "Создание потока данных".