你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

配置 Azure 事件中心和 Kafka 数据流终结点

重要

本页包含使用 Kubernetes 部署清单(目前为预览版)管理 Azure IoT 操作组件的说明。 此功能存在若干限制,不应该用于生产工作负载。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

若要在 Azure IoT 操作和 Apache Kafka 中转站之间设置双向通信,可以配置数据流终结点。 此配置允许指定终结点、传输层安全性 (TLS)、身份验证和其他设置。

先决条件

Azure 事件中心

Azure 事件中心与 Kafka 协议兼容,可用于数据流,但存在一些限制。

创建 Azure 事件中心命名空间和事件中心

首先,创建已启用 Kafka 的 Azure 事件中心命名空间

接下来,在命名空间中创建事件中心。 每个事件中心与一个 Kafka 主题相对应。 可以在同一命名空间中创建多个事件中心来表示多个 Kafka 主题。

为托管标识分配权限

若要为 Azure 事件中心配置数据流终结点,建议使用用户分配的托管标识或系统分配的托管标识。 此方法是安全的,并且无需手动管理凭据。

创建 Azure 事件中心命名空间和事件中心后,需要为 Azure IoT 操作托管标识分配一个角色,以授予其向事件中心发送或接收消息的权限。

如果使用系统分配的托管标识,请在 Azure 门户中,转到 Azure IoT 操作实例并选择“概述”。 复制“Azure IoT 操作 Arc 扩展”后列出的扩展的名称。 例如 azure-iot-operations-xxxx7。 可以使用 Azure IoT 操作 Arc 扩展的名称找到系统分配的托管标识。

然后,转到事件中心命名空间 >“访问控制(IAM)”>“添加角色分配”。

  1. 在“角色”选项卡上,选择相应的角色,如“Azure Event Hubs Data Sender”或“Azure Event Hubs Data Receiver。 这为托管标识提供了必要的权限来为命名空间中的所有事件中心发送或接收消息。 要了解详情,请参阅使用 Microsoft Entra ID 对访问事件中心资源的应用程序进行身份验证
  2. 在“成员”选项卡上
    1. 如果使用系统分配的托管标识,请在“分配访问权限到”中,选择“用户、组或服务主体”选项,然后选择“+ 选择成员”并搜索 Azure IoT 中心 Arc 扩展的名称
    2. 如果使用用户分配的托管标识,请在“分配访问权限到”中,选择“+ 选择成员”选项并搜索“设置用户分配的托管标识以进行云连接”

为 Azure 事件中心创建数据流终结点

配置 Azure 事件中心命名空间和事件中心后,可以为已启用 Kafka 的 Azure 事件中心命名空间创建数据流终结点。

  1. 操作体验中,选择“数据流终结点”选项卡。

  2. 在“创建新的数据流终结点”下,选择“Azure 事件中心”>“新建”。

    使用操作体验门户创建 Azure 事件数据流终结点的屏幕截图。

  3. 输入以下用于终结点的设置:

    设置 说明
    Name 数据流终结点的名称。
    主机 格式为 <NAMEPSACE>.servicebus.windows.net:9093 的 Kafka 代理的主机名。 在事件中心的主机设置中包含端口号 9093
    身份验证方法 用于身份验证的方法。 建议选择“系统分配的托管标识”或“用户分配的托管标识
  4. 选择“应用”来预配终结点。

注意

创建数据流时,稍后会配置 Kafka 主题或单个事件中心。 Kafka 主题是数据流消息的目标。

使用连接字符串向事件中心进行身份验证

重要

要使用操作体验门户来管理机密,必须首先通过配置 Azure Key Vault 并启用工作负载标识来使用安全设置启用 Azure IoT 操作。 若要了解详细信息,请参阅在 Azure IoT 操作部署中启用安全设置

在操作体验数据流终结点设置页中,选择“基本”选项卡,然后选择“身份验证方法”>“SASL”。

输入以下用于终结点的设置:

设置 说明
SASL 类型 选择 Plain
同步的机密名称 输入包含连接字符串的 Kubernetes 机密的名称。
用户名引用或令牌机密 SASL 身份验证所引用的用户名或令牌密钥。 可以从 Key Vault 列表中选择一个现有的密钥,也可以创建一个新的密钥。 值必须是 $ConnectionString
令牌机密的密码引用 SASL 身份验证所引用的密码或令牌密钥。 可以从 Key Vault 列表中选择一个现有的密钥,也可以创建一个新的密钥。 此值必须采用 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> 格式。

