你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在 Azure IoT 操作中配置数据流

重要

本页包含使用 Kubernetes 部署清单(目前为预览版)管理 Azure IoT 操作组件的说明。 此功能存在若干限制,不应该用于生产工作负载。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

数据流是数据使用可选转换从源到目标采用的路径。 你可以通过创建数据流自定义资源或使用 Azure IoT 操作工作室门户来配置数据流。 数据流由三个部分组成:源、转换和目标。

显示从源流到转换再到目标的数据流的关系图。

若要定义源和目标,需要配置数据流终结点。 转换是可选的,可以包括扩充数据、筛选数据并将数据映射到其他字段等操作。

重要

每个数据流都必须具有 Azure IoT 操作本地 MQTT 代理默认终结点作为源或目标

可以使用 Azure IoT 操作中的操作体验来创建数据流。 操作体验提供了可视界面来配置数据流。 你还可以使用 Bicep 通过 Bicep 模板文件创建数据流,或者使用 Kubernetes 通过 YAML 文件创建数据流。

请继续阅读,了解如何配置源、转换和目标。

先决条件

获取 Azure IoT 操作实例后,即可使用默认数据流配置文件和终结点部署数据流。 但是,也可以配置数据流配置文件和终结点来自定义数据流。

数据流配置文件

如果数据流不需要不同的缩放设置,请使用 Azure IoT 操作提供的默认数据流配置文件。 若要了解如何配置数据流配置文件,请参阅配置数据流配置文件

数据流终结点

需要使用数据流终结点来配置数据流的源和目标。 若要快速开始,可以使用本地 MQTT 代理的默认数据流终结点。 还可以创建其他类型的数据流终结点,例如 Kafka、事件中心或 Azure Data Lake Storage。 若要了解如何配置每种类型的数据流终结点,请参阅配置数据流终结点

开始使用

满足先决条件后,即可开始创建数据流。

若要在操作体验中创建数据流,请选择“数据流”>“创建数据流”。 然后会显示一个页面,可在其中配置数据流的源、转换和目标。

使用操作体验创建数据流的屏幕截图。

查看以下部分,了解如何配置数据流的操作类型。

Source

若要配置数据流的源,请指定终结点引用和终结点的数据源列表。 选择以下选项之一作为数据流的源。

如果未将默认终结点用作源,则必须将其用作目标。 若要了解详细信息,请参阅数据流必须使用本地 MQTT 代理终结点

选项 1:将默认 MQTT 终结点用作源

  1. 在“源详细信息”下,选择“MQTT”。

    使用操作体验选择 MQTT 作为源终结点的屏幕截图。

  2. 输入 MQTT 源的以下设置:

    设置 说明
    MQTT 主题 用于订阅传入消息的 MQTT 主题筛选器。 请参阅配置 MQTT 或 Kafka 主题
    消息架构 用于反序列化传入消息的架构。 请参阅指定用于反序列化数据的架构
  3. 选择“应用”。

选项 2:将资产用作源

可以使用资产作为数据流的源。 只能在操作体验中将资产用作源。

  1. 在“源详细信息”下,选择“资产”。

  2. 选择要用作源终结点的资产。

  3. 选择“继续”。

    将显示所选资产的数据点列表。

    使用操作体验选择资产作为源终结点的屏幕截图。

  4. 选择“应用”以将资产用作源终结点。

使用资产作为源时,资产定义用于推断数据流的架构。 资产定义包括资产的数据点的架构。 若要了解详细信息,请参阅远程管理资产配置

配置完成后,资产中的数据通过本地 MQTT 代理到达数据流。 因此,使用资产作为源时,数据流实际上使用本地 MQTT 代理默认终结点作为源。

选项 3:将自定义 MQTT 或 Kafka 数据流终结点用作源

如果创建了自定义 MQTT 或 Kafka 数据流终结点(例如,与事件网格或事件中心配合使用的终结点),则可以将其用作数据流的源。 请记住,存储类型终结点(例如 Data Lake 或 Fabric OneLake)不能用作源。

若要配置,请使用 Kubernetes YAML 或 Bicep。 请将占位符值替换为你的自定义终结点名称和主题。

操作体验目前不支持使用自定义 MQTT 或 Kafka 终结点作为源。

