你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在 Azure IoT 操作中配置数据流

重要

Azure Arc 启用的 Azure IoT 操作预览版目前处于预览状态。 不应在生产环境中使用此预览版软件。

在正式版推出后,你需要部署新的 Azure IoT 操作安装。 无法升级预览版安装。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

数据流是数据使用可选转换从源到目标采用的路径。 你可以通过创建数据流自定义资源或使用 Azure IoT 操作工作室门户来配置数据流。 数据流由三个部分组成:源、转换和目标。

显示从源流到转换再到目标的数据流的关系图。

若要定义源和目标,需要配置数据流终结点。 转换是可选的,可以包括扩充数据、筛选数据并将数据映射到其他字段等操作。

重要

每个数据流都必须具有 Azure IoT 操作本地 MQTT 代理默认终结点作为源或目标

可以使用 Azure IoT 操作中的操作体验来创建数据流。 操作体验提供了可视界面来配置数据流。 你还可以使用 Bicep 通过 Bicep 模板文件创建数据流,或者使用 Kubernetes 通过 YAML 文件创建数据流。

请继续阅读,了解如何配置源、转换和目标。

先决条件

获取 Azure IoT 操作预览版实例后,即可使用默认数据流配置文件和终结点部署数据流。 但是,也可以配置数据流配置文件和终结点来自定义数据流。

数据流配置文件

数据流配置文件指定其下的数据流要使用的实例数。 如果你不需要多个采用不同缩放设置的数据流组,则可以使用默认数据流配置文件。 若要了解如何配置数据流配置文件,请参阅配置数据流配置文件

数据流终结点

需要使用数据流终结点来配置数据流的源和目标。 若要快速开始,可以使用本地 MQTT 代理的默认数据流终结点。 还可以创建其他类型的数据流终结点,例如 Kafka、事件中心或 Azure Data Lake Storage。 若要了解如何配置每种类型的数据流终结点,请参阅配置数据流终结点

开始使用

满足先决条件后,即可开始创建数据流。

若要在操作体验中创建数据流,请选择“数据流”>“创建数据流”。 然后会显示一个页面,可在其中配置数据流的源、转换和目标。

使用操作体验创建数据流的屏幕截图。

查看以下部分,了解如何配置数据流的操作类型。

Source

若要配置数据流的源,请指定终结点引用和终结点的数据源列表。

将资产用作源

可以使用资产作为数据流的源。 只能在操作体验中将资产用作源。

  1. 在“源详细信息”下,选择“资产”。

  2. 选择要用作源终结点的资产。

  3. 选择“继续”。

    将显示所选资产的数据点列表。

    使用操作体验选择资产作为源终结点的屏幕截图。

  4. 选择“应用”以将资产用作源终结点。

使用资产作为源时,资产定义用于推断数据流的架构。 资产定义包括资产的数据点的架构。 若要了解详细信息,请参阅远程管理资产配置

配置完成后,资产中的数据通过本地 MQTT 代理到达数据流。 因此,使用资产作为源时,数据流实际上使用本地 MQTT 代理默认终结点作为源。

将默认 MQTT 终结点用作源

  1. 在“源详细信息”下,选择“MQTT”。

    使用操作体验选择 MQTT 作为源终结点的屏幕截图。

  2. 输入 MQTT 源的以下设置:

    设置 说明
    MQTT 主题 用于订阅传入消息的 MQTT 主题筛选器。 请参阅配置 MQTT 或 Kafka 主题
    消息架构 用于反序列化传入消息的架构。 请参阅指定用于反序列化数据的架构
  3. 选择“应用”。

如果未将默认终结点用作源,则必须将其用作目标。 若要了解详细信息,请参阅数据流必须使用本地 MQTT 代理终结点

将自定义 MQTT 或 Kafka 数据流终结点用作源

如果创建了自定义 MQTT 或 Kafka 数据流终结点(例如,与事件网格或事件中心配合使用的终结点),则可以将其用作数据流的源。 请记住,存储类型终结点(例如 Data Lake 或 Fabric OneLake)不能用作源。

若要配置,请使用 Kubernetes YAML 或 Bicep。 请将占位符值替换为你的自定义终结点名称和主题。

操作体验目前不支持使用自定义 MQTT 或 Kafka 终结点作为源。

配置数据源(MQTT 或 Kafka 主题)

可以在源中指定多个 MQTT 或 Kafka 主题,而无需修改数据流终结点配置。 这种灵活性意味着即使主题不同,也可以在多个数据流中重用同一个终结点。 有关详细信息,请参阅重用数据流终结点

MQTT 主题

当源是 MQTT(包括事件网格)终结点时,你可以使用 MQTT 主题筛选器来订阅传入消息。 主题筛选器可以包含通配符以订阅多个主题。 例如,thermostats/+/telemetry/temperature/# 订阅来自恒温器的所有温度遥测消息。 若要配置 MQTT 主题筛选器,请执行以下操作:

在操作体验数据流的“源详细信息”中选择“MQTT”,然后使用“MQTT 主题”字段指定用于订阅传入消息的 MQTT 主题筛选器

注意

在操作体验中只能指定一个 MQTT 主题筛选器。 若要使用多个 MQTT 主题筛选器,请使用 Bicep 或 Kubernetes。

共享订阅

若要将共享订阅与 MQTT 源配合使用,可以用 $shared/<GROUP_NAME>/<TOPIC_FILTER> 形式指定共享订阅主题。

在操作体验数据流的“源详细信息”中选择“MQTT”,然后使用“MQTT 主题”字段指定共享订阅组和主题

如果数据流配置文件中的实例计数大于 1,则会自动为所有使用 MQTT 源的数据流启用共享订阅。 在这种情况下,将添加 $shared 前缀,并自动生成共享订阅组名称。 例如,如果你有一个实例数为 3 的数据流配置文件,并且数据流使用 MQTT 终结点作为配置了主题 topic1topic2 的源,则它们会自动转换为共享订阅(以 $shared/<GENERATED_GROUP_NAME>/topic1$shared/<GENERATED_GROUP_NAME>/topic2 的形式)。 如果要使用另一个共享订阅组 ID,可以在主题中替代它,例如 $shared/mygroup/topic1

重要

如果使用事件网格 MQTT 代理作为源,则实例计数大于 1 时需要共享订阅的数据流非常重要,因为它不支持共享订阅。 为避免消息缺失,在使用事件网格 MQTT 代理作为源时,请将数据流配置文件实例计数设置为 1。 那是数据流是订阅者并从云中接收消息的时候。

Kafka 主题

当源是 Kafka(包括事件中心)终结点时,指定用于订阅传入消息的各个 kafka 主题。 不支持通配符,因此必须静态指定每个主题。

注意

通过 Kafka 终结点使用事件中心时,命名空间中的每个事件中心都是 Kafka 主题。 例如,如果事件中心命名空间包含 thermostatshumidifiers 这两个事件中心,则你可以将每个事件中心指定为一个 Kafka 主题。

若要配置 Kafka 主题,请执行以下操作:

操作体验目前不支持使用 Kafka 终结点作为源。

指定要反序列化数据的架构

如果源数据具有可选字段或带有不同类型的字段,请指定反序列化架构以确保一致性。 例如,数据可能包含并非在所有消息中都存在的字段。 如果没有架构,转换就无法处理这些字段,因为它们会有空值。 具有架构时,可以指定默认值或忽略字段。

仅当使用 MQTT 或 Kafka 源时,指定架构才有意义。 如果源是资产,则系统会根据资产定义自动推理架构。

若要配置用于反序列化来自源的传入消息的架构,请执行以下操作:

在操作体验数据流的“源详细信息”中选择“MQTT”,然后使用“消息架构”字段指定架构。 可以先使用“上传”按钮上传架构文件。 有关详细信息,请参阅了解消息架构

转换

转换操作用于在将数据发送到目标之前转换源中的数据。 转换是可选的。 如果不需要对数据进行更改,请不要在数据流配置中添加转换操作。 无论在配置中指定的顺序如何,多个转换都将分阶段链接在一起。 阶段的顺序始终为:

  1. 扩充、重命名或添加新属性:在给定数据集和条件的情况下向源数据添加额外数据。
  2. 筛选:根据条件筛选数据。
  3. 映射或计算:使用可选转换将数据从一个字段移到另一个字段。

在操作体验中,选择“数据流”>“添加转换(可选)”。

使用操作体验将转换添加到数据流的屏幕截图。

扩充:添加引用数据

若要扩充数据,可以在 Azure IoT 操作的分布式状态存储 (DSS) 中使用引用数据集。 数据集用于根据条件向源数据添加额外数据。 条件指定为与数据集中的字段匹配的源数据中的字段。

你可以使用 DSS 集工具示例将示例数据加载到 DSS。 分布式状态存储中的键名称对应于数据流配置中的数据集。

在操作体验中,扩充阶段目前支持使用重命名和新属性转换。

  1. 在操作体验中,选择一个数据流,然后选择“添加转换(可选)”

  2. 选择“重命名”或“新属性”转换,然后选择“添加”

    使用操作经验重命名数据点并添加新属性的屏幕截图。

如果数据集具有含 asset 字段的记录,类似于:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

源中 deviceId 字段与 thermostat1 匹配的数据具有在筛选和映射阶段可用的 locationmanufacturer 字段。

有关条件语法的详细信息,请参阅使用数据流扩充数据使用数据流转换数据

筛选器:基于条件筛选数据