选择“添加引用”后,如果选择“新建”,请输入以下设置

设置 说明
机密名称 Azure Key Vault 中机密的名称。 选择一个易于记住的名称,以便稍后从列表中选择机密。
密码值 对于用户名,请输入 $ConnectionString。 对于密码,请输入 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> 格式的连接字符串。
设置激活日期 如果启用,则为机密处于活动状态的日期。
设置过期日期 如果启用,则为机密到期的日期。

有关机密的详细信息,请参阅在 Azure IoT 操作(预览版)中创建和管理机密

限制

Azure 事件中心不支持 Kafka 支持的所有压缩类型。 目前,只有 Azure 事件中心高级层和专用层中支持 GZIP 压缩。 使用其他压缩类型可能会导致出错。

自定义 Kafka 代理

若要为非事件中心 Kafka 代理配置数据流终结点,请根据需要设置主机、TLS、身份验证和其他设置。

  1. 操作体验中,选择“数据流终结点”选项卡。

  2. 在“创建新的数据流终结点”下,选择“自定义 Kafka 代理”>“新建”。

    使用操作体验创建 Kafka 数据流终结点的屏幕截图。

  3. 输入以下用于终结点的设置:

    设置 说明
    Name 数据流终结点的名称。
    主机 格式为 <Kafa-broker-host>:xxxx 的 Kafka 代理的主机名。 包含主机设置中的端口号。
    身份验证方法 用于身份验证的方法。 选择“SASL”。
    SASL 类型 SASL 身份验证的类型。 选择“”、“ScramSha256”或“ScramSha512”。 如果使用 SASL,则是必需的。
    同步的机密名称 机密的名称。 如果使用 SASL,则是必需的。
    令牌机密的用户名引用 对 SASL 令牌机密中用户名的引用。 如果使用 SASL,则是必需的。
  4. 选择“应用”来预配终结点。

注意

目前,操作体验不支持将 Kafka 数据流终结点用作源。 你可以使用 Kubernetes 或 Bicep 创建包含源 Kafka 数据流终结点的数据流。

要自定义终结点设置,请参阅以下部分以了解详细信息。

可用身份验证方法

以下身份验证方法可用于 Kafka 代理数据流终结点。

系统分配的托管标识

在配置数据流终结点之前,请将角色分配给 Azure IoT 操作托管标识,以授予连接到 Kafka 代理的权限:

  1. 在 Azure 门户中,转到 Azure IoT 操作实例并选择“概述”
  2. 复制“Azure IoT 操作 Arc 扩展”后列出的扩展的名称。 例如 azure-iot-operations-xxxx7
  3. 转到需要授予权限的云资源。 例如,转到事件中心命名空间“访问控制(IAM)”>“添加角色分配”>
  4. 在“角色”选项卡上,选择相应的角色
  5. 在“成员”选项卡上,对于“分配访问权限到”,请选择“用户、组或服务主体”选项,然后选择“+ 选择成员”并搜索 Azure IoT 操作托管标识。 例如 azure-iot-operations-xxxx7

然后,使用系统分配的托管标识设置配置数据流终结点。

在操作体验数据流终结点设置页中,选择“基本”选项卡,然后选择“身份验证方法”>“系统分配的托管标识”。

此配置创建一个具有默认受众的托管标识,该标识与事件中心命名空间主机值采用 https://<NAMESPACE>.servicebus.windows.net 的形式相同。 但是,如果需要替换默认受众,可以将 audience 字段设置为所需的值。

操作体验不支持。

用户分配的托管标识

若要使用用户分配的托管标识进行身份验证,必须先部署已启用安全设置的 Azure IoT 操作。 然后,需要设置用户分配的托管标识以进行云连接。 若要了解详细信息,请参阅在 Azure IoT 操作部署中启用安全设置

在配置数据流终结点之前,请将角色分配给用户分配的托管标识,以授予连接到 Kafka 代理的权限:

  1. 在 Azure 门户中,转到需要授予权限的云资源。 例如,转到事件网格命名空间“访问控制(IAM)”>“添加角色分配”>
  2. 在“角色”选项卡上,选择相应的角色
  3. 在“成员”选项卡上,对于“分配访问权限到”,请选择“托管标识”选项,然后选择“+ 选择成员”并搜索用户分配的托管标识

然后,使用用户分配的托管标识设置配置数据流终结点。

