从 Azure 事件中心获取数据
本文介绍如何将数据从事件中心获取到 Microsoft Fabric 中的 KQL 数据库。 Azure 事件中心是一个大数据流平台和事件引入服务,每秒可以处理和引导数百万个事件。
要将数据从事件中心流式传输到 Real-Time Intelligence,需要完成两个主要步骤。 第一步在 Azure 门户中执行,可在其中定义事件中心实例上的共享访问策略,并捕获以后通过此策略进行连接所需的详细信息。
第二步发生在 Fabric 中的 Real-Time Intelligence 中,可在其中将 KQL 数据库连接到事件中心并为传入数据配置架构。 此步骤将创建两个连接。 第一个连接称为“云连接”,它将 Microsoft Fabric 连接到事件中心实例。 第二个连接将“云连接”连接到 KQL 数据库。 配置完事件数据和架构后,可以使用 KQL 查询集查询流数据。
要使用 Eventstream 从事件中心获取数据,请参阅向事件流添加 Azure 事件中心源。
先决条件
- Azure 订阅。 创建免费 Azure 帐户
- 事件中心
- 具有已启用 Microsoft Fabric 的容量的工作区
- 具有编辑权限的 KQL 数据库
警告
事件中心不能位于防火墙后面。
在事件中心设置共享访问策略
在创建与事件中心数据的连接之前,需要在事件中心上设置共享访问策略 (SAS),并收集一些信息供稍后在设置连接时使用。 有关授权访问事件中心资源的详细信息,请参阅共享访问签名。
在 Azure 门户中,浏览到要连接的事件中心实例。
在“设置”下,选择“共享访问策略”
选择“+ 添加”以添加新的 SAS 策略,或选择具有“管理”权限的现有策略。
输入策略名称。
选择“管理”,然后选择“创建”。
收集云连接的信息
在 SAS 策略窗格中,记下以下四个字段。 你可能想要复制这些字段并将其粘贴到某个位置(如记事本),以便在后续步骤中使用。
字段参考 | 字段 | 说明 | 示例 |
---|---|---|---|
a | 事件中心实例 | 事件中心实例的名称。 | iotdata |
b | SAS 策略 | 在上一步中创建的 SAS 策略名称 | DocsTest |
c | 主密钥 | 与 SAS 策略关联的密钥 | 在此示例中,以 PGGIISb009 开头... |
d | 连接字符串 - 主键 | 在此字段中,只需复制事件中心命名空间,该命名空间可作为连接字符串的一部分找到。 | eventhubpm15910.servicebus.windows.net |
Source
配置
选择目标表。 如果要将数据引入新表,请选择“+ 新建表”并输入表名称。
注意
表名称最多可包含 1024 个字符,包括空格、字母数字、连字符和下划线。 不支持特殊字符。
选择“创建新连接”,或选择“现有连接”并跳至下一步。
创建新连接
根据下表填写“连接设置”:
设置 描述 示例值 事件中心命名空间 上表中的字段 d。 eventhubpm15910.servicebus.windows.net 事件中心 上表中的字段 a。 事件中心实例的名称。 iotdata 连接 若要在 Fabric 和事件中心之间使用现有的云连接,请选择此连接的名称。 否则,请选择“创建新连接”。 创建新连接 连接名称 新云连接的名称。 此名称是自动生成的,但可以覆盖。 在 Fabric 租户中必须是唯一的。 Connection 身份验证种类 自动填充。 目前仅支持共享访问密钥。 共享访问密钥 共享访问密钥名称 上表中的字段 b。 为共享访问策略提供的名称。 DocsTest 共享访问密钥 上表中的字段 c。 SAS 策略的主键。 选择“保存”。 在 Fabric 和事件中心之间创建了新的云数据连接。
将云连接连接到 KQL 数据库
无论是创建新的云连接,还是使用现有的云连接,都需要定义使用者组。 可以选择设置参数,进一步定义 KQL 数据库和云连接之间连接的各个方面。
根据表填写以下字段:
设置 描述 示例值 使用者组 在事件中心中定义的相关使用者组。 有关详细信息,请参阅使用者组。 添加新的使用者组后,需要从下拉列表中选择此组。 NewConsumer 更多参数 压缩 事件的数据压缩,如来自事件中心。 选项为“无”(默认)或 Gzip 压缩。 无 事件系统属性 有关详细信息,请参阅事件中心系统属性。 如果每个事件消息有多个记录,则系统属性将添加到第一个记录中。 请参阅事件系统属性。 事件检索开始日期 数据连接检索自事件检索开始日期起创建的现有事件中心事件。 它只能根据保留期检索事件中心保留的事件。 时区为 UTC。 如果未指定时间,则默认时间是创建数据连接的时间。
事件系统属性
系统属性在事件排队时存储由事件中心服务设置的属性。 与事件中心建立的数据连接可以基于给定的映射将选定的一组系统属性嵌入到在表中引入的数据。
properties | 数据类型 | 说明 |
---|---|---|
x-opt-enqueued-time | datetime | 将事件排队时的 UTC 时间。 |
x-opt-sequence-number | long | 事件中心分区流中的事件的逻辑序列号。 |
x-opt-offset | string | 事件与事件中心分区流之间的偏移量。 偏移量标识符在事件中心流的分区中独一无二。 |
x-opt-publisher | string | 发布服务器名称(如果消息已发送到发布服务器终结点)。 |
x-opt-partition-key | string | 存储了事件的相应分区的分区键。 |
检查
要完成引入过程,请选择“完成”。
可选:
选择“命令查看器”以查看和复制基于输入生成的自动命令。
通过从下拉列表中选择所需格式来更改自动推断的数据格式。 将以 EventData 对象的形式从事件中心读取数据。 支持的格式为 CSV、JSON、PSV、SCsv、SOHsv、TSV、TXT 和 TSVE。
编辑列。
浏览基于数据类型的高级选项。
如果预览窗口中显示的数据不完整,可能需要更多数据来创建包含所有必要数据字段的表。 使用以下命令从事件中心提取新数据:
- 丢弃显示的数据并提取新数据:丢弃显示的数据并搜索新事件。
- 提取更多数据:除已找到的事件外,还搜索更多事件。
编辑列
注意
- 对于表格格式(CSV、TSV、PSV),无法将列映射两次。 若要映射到现有列,请先删除新列。
- 不能更改已有列类型。 如果尝试映射到其他格式的列,结果可能出现空列。
以下参数决定了你可在表中进行的更改:
- 表类型为“新”或“现有”
- 映射类型为“新”或“现有”
表类型 | 映射类型 | 可用调整 |
---|---|---|
新建表 | 新映射 | 重命名列、更改数据类型、更改数据源、映射转换、添加列、删除列 |
现有表 | 新映射 | 新建列(随后可在其上更改数据类型、进行重命名和更新) |
现有表 | 现有映射 | 无 |
映射转换
某些数据格式映射(Parquet、JSON 和 Avro)支持简单的引入时间转换。 若要应用映射转换,请在“编辑列”窗口中创建或更新列。
可对具有 string 或 datetime 类型且源的数据类型为 int 或 long 的列执行映射转换。 支持的映射转换为:
- DateTimeFromUnixSeconds
- DateTimeFromUnixMilliseconds
- DateTimeFromUnixMicroseconds
- DateTimeFromUnixNanoseconds
事件中心捕获 Avro 文件的架构映射
使用事件中心数据的一种方法是通过 Azure Blob 存储或 Azure Data Lake Storage 中的 Azure 事件中心捕获事件。 然后可以使用事件网格数据连接在写入捕获文件时引入这些文件。
捕获文件的架构与发送到事件中心的原始事件的架构不同。 在设计目标表架构时,应考虑到这种差异。 具体而言,事件负载在捕获文件中表示为字节数组,并且事件网格 Azure 数据资源管理器数据连接不会自动解码此数组。 有关事件中心 Avro 捕获数据的文件架构的详细信息,请参阅探索 Azure 事件中心中捕获的 Avro 文件。
若要正确解码事件负载,请执行以下操作:
- 将捕获事件的
Body
字段映射到目标表中dynamic
类型的列。 - 应用一项更新策略,该策略使用 unicode_codepoints_to_string() 函数将字节数组转换为可读字符串。
基于数据类型的高级选项
表格(CSV、TSV、PSV):
如果要在现有表中引入表格格式,可以选择“高级”>“保留表架构”。 表格数据不一定要包括用于将源数据映射到现有列的列名称。 选中此选项后,映射将按顺序完成,表架构保持不变。 如果未选中此选项,无论数据结构如何,都为传入的数据创建新列。
要将第一行用作列名,请选择“高级”>“首行是列标题”。
JSON:
要确定 JSON 数据的列划分,请选择“高级”>“嵌套级别”,从 1 到 100。
如果选择“高级”>“跳过有错误的 JSON 行”,将以 JSON 格式引入数据。 如果未选中此复选框,则以 multijson 格式引入数据。
总结
如果数据引入成功完成,则“数据准备”窗口中的所有三个步骤都会带有绿色的对勾标记。 可以选择要查询、删除已引入数据的卡或查看引入摘要的仪表板。