若要按条件筛选数据,可以使用 filter 阶段。 条件指定为与值匹配的源数据中的字段。

  1. 在“转换(可选)”下,选择“筛选器”>“添加”。

  2. 选择要包含在数据集中的数据点。

  3. 添加筛选器条件和说明。

    使用操作体验添加筛选器转换的屏幕截图。

  4. 选择“应用”。

例如,可以使用 temperature > 20 之类的筛选条件根据温度字段筛选小于或等于 20 的数据。

映射:将数据从一个字段移动到另一个字段

若要将数据映射到另一个具有可选转换的字段,可以使用 map 操作。 系统将转换指定为使用源数据中的字段的公式。

在操作体验中,目前支持使用“计算”转换进行映射

  1. 在“转换(可选)”下,选择“计算”>“添加”。

  2. 输入必填字段和表达式。

    使用操作体验添加计算转换的屏幕截图。

  3. 选择“应用”。

若要了解详细信息,请参阅使用数据流映射数据使用数据流转换数据

根据架构序列化数据

如果要在将数据发送到目标之前对其进行序列化,则需要指定架构和序列化格式。 否则,数据会以 JSON 格式序列化,并推断出类型。 存储终结点(例如 Microsoft Fabric 或 Azure Data Lake)需要架构来确保数据一致性。 支持的序列化格式为 Parquet 和 Delta。

操作体验目前不支持指定输出架构和序列化。

有关架构注册表的详细信息,请参阅了解消息架构

目标

若要配置数据流的目标,请指定终结点引用和数据目标。 可以指定终结点的数据目标列表。

若要将数据发送到除本地 MQTT 代理以外的目标,请创建数据流终结点。 若要了解操作方法,请参阅配置数据流终结点。 如果目标不是本地 MQTT 代理,则必须将其用作源。 若要了解详细信息,请参阅数据流必须使用本地 MQTT 代理终结点

重要

存储终结点需要架构引用。 如果已经为 Microsoft Fabric OneLake、ADLS Gen 2、Azure 数据资源管理器和本地存储创建了存储目标终结点,则必须指定架构引用。

  1. 选择要用作目标的数据流终结点。

    使用操作体验选择事件中心目标终结点的屏幕截图。

  2. 选择“继续”以配置目标。

  3. 输入目标所需的设置,包括要将数据发送到的主题或表。 有关详细信息,请参阅配置数据目标(主题、容器或表)

配置数据目标(主题、容器或表)

与数据源类似,数据目标这一概念用于保持数据流终结点在多个数据流中的可重用性。 本质上,它代表数据流终结点配置中的子目录。 例如,如果数据流终结点是存储终结点,则数据目标是存储帐户中的表。 如果数据流终结点是 Kafka 终结点,则数据目标是 Kafka 主题。

终结点类型 数据目标的含义 说明
MQTT(或事件网格) 主题 将数据发送到的 MQTT 主题。 仅支持静态主题,不支持通配符。
Kafka(或事件中心) 主题 将数据发送到的 Kafka 主题。 仅支持静态主题,不支持通配符。 如果终结点是事件中心命名空间,则数据目标是命名空间中的单个事件中心。
Azure Data Lake Storage 容器 存储帐户中的容器。 不是表。
Microsoft Fabric OneLake 表或文件夹 对应于为终结点配置的路径类型
Azure 数据资源管理器 Azure 数据资源管理器数据库中的表。
本地存储 文件夹 本地存储永久性卷装载中的文件夹或目录名称。 使用由 Azure Arc 云引入边缘卷启用的 Azure 容器存储时,这必须与所创建的子卷的 spec.path 参数相符。

若要配置数据目标,请执行以下操作:

使用操作体验时,系统会根据终结点类型自动解释数据目标字段。 例如,如果数据流终结点是存储终结点,则目标详细信息页面会提示你输入容器名称。 如果数据流终结点是 MQTT 终结点,目标详细信息页面会提示你输入主题,依此类推。

显示操作体验提示用户根据终结点类型输入 MQTT 主题的屏幕截图。

示例

以下示例是一个数据流配置,它将 MQTT 终结点用于源和目标。 源会筛选 MQTT 主题 azure-iot-operations/data/thermostat 中的数据。 该转换会将温度转换为华氏度,并筛选温度乘以湿度小于 100000 的数据。 目标会将数据发送到 MQTT 主题 factory

有关配置示例,请参阅 Bicep 或 Kubernetes 选项卡。

若要查看数据流配置的更多示例,请参阅 Azure REST API - 数据流快速入门 Bicep

验证数据流是否正常工作

遵循教程:到 Azure 事件网格的双向 MQTT 桥,验证数据流是否正常工作。

导出数据流配置

若要导出数据流配置,可以使用操作体验或导出数据流自定义资源。

选择要导出的数据流,然后从工具栏中选择“导出”。

使用操作体验导出数据流的屏幕截图。

正确的数据流配置

若要确保数据流按预期工作,请验证以下内容:

后续步骤