在操作体验数据流终结点设置页中,选择“基本”选项卡,然后选择“身份验证方法”>“用户分配的托管标识”。

此处的范围是托管标识的受众。 默认值与采用 https://<NAMESPACE>.servicebus.windows.net 格式的事件中心命名空间主机值相同。 但如果需要替代默认受众,则可以使用 Bicep 或 Kubernetes 将范围字段设置为所需的值。

SASL

要使用 SASL 进行身份验证,请指定 SASL 身份验证方法,并配置 SASL 类型以及包含 SASL 令牌机密名称的机密引用。

在操作体验数据流终结点设置页中,选择“基本”选项卡,然后选择“身份验证方法”>“SASL”。

输入以下用于终结点的设置:

设置 说明
SASL 类型 要使用的 SASL 身份验证的类型。 支持的类型有 PlainScramSha256ScramSha512
同步的机密名称 包含 SASL 令牌的 Kubernetes 机密的名称。
用户名引用或令牌机密 SASL 身份验证所引用的用户名或令牌密钥。
令牌机密的密码引用 SASL 身份验证所引用的密码或令牌密钥。

支持的 SASL 类型如下:

  • Plain
  • ScramSha256
  • ScramSha512

该机密必须与 Kafka 数据流终结点位于同一命名空间中。 机密必须具有 SASL 令牌作为密钥值对。

匿名

若要使用匿名身份验证,请更新 Kafka 设置的身份验证部分以使用匿名方法。

在操作体验数据流终结点设置页中,选择“基本”选项卡,然后选择“身份验证方法”>“无”。

高级设置

你可以为 Kafka 数据流终结点设置高级设置,例如 TLS、受信任的 CA 证书、Kafka 消息传送设置、批处理和 CloudEvents。 可以在数据流终结点“高级”门户选项卡中或数据流终结点资源中设置这些设置。

在操作体验中,选择数据流终结点的“高级”选项卡。

使用操作体验设置 Kafka 数据流终结点的高级设置的屏幕截图。

TLS 设置

TLS 模式

若要为 Kafka 终结点启用或禁用 TLS,请在 TLS 设置中更新 mode 设置。

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“启用 TLS 模式”旁边的复选框。

TLS 模式可以设置为 EnabledDisabled。 如果该模式设置为 Enabled,数据流将使用与 Kafka 代理的安全连接。 如果该模式设置为 Disabled,数据流将使用与 Kafka 代理的不安全连接。

受信任的 CA 证书

为 Kafka 终结点配置受信任的 CA 证书,以建立与 Kafka 代理的安全连接。 如果 Kafka 代理使用自签名证书或由默认不受信任的自定义 CA 签名的证书,则此设置非常重要。

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“受信任的 CA 证书配置映射”字段指定包含受信任的 CA 证书的 ConfigMap。

此 ConfigMap 应包含 PEM 格式的 CA 证书。 ConfigMap 必须与 Kafka 数据流资源位于同一命名空间中。 例如:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

提示

连接到 Azure 事件中心时,不需要 CA 证书,因为事件中心服务使用由默认受信任的公共 CA 签名的证书。

使用者组 ID

使用者组 ID 用于标记数据流用于从 Kafka 主题读取消息的使用者组。 使用者组 ID 在 Kafka 代理中必须是唯一的。

重要

将 Kafka 终结点用作时,需要使用使用者组 ID。 否则,数据流无法从 Kafka 主题读取消息,并且会收到错误“Kafka 类型源终结点必须定义 consumerGroupId”。

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“使用者组 ID 前缀”字段来指定使用者组 ID。

仅当终结点用作目标(即数据流为使用者)时,此设置才会生效。

压缩

压缩字段可为发送到 Kafka 主题的消息启用压缩配置。 压缩有助于减少数据传输所需的网络带宽和存储空间。 但是,压缩还会给进程增加一些开销和延迟。 下表中列出了支持的压缩类型。

说明
None 不应用压缩或批处理。 如果未指定压缩类型,则默认值为“无”。
Gzip 应用 GZIP 压缩和批处理。 GZIP 是一种通用压缩算法,提供压缩比率和速度之间的良好平衡。 目前,只有 Azure 事件中心高级层和专用层中支持 GZIP 压缩
Snappy 应用 Snappy 压缩和批处理。 Snappy 是一种快速压缩算法,可提供中等压缩比率和速度。 Azure 事件中心不支持此压缩模式。
Lz4 应用 LZ4 压缩和批处理。 LZ4 是一种快速压缩算法,可提供低压缩比率和高速度。 Azure 事件中心不支持此压缩模式。