配置数据源(MQTT 或 Kafka 主题)

可以在源中指定多个 MQTT 或 Kafka 主题,而无需修改数据流终结点配置。 这种灵活性意味着即使主题不同,也可以在多个数据流中重用同一个终结点。 有关详细信息,请参阅重用数据流终结点

MQTT 主题

当源是 MQTT(包括事件网格)终结点时,你可以使用 MQTT 主题筛选器来订阅传入消息。 主题筛选器可以包含通配符以订阅多个主题。 例如,thermostats/+/telemetry/temperature/# 订阅来自恒温器的所有温度遥测消息。 若要配置 MQTT 主题筛选器,请执行以下操作:

在操作体验数据流的“源详细信息”中选择“MQTT”,然后使用“MQTT 主题”字段指定用于订阅传入消息的 MQTT 主题筛选器

注意

在操作体验中只能指定一个 MQTT 主题筛选器。 若要使用多个 MQTT 主题筛选器,请使用 Bicep 或 Kubernetes。

共享订阅

若要将共享订阅与 MQTT 源配合使用,可以用 $shared/<GROUP_NAME>/<TOPIC_FILTER> 形式指定共享订阅主题。

在操作体验数据流的“源详细信息”中选择“MQTT”,然后使用“MQTT 主题”字段指定共享订阅组和主题

如果数据流配置文件中的实例计数大于 1,则会自动为所有使用 MQTT 源的数据流启用共享订阅。 在这种情况下,将添加 $shared 前缀,并自动生成共享订阅组名称。 例如,如果你有一个实例数为 3 的数据流配置文件,并且数据流使用 MQTT 终结点作为配置了主题 topic1topic2 的源,则它们会自动转换为共享订阅(以 $shared/<GENERATED_GROUP_NAME>/topic1$shared/<GENERATED_GROUP_NAME>/topic2 的形式)。

可以在配置中明确创建名为 $shared/mygroup/topic 的主题。 但是,不建议明确添加 $shared 主题,因为 $shared 前缀会在需要时自动添加。 如果未设置组名,则数据流可以使用组名进行优化。 例如,如果未设置 $share,数据流只需通过主题名称进行操作。

重要

如果使用事件网格 MQTT 代理作为源,则实例计数大于 1 时需要共享订阅的数据流非常重要,因为它不支持共享订阅。 为避免消息缺失,在使用事件网格 MQTT 代理作为源时,请将数据流配置文件实例计数设置为 1。 那是数据流是订阅者并从云中接收消息的时候。

Kafka 主题

当源是 Kafka(包括事件中心)终结点时,指定用于订阅传入消息的各个 Kafka 主题。 不支持通配符,因此必须静态指定每个主题。

注意

通过 Kafka 终结点使用事件中心时,命名空间中的每个事件中心都是 Kafka 主题。 例如,如果事件中心命名空间包含 thermostatshumidifiers 这两个事件中心,则你可以将每个事件中心指定为一个 Kafka 主题。

若要配置 Kafka 主题,请执行以下操作:

操作体验目前不支持使用 Kafka 终结点作为源。

指定源架构

使用 MQTT 或 Kafka 作为源时,可以指定架构以在操作体验门户中显示数据点列表。 请注意,目前不支持使用架构来反序列化和验证传入消息。

如果源是资产,则系统会根据资产定义自动推理架构。

提示

若要从示例数据文件生成架构,请使用 Schema Gen Helper

若要配置用于反序列化来自源的传入消息的架构,请执行以下操作:

在操作体验数据流的“源详细信息”中选择“MQTT”,然后使用“消息架构”字段指定架构。 可以先使用“上传”按钮上传架构文件。 有关详细信息,请参阅了解消息架构

有关详细信息,请参阅了解消息架构

转换

转换操作用于在将数据发送到目标之前转换源中的数据。 转换是可选的。 如果不需要对数据进行更改,请不要在数据流配置中添加转换操作。 无论在配置中指定的顺序如何,多个转换都将分阶段链接在一起。 阶段的顺序始终为:

  1. 扩充:在给定数据集和匹配条件的情况下,向源数据添加额外数据。
  2. 筛选:根据条件筛选数据。
  3. 映射、计算、重命名,或添加新属性:使用可选转换将数据从一个字段移动到另一个字段。