若要配置压缩:

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“压缩”字段来指定压缩类型。

仅当终结点用作目标,且其中数据流为创建器时,此设置才会生效。

批处理

除了压缩之外,还可以在将消息发送到 Kafka 主题之前为消息配置批处理。 批处理允许将多个消息组合在一起,并将其压缩为单个单元,从而提高压缩效率并减少网络开销。

字段 说明 必须
mode 可以是 EnabledDisabled。 默认值为 Enabled,因为 Kafka 没有未批处理消息的概念。 如果设置为 Disabled,则会将批处理最小化,以创建每次包含单个消息的批。
latencyMs 消息在发送前可以缓冲的最大时间间隔(以毫秒为单位)。 如果达到此间隔,则所有缓冲消息都会通过批处理发送,而不管它们的数量或大小。 如果未设置,则默认值为 5。
maxMessages 在发送之前可以缓冲的最大消息数。 如果达到此数量,则所有缓冲消息都会以批的形式发送,而不考虑其大小或缓冲时间。 如果未设置,则默认值为 100000。
maxBytes 在发送之前可以缓冲的最大大小(以字节为单位)。 如果达到此大小,则所有缓冲消息都会以批的形式发送,而不考虑其数量或缓冲时间。 默认值为 1000000 (1 MB)。

例如,如果将延迟时间设置为 1000、maxMessages 设置为 100,并将 maxBytes 设置为 1024,则消息在缓冲区中有 100 条消息或缓冲区中有 1,024 个字节时或自上次发送以来的 1,000 毫秒后发送消息(以先发送为准)。

若要配置批处理设置:

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“启用批处理”字段启用批处理。 使用“批处理延迟”、“最大字节数”“消息计数”字段来指定批处理设置。

仅当终结点用作目标,且其中数据流为创建器时,此设置才会生效。

分区处理策略

分区处理策略可控制在将消息发送到 Kafka 主题时如何将其分配给 Kafka 分区。 Kafka 分区是支持并行处理和容错的 Kafka 主题的逻辑段。 Kafka 主题中的每个消息都有一个分区和一个偏移量,用于标记消息并进行排序。

仅当终结点用作目标,且其中数据流为创建器时,此设置才会生效。

默认情况下,数据流会使用轮循机制算法将消息分配给随机分区。 但是,你可以使用不同的策略根据某些条件(例如 MQTT 主题名称或 MQTT 消息属性)将消息分配到分区。 这有助于实现更好的负载均衡、数据区域或消息排序。

说明
Default 使用轮循机制算法将消息分配给随机分区。 如果未指定策略,这是默认值。
Static 将消息分配给从数据流实例 ID 派生的固定分区号。 这意味着每个数据流实例都会将消息发送到不同的分区。 这有助于实现更好的负载均衡和数据区域。
Topic 使用来自数据流源的 MQTT 主题名称作为分区的键。 这意味着具有相同 MQTT 主题名称的消息将发送到同一分区。 这有助于实现更好的消息排序和数据区域。
Property 使用来自数据流源的 MQTT 消息属性作为分区的键。 指定 partitionKeyProperty 字段中的属性名称。 这意味着具有相同属性值的消息将发送到同一分区。 这有助于根据自定义条件实现更好的消息排序和数据区域。

例如,如果将分区处理策略设置为 Property,并将分区键属性设置为 device-id,则具有相同 device-id 属性的消息将发送到同一分区。

若要配置分区处理策略:

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“分区处理策略”字段来指定分区处理策略。 使用“分区键属性” 字段指定在策略设置为 Property 时用于分区的属性。

Kafka 确认

Kafka 确认 (acks) 用于控制发送到 Kafka 主题的消息的持久性和一致性。 当创建器向 Kafka 主题发送消息时,它可以从 Kafka 代理请求不同级别的确认,以确保消息已成功写入主题并在 Kafka 群集中复制。

仅当终结点用作目标(即数据流为创建器)时,此设置才会生效。

说明
None 数据流不会等待 Kafka 代理的任何确认。 此设置最快但最不持久的选项。
All 数据流会等待消息写入前导分区和所有后续分区。 此设置是最慢但最持久的选项。 此设置也是默认选项
One 数据流会等待消息写入前导分区和至少一个后续分区。
Zero 数据流会等待消息写入前导分区,但不会等待后续分区的任何确认。 这比 One 快,但持久性更低。

例如,如果将 Kafka 确认设置为 All,则数据流将等待消息写入前导分区和所有后续分区,然后再发送下一条消息。

若要配置 Kafka 确认:

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“Kafka 确认”字段来指定 Kafka 确认级别。

仅当终结点用作数据流为创建器的目标时,此设置才会生效。

复制 MQTT 属性

默认情况下,已启用复制 MQTT 属性设置。 这些用户属性包括用于存储发送消息的资产的名称的 subject 等值。

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“复制 MQTT 属性”字段旁边的复选框来启用或禁用复制 MQTT 属性。

以下内容介绍了 MQTT 属性如何转换为 Kafka 用户标头,以及设置启用时的相应情况。

Kafka 终结点是目标

当 Kafka 终结点是数据流目标时,所有 MQTT v5 规范定义的属性都会转换为 Kafka 用户标头。 例如,将“内容类型”转发到 Kafka 的 MQTT v5 消息将转换为 Kafka 用户标头 "Content Type":{specifiedValue}。 类似的规则适用于下表中定义的其他内置 MQTT 属性。

MQTT 属性 已转换的行为
有效负载格式指示器 键:“有效负载格式指示器”
值:“0”(有效负载是字节)或“1”(有效负载是 UTF-8)
响应主题 键:“响应主题”
值:从原始消息复制响应主题。
消息过期间隔 键:“消息过期间隔”
值:消息过期前的秒数的 UTF-8 表示形式。 有关详细信息,请参阅“消息过期间隔属性”。
关联数据: 键:“关联数据”
值:从原始消息复制关联数据。 与许多 UTF-8 编码的 MQTT v5 属性不同,关联数据可以是任意数据。
内容类型: 键:“内容类型”
值:从原始消息复制内容类型。

MQTT v5 用户属性键值对直接转换为 Kafka 用户标头。 如果消息中的用户标头的名称与内置 MQTT 属性(例如,名为“关联数据”的用户标头)相同,则未定义是转发 MQTT v5 规范属性值还是用户属性。

数据流永远不会从 MQTT 代理接收这些属性。 因此,数据流永远不会转发它们:

  • 主题别名
  • 订阅标识符
“消息过期间隔”属性

消息过期间隔”指定消息在被放弃之前可以在 MQTT 代理中保留多长时间。

当数据流收到指定了消息过期间隔的 MQTT 消息时,它会:

  • 记录邮件的接收时间。
  • 在将消息发送到目标之前,会从原始过期间隔时间减去消息已排队的时长。
  • 如果消息未过期(上述操作为 > 0),则会向目标发出该消息,并包含更新后的消息过期时间。
  • 如果消息已过期(上述操作为 <= 0),则目标不会发出该消息。

示例:

  • 数据流收到消息过期间隔 = 3600 秒的 MQTT 消息。 相应的目标暂时断开连接,但能够重新连接。 在此 MQTT 消息发送到目标之前,耗时 1000 秒。 在本例中,目标消息的“消息过期间隔”设置为 2600 (3600 - 1000) 秒。
  • 数据流收到消息过期间隔 = 3600 秒的 MQTT 消息。 相应的目标暂时断开连接,但能够重新连接。 但是,在这种情况下,重新连接需要 4000 秒。 消息已过期,数据流不会将此消息转发到目标。

Kafka 终结点是数据流源

注意

将事件中心终结点用作数据流源时,存在一个已知问题,其中 Kafka 标头在转换为 MQTT 时会损坏。 仅当使用事件中心时,才会发生此情况,而事件中心客户端在幕后使用 AMQP。 例如“foo”=“bar”,将转换“foo”,但值会变为“\xa1\x03bar”。

当 Kafka 终结点是数据流源时,Kafka 用户标头将转换为 MQTT v5 属性。 下表介绍了 Kafka 用户标头如何转换为 MQTT v5 属性。

Kafka 标头 已转换的行为
密钥 键:“密钥”
值:从原始消息复制密钥。
Timestamp 键:“时间戳”
值:Kafka 时间戳的 UTF-8 编码,这是从 Unix 时间戳之后的毫秒数。

Kafka 用户标头键/值对 (前提是它们都用 UTF-8 编码)会直接转换为 MQTT 用户键/值属性。

UTF-8 / 二进制不匹配