本部分介绍数据流转换。 有关更多详细信息,请参阅使用数据流映射数据使用数据流转换转换数据使用数据流扩充数据

在操作体验中,选择“数据流”>“添加转换(可选)”。

使用操作体验将转换添加到数据流的屏幕截图。

扩充:添加引用数据

若要扩充数据,请先在 Azure IoT 操作状态存储中添加参考数据集。 数据集用于根据条件向源数据添加额外数据。 条件指定为与数据集中的字段匹配的源数据中的字段。

可以使用状态存储 CLI 将示例数据加载到状态存储中。 状态存储中的键名称对应于数据流配置中的数据集。

目前,操作体验不支持扩充阶段。

如果数据集具有含 asset 字段的记录,类似于:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

源中 deviceId 字段与 thermostat1 匹配的数据具有在筛选和映射阶段可用的 locationmanufacturer 字段。

有关条件语法的详细信息,请参阅使用数据流扩充数据使用数据流转换数据

筛选器:基于条件筛选数据

若要按条件筛选数据,可以使用 filter 阶段。 条件指定为与值匹配的源数据中的字段。

  1. 在“转换(可选)”下,选择“筛选器”>“添加”。

    使用操作体验添加筛选器转换的屏幕截图。

  2. 输入所需的设置。

    设置 说明
    筛选条件 根据源数据中的字段筛选数据的条件。
    说明 提供筛选条件的描述。

    在筛选条件字段中,键入 @ 或选择 Ctrl + Space,从下拉列表中选择数据点。 还可以采用 @$metadata.<header> 格式输入 $metadata 标头。

    条件可以使用源数据中的字段。 例如,可以使用 temperature > 20 之类的筛选条件根据温度字段筛选小于或等于 20 的数据。

  3. 选择“应用”。

映射:将数据从一个字段移动到另一个字段

若要将数据映射到另一个具有可选转换的字段,可以使用 map 操作。 系统将转换指定为使用源数据中的字段的公式。

在操作体验中,目前支持使用“计算”、“重命名”和“新属性”转换进行映射。

计算

可以使用“计算”转换将公式应用于源数据。 此操作用于将公式应用于源数据并存储结果字段。

  1. 在“转换(可选)”下,选择“计算”>“添加”。

    使用操作体验添加计算转换的屏幕截图。

  2. 输入所需的设置。

    设置 说明
    选择公式 从下拉列表中选择现有公式,或选择“自定义”手动输入公式。
    输出 指定结果的输出显示名称。
    公式 输入要应用于源数据的公式。
    说明 为转换提供说明。
    上一个已知值 (可选)如果当前值不可用,则使用最后一个已知值。

    可以在“公式”字段中输入或编辑公式。 公式可以使用源数据中的字段。 键入 @ 或选择 Ctrl + Space,从下拉列表中选择数据点。 还可以采用 @$metadata.<header> 格式输入 $metadata 标头。

    公式可以使用源数据中的字段。 例如,可以使用源数据中的 temperature 字段将温度转换为摄氏度,并将其存储在 temperatureCelsius 输出字段中。

  3. 选择“应用”。

重命名

可以使用“重命名”转换来重命名数据点。 此操作用于将源数据中的数据点重命名为新名称。 新名称可用于数据流的后续阶段。

  1. 在“转换(可选)”下,选择“重命名”>“添加”。

    使用操作体验重命名数据点的屏幕截图。

  2. 输入所需的设置。

    设置 说明
    数据点 从下拉列表中选择一个数据点或以 $metadata.<header>. 格式输入 $metadata 标头
    新数据点名称 为数据点输入新名称。
    说明 为转换提供说明。
  3. 选择“应用”。

新属性

可以使用“新属性”转换向源数据添加新属性。 此操作用于向源数据添加新属性。 新属性可用于数据流的后续阶段。

  1. 在“转换(可选)”下,选择“新属性”>“添加”。

    使用操作体验添加新属性的屏幕截图。

  2. 输入所需的设置。

    设置 说明
    属性键 输入新属性的键。
    属性值 输入新属性的值。
    说明 为新属性提供说明。
  3. 选择“应用”。

若要了解详细信息,请参阅使用数据流映射数据使用数据流转换数据

根据架构序列化数据