MQTT v5 只能支持基于 UTF-8 的属性。 如果数据流收到包含一个或多个非 UTF-8 标头的 Kafka 消息,数据流将:

  • 删除有问题的属性。
  • 按照前面的规则转发邮件的其余部分。

在 Kafka 源标头 => MQTT 目标属性中需要二进制传输的应用程序,必须首先对它们进行 UTF-8 编码,例如通过 Base64 进行编码。

>=64KB 属性不匹配

MQTT v5 属性必须小于 64 KB。 如果数据流收到包含一个或多个 >=64KB 的标头的 Kafka 消息,数据流将:

  • 删除有问题的属性。
  • 按照前面的规则转发邮件的其余部分。
使用事件中心和使用 AMQP 的创建器时的属性转换

如果有客户端转发消息,Kafka 数据流源终结点将执行以下操作之一:

  • 使用 Azure.Messaging.EventHubs 等客户端库将消息发送到事件中心
  • 直接使用 AMQP

需要注意的属性转换的细微差别。

应执行以下操作之一:

  • 避免发送属性
  • 如果必须发送属性,请发送 UTF-8 编码的值。

当事件中心将属性从 AMQP 转换为 Kafka 时,它会在其消息中包含基础 AMQP 编码类型。 有关此行为的详细信息,请参阅“使用不同协议在使用者和生成者之间交换事件”。

在以下代码示例中,当数据流终结点收到值 "foo":"bar" 时,它将接收相应属性作为 <0xA1 0x03 "bar">

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

数据流终结点无法将有效负载属性 <0xA1 0x03 "bar"> 转发到 MQTT 消息,因为该数据不是 UTF-8。 但是,如果指定 UTF-8 字符串,数据流终结点将在发送到 MQTT 之前转换字符串。 如果使用 UTF-8 字符串,MQTT 消息会将 "foo":"bar" 作为用户属性。

仅转换 UTF-8 标头。 例如,假设以下方案将属性设置为 float:

Properties = 
{
  {"float-value", 11.9 },
}

数据流终结点会丢弃包含 "float-value" 字段的数据包。

并非所有事件数据属性(包括 propertyEventData.correlationId)都会转发。 有关详细信息,请参阅“事件用户属性”。

CloudEvents

CloudEvents 是以常见方式描述事件数据的一种方法。 CloudEvents 设置用于以 CloudEvents 格式发送和接收消息。 可以将 CloudEvents 用于事件驱动的体系结构,其中不同的服务需要在相同或不同的云提供商中相互通信。

CloudEventAttributes 选项为 PropagateCreateOrRemap

在操作体验数据流终结点设置页中,选择“高级”选项卡,然后使用“云事件属性”字段来指定 CloudEvents 设置。

以下部分介绍如何传播或创建和重新映射 CloudEvent 属性。

“传播”设置

CloudEvent 属性将传递给包含所需属性的消息。 如果消息不包含所需的属性,则消息按原样传递。 如果存在所需的属性,则会将 ce_ 前缀添加到 CloudEvent 属性名称中。

名称 必须 示例值 输出名称 输出值
specversion 1.0 ce-specversion 按原样传递
type ms.aio.telemetry ce-type 按原样传递
source aio://mycluster/myoven ce-source 按原样传递
id A234-1234-1234 ce-id 按原样传递
subject aio/myoven/telemetry/temperature ce-subject 按原样传递
time 2018-04-05T17:31:00Z ce-time 按原样传递。 没有重新标记时间戳。
datacontenttype application/json ce-datacontenttype 在可选的转换阶段之后更改为输出数据内容类型。
dataschema sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 如果在转换配置中提供了输出数据转换架构,则 dataschema 将更改为输出架构。

CreateOrRemap 设置

CloudEvent 属性将传递给包含所需属性的消息。 如果消息不包含所需的属性,则将生成相应属性。

名称 必须 输出名称 如果缺少,则生成值
specversion ce-specversion 1.0
type ce-type ms.aio-dataflow.telemetry
source ce-source aio://<target-name>
id ce-id 目标客户端中生成的 UUID
subject ce-subject 发送消息的输出主题
time ce-time 在目标客户端中生成为 RFC 3339
datacontenttype ce-datacontenttype 在可选的转换阶段之后更改为输出数据内容类型
dataschema ce-dataschema 架构注册表中定义的架构

后续步骤

要了解有关数据流的详细信息,请参阅创建数据流