如果要在将数据发送到目标之前对其进行序列化,则需要指定架构和序列化格式。 否则,数据会以 JSON 格式序列化,并推断出类型。 存储终结点(例如 Microsoft Fabric 或 Azure Data Lake)需要架构来确保数据一致性。 支持的序列化格式为 Parquet 和 Delta。

提示

若要从示例数据文件生成架构,请使用 Schema Gen Helper

对于操作体验,可以在数据流终结点详细信息中指定架构和序列化格式。 支持序列化格式的终结点包括 Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2 和 Azure 数据资源管理器。 例如,若要以 Delta 格式序列化数据,需要将架构上传到架构注册表并在数据流目标终结点配置中引用它。

使用操作体验设置数据流目标终结点序列化的屏幕截图。

有关架构注册表的详细信息,请参阅了解消息架构

目标

若要配置数据流的目标,请指定终结点引用和数据目标。 可以指定终结点的数据目标列表。

若要将数据发送到除本地 MQTT 代理以外的目标,请创建数据流终结点。 若要了解操作方法,请参阅配置数据流终结点。 如果目标不是本地 MQTT 代理,则必须将其用作源。 若要了解详细信息,请参阅数据流必须使用本地 MQTT 代理终结点

重要

存储终结点需要序列化架构。 若要将数据流与 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure 数据资源管理器或本地存储一起使用,必须指定架构引用

  1. 选择要用作目标的数据流终结点。

    使用操作体验选择事件中心目标终结点的屏幕截图。

  2. 选择“继续”以配置目标。

  3. 输入目标所需的设置,包括要将数据发送到的主题或表。 有关详细信息,请参阅配置数据目标(主题、容器或表)

配置数据目标(主题、容器或表)

与数据源类似,数据目标这一概念用于保持数据流终结点在多个数据流中的可重用性。 本质上,它代表数据流终结点配置中的子目录。 例如,如果数据流终结点是存储终结点,则数据目标是存储帐户中的表。 如果数据流终结点是 Kafka 终结点,则数据目标是 Kafka 主题。

终结点类型 数据目标的含义 说明
MQTT(或事件网格) 主题 将数据发送到的 MQTT 主题。 仅支持静态主题,不支持通配符。
Kafka(或事件中心) 主题 将数据发送到的 Kafka 主题。 仅支持静态主题,不支持通配符。 如果终结点是事件中心命名空间,则数据目标是命名空间中的单个事件中心。
Azure Data Lake Storage 容器 存储帐户中的容器。 不是表。
Microsoft Fabric OneLake 表或文件夹 对应于为终结点配置的路径类型
Azure 数据资源管理器 Azure 数据资源管理器数据库中的表。
本地存储 文件夹 本地存储永久性卷装载中的文件夹或目录名称。 使用由 Azure Arc 云引入边缘卷启用的 Azure 容器存储时,这必须与所创建的子卷的 spec.path 参数相符。

若要配置数据目标,请执行以下操作:

使用操作体验时,系统会根据终结点类型自动解释数据目标字段。 例如,如果数据流终结点是存储终结点,则目标详细信息页面会提示你输入容器名称。 如果数据流终结点是 MQTT 终结点,目标详细信息页面会提示你输入主题,依此类推。

显示操作体验提示用户根据终结点类型输入 MQTT 主题的屏幕截图。

示例

以下示例是一个数据流配置,它将 MQTT 终结点用于源和目标。 源会筛选 MQTT 主题 azure-iot-operations/data/thermostat 中的数据。 该转换会将温度转换为华氏度,并筛选温度乘以湿度小于 100000 的数据。 目标会将数据发送到 MQTT 主题 factory

有关配置示例,请参阅 Bicep 或 Kubernetes 选项卡。

若要查看数据流配置的更多示例,请参阅 Azure REST API - 数据流快速入门 Bicep

验证数据流是否正常工作

遵循教程:到 Azure 事件网格的双向 MQTT 桥,验证数据流是否正常工作。

导出数据流配置

若要导出数据流配置,可以使用操作体验或导出数据流自定义资源。

选择要导出的数据流,然后从工具栏中选择“导出”。

使用操作体验导出数据流的屏幕截图。

正确的数据流配置

若要确保数据流按预期工作,请验证以下内容:

后续步骤