你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用流分析生成 IoT 解决方案

简介

本解决方案演示如何使用 Azure 流分析从数据获得实时见解。 开发人员可以轻松将数据流(例如点击流、日志和设备生成的时间)与历史记录或参考数据结合起来,获取业务信息。 由 Microsoft Azure 托管的 Azure 流分析是可完全管理的实时流计算服务,它提供内置冗余、低延迟及伸缩性,可让用户在几分钟之内就立刻上手。

完成后此解决方案,你将能够:

  • 熟悉 Azure 流分析门户。
  • 配置和部署流式处理作业。
  • 使用流分析查询语言来表达实际问题并解决这些问题。
  • 自信地使用流分析为客户开发流式处理解决方案。
  • 使用监视和日志记录体验来排解问题。

先决条件

若要完成本解决方案,需要满足以下先决条件:

方案简介:“你好,收费站!”

收费站是常见设施。 在世界各地的许多高速公路、桥梁和隧道中都可以看到它们的身影。 每个收费站有多个收费亭。 在人工收费亭中,需要停车来向服务员付费。 在自动收费亭中,位于每个收费亭顶部的传感器会在车辆通过收费亭时扫描挡风玻璃上贴附的 RFID 卡。 轻松地将车辆通过这些收费站的情况可视化为可执行有趣操作的事件流。

位于收费亭的汽车的图片

传入的数据

本解决方案使用两个数据流。 安装在收费站入口和出口处的传感器会生成第一个流。 第二个流是具有车辆登记数据的静态查找数据集。

入口数据流

入口数据流包含车辆进入收费站的相关信息。 将离开数据事件从示例应用中包含的 Web 应用实时流式传输到事件中心。

| TollID | EntryTime | LicensePlate | State | Make | Model | VehicleType | VehicleWeight | Toll | Tag |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 |2014-09-10 12:01:00.000 |JNB 7001 |NY |Honda |CRV |1 |0 |7 | |
| 1 |2014-09-10 12:02:00.000 |YXZ 1001 |NY |Toyota |Camry |1 |0 |4 |123456789 |
| 3 |2014-09-10 12:02:00.000 |ABC 1004 |CT |Ford |Taurus |1 |0 |5 |456789123 |
| 2 |2014-09-10 12:03:00.000 |XYZ 1003 |CT |Toyota |Corolla |1 |0 |4 | |
| 1 |2014-09-10 12:03:00.000 |BNJ 1007 |NY |Honda |CRV |1 |0 |5 |789123456 |
| 2 |2014-09-10 12:05:00.000 |CDE 1007 |NJ |Toyota |4x4 |1 |0 |6 |321987654 |

下面是每个列的简短说明:

说明
TollID 唯一标识收费亭的收费亭 ID
EntryTime 车辆进入收费亭的日期和时间(世界协调时)
牌照 车辆的牌照号码
状态 美国的某个州
制造商 汽车制造商
型号 车辆的型号
VehicleType 1 代表客车,2 代表商用车
WeightType 汽车的重量,单位为吨;0 代表客车
收费站 通行费,单位为美元
标记 车辆上可用于自动付费的电子标签;空白代表手动付费

出口数据流

出口数据流包含车辆离开收费站的相关信息。 将离开数据事件从示例应用中包含的 Web 应用实时流式传输到事件中心。

TollId ExitTime LicensePlate
1 2014-09-10T12:03:00Z JNB 7001
1 2014-09-10T12:03:00Z YXZ 1001
3 2014-09-10T12:04:00Z ABC 1004
2 2014-09-10T12:07:00Z XYZ 1003
1 2014-09-10T12:08:00Z BNJ 1007
2 2014-09-10T12:07:00Z CDE 1007

下面是每个列的简短说明:

说明
TollID 唯一标识收费亭的收费亭 ID
ExitTime 车辆离开收费亭的日期和时间(世界协调时)
LicensePlate 车辆的牌照号码

商用车登记数据

本解决方案使用商用车注册数据库的静态快照。 此数据作为 JSON 文件保存到示例随附的 Azure Blob 存储中。

牌照 RegistrationId Expired
SVT 6023 285429838 1
XLZ 3463 362715656 0
BAC 1005 876133137 1
RIV 8632 992711956 0
SNY 7188 592133890 0
ELH 9896 678427724 1

下面是每个列的简短说明:

说明
牌照 车辆的牌照号码
RegistrationId 车辆的登记 ID
Expired 车辆的注册状态:0 代表车辆注册仍有效,1 代表车辆注册已过期

设置 Azure 流分析的环境

若要完成本解决方案,需要一个 Microsoft Azure 订阅。 如果没有 Azure 帐户,则可以申请免费试用版

请务必按照本文末尾的“清理 Azure 帐户”部分中的步骤操作,以便充分利用 Azure 信用额度。

部署示例

只需单击几下鼠标,就能轻松将多个资源一起部署在某个资源组中。 解决方案定义托管在 https://github.com/Azure/azure-stream-analytics/tree/master/Samples/TollApp 上的 GitHub 存储库中。

在 Azure 门户中部署 TollApp 模板

  1. 若要将 TollApp 环境部署到 Azure,请使用此链接部署 TollApp Azure 模板

  2. 根据提示登录到 Azure 门户。

  3. 选择要对其中各种资源计费的订阅。

  4. 指定具有唯一名称的新资源组,例如 MyTollBooth

  5. 选择 Azure 位置。

  6. 在“间隔”中指定若干秒。 此值在示例 Web 应用中用于确定向事件中心发送数据的频率。

  7. 选中表示同意条款和条件的复选框。

  8. 选择“固定到仪表板”,以便稍后可以轻松找到资源。

  9. 选择“购买”以部署示例模板。

  10. 片刻之后,将会显示一条通知来确认部署成功

查看 Azure 流分析 TollApp 资源

  1. 登录到 Azure 门户。

  2. 找到在上一部分命名的资源组。

  3. 检查该资源组中是否列出了以下资源:

    • 一个 Azure Cosmos DB 帐户
    • 一个 Azure 流分析作业
    • 一个 Azure 存储帐户
    • 一个 Azure 事件中心
    • 两个 Web 应用

检查示例 TollApp 作业

  1. 从上一部分创建的资源组着手,选择以名称 tollapp 开头的流分析流式处理作业(名称包含随机字符,以确保唯一性)。

  2. 在该作业的“概述”页上,观察“查询”框以查看查询语法。

    SELECT TollId, System.Timestamp AS WindowEnd, COUNT(*) AS Count
    INTO CosmosDB
    FROM EntryStream TIMESTAMP BY EntryTime
    GROUP BY TUMBLINGWINDOW(minute, 3), TollId
    

    为了解释查询的意图,我们假设需要统计进入某个收费亭的汽车数目。 由于进入高速公路收费亭的车流是连续性的,这些入口事件类似于永不停止的流。 若要量化流,必须定义要不断度量的“时间段”。 让我们进一步将问题细化为“每三分钟有多少车辆进入收费站?”,这通常称为“轮转计数”。

    如你所见,Azure 流分析会使用类似 SQL 的查询语言,并添加几个扩展来指定与时间相关的查询方面。 有关详细信息,请参阅时间管理和查询中所用的开窗构造。

  3. 检查 TollApp 示例作业的输入。 当前查询中仅使用了 EntryStream 输入。

    • EntryStream 输入是一个事件中心连接,用于将代表每次汽车进入高速公路收费亭的数据排队。 示例中包含的 Web 应用将会创建事件,而这些数据将在此事件中心内排队。 请注意,此输入在流式处理查询的 FROM 子句中查询。
    • ExitStream 输入是一个事件中心连接,用于将代表每次汽车离开高速公路收费亭的数据排队。 此流输入在稍后的查询语法变体中使用。
    • Registration 输入是一个 Azure Blob 存储连接,指向按需用于查找的静态 registration.json 文件。 此参考数据输入在稍后的查询语法变体中使用。
  4. 检查 TollApp 示例作业的输出。

    • Azure Cosmos DB 输出是接收输出接收器事件的 Azure Cosmos DB 数据库容器。 请注意,此输出在流式处理查询的 INTO 子句中使用。

启动 TollApp 流式处理作业

遵循以下步骤启动流式处理作业:

  1. 在作业的“概述”页上选择“启动”。

  2. 在“启动作业”窗格中选择“立即”。

  3. 等待片刻,作业运行后,请在流式处理作业的“概述”页上查看“监视”图表。 此图应显示数千个输入事件和数十个输出事件。

查看 Azure Cosmos DB 输出数据

  1. 找到包含 TollApp 资源的资源组。

  2. 选择名称模式为 tollapp<random>-cosmos 的 Azure Cosmos DB 帐户。

  3. 选择“数据资源管理器”标题打开“数据资源管理器”页。

  4. 展开“tollAppDatabase”>“tollAppCollection”>“文档”。

  5. 提供输出后,ID 列表中会显示多个文档。

  6. 选择每个 ID 以查看 JSON 文档。 请注意每个 tollidwindowend time,以及该窗口中的 count of cars

  7. 再过三分钟后,将提供另外一组四个文档,每个 tollid 对应一个文档。

报告每辆汽车的总时间

一辆车通过收费亭所需的平均时间可帮助评估流程的效率和客户体验。

要得出总时间,请将 EntryTime 流与 ExitTime 流相联接。 在等量匹配的 TollId 和 LicencePlate 列中联接两个输入流。 JOIN 运算符要求指定弹性时间,说明联接事件之间可接受的时间差。 使用 DATEDIFF 函数指定事件之间的时间差不能超过 15 分钟。 另外,将 DATEDIFF 函数应用到出口及入口时间,以计算汽车经过收费站的实际时间。 请注意相比 JOIN 条件,在 SELECT 语句中使用 DATEDIFF 的差异。

SELECT EntryStream.TollId, EntryStream.EntryTime, ExitStream.ExitTime, EntryStream.LicensePlate, DATEDIFF (minute, EntryStream.EntryTime, ExitStream.ExitTime) AS DurationInMinutes
INTO CosmosDB
FROM EntryStream TIMESTAMP BY EntryTime
JOIN ExitStream TIMESTAMP BY ExitTime
ON (EntryStream.TollId= ExitStream.TollId AND EntryStream.LicensePlate = ExitStream.LicensePlate)
AND DATEDIFF (minute, EntryStream, ExitStream ) BETWEEN 0 AND 15

更新 TollApp 流式处理作业查询语法:

  1. 在作业的“概述”页上选择“停止”。

  2. 片刻之后,将出现作业已停止的通知。

  3. 在“作业拓扑”标题下,选择“<> 查询”

  4. 粘贴调整后的流式处理 SQL 查询。

  5. 选择“保存”以保存查询。 选择“是”确认保存更改。

  6. 在作业的“概述”页上选择“启动”。

  7. 在“启动作业”窗格中选择“立即”。

查看输出中的总时间

重复前一部分所述的步骤,查看流式处理作业返回的 Azure Cosmos DB 输出数据。 查看最新的 JSON 文档。

例如,以下文档显示了一辆特定牌照的示例汽车、entrytimeexit time,以及 DATEDIFF 计算的 durationinminutes 字段(显示该汽车在收费亭的停留时间为两分钟):

{
    "tollid": 4,
    "entrytime": "2018-04-05T06:51:39.0491173Z",
    "exittime": "2018-04-05T06:53:09.0491173Z",
    "licenseplate": "JVR 9425",
    "durationinminutes": 2,
    "id": "ff52eb25-d580-7566-2879-1f52bba6601e",
    "_rid": "+8E4AI1DZgBjAAAAAAAAAA==",
    "_self": "dbs/+8E4AA==/colls/+8E4AI1DZgA=/docs/+8E4AI1DZgBjAAAAAAAAAA==/",
    "_etag": "\"ad02f6b8-0000-0000-0000-5ac5c8330000\"",
    "_attachments": "attachments/",
    "_ts": 1522911283
}

报告注册已过期的汽车

Azure 流分析可以使用参考数据静态快照来与时态数据流联接。 若要演示此功能,请使用以下示例问题。 Registration 输入是一个静态 blob json 文件,其中列出牌照过期时间。 基于牌照执行联接,可将参考数据与通过收费亭的每辆汽车进行比较。

如果某辆商用车已向收费公司登记,则可以直接通过收费亭,而不用停车接受检查。 使用注册查找表来识别注册已过期的所有商用车。

SELECT EntryStream.EntryTime, EntryStream.LicensePlate, EntryStream.TollId, Registration.RegistrationId
INTO CosmosDB
FROM EntryStream TIMESTAMP BY EntryTime
JOIN Registration
ON EntryStream.LicensePlate = Registration.LicensePlate
WHERE Registration.Expired = '1'
  1. 重复前一部分所述的步骤,更新 TollApp 流式处理作业查询语法。

  2. 重复前一部分所述的步骤,查看流式处理作业返回的 Azure Cosmos DB 输出数据。

示例输出:

    {
        "entrytime": "2018-04-05T08:01:28.0252168Z",
        "licenseplate": "GMT 3221",
        "tollid": 1,
        "registrationid": "763220582",
        "id": "47db0535-9716-4eb2-db58-de7886966cbf",
        "_rid": "y+F8AJ9QWACSAQAAAAAAAA==",
        "_self": "dbs/y+F8AA==/colls/y+F8AJ9QWAA=/docs/y+F8AJ9QWACSAQAAAAAAAA==/",
        "_etag": "\"88007d8d-0000-0000-0000-5ac5d7e20000\"",
        "_attachments": "attachments/",
        "_ts": 1522915298
    }

横向扩展作业

Azure 流分析可弹性缩放,因而能够处理大量数据。 Azure 流分析查询可以使用 PARTITION BY 子句来告诉系统此步骤会横向扩展。PartitionId 是系统加入以与输入(事件中心)的分区 ID 匹配的特殊列。

若要横向扩展对分区的查询,请将查询语法编辑为以下代码:

SELECT TollId, System.Timestamp AS WindowEnd, COUNT(*)AS Count
INTO CosmosDB
FROM EntryStream
TIMESTAMP BY EntryTime
PARTITION BY PartitionId
GROUP BY TUMBLINGWINDOW(minute,3), TollId, PartitionId

将流式处理作业纵向扩展为更多的流单元:

  1. 停止当前作业。

  2. 在“<> 查询”页中更新查询语法,然后保存更改。

  3. 在流式处理作业的“配置”标题下,选择“缩放”。

  4. 将“流单元”滑块从 1 滑到 6。 流单元定义作业能够接收的计算能力大小。 选择“保存” 。

  5. 启动流式处理作业,以演示其他缩放操作。 Azure 流分析可在更多的计算资源之间分配工作,并可以使用 PARTITION BY 子句中指定的列跨资源划分工作,从而提高吞吐量。

监视作业

“监视器” 区域包含正在运行的作业的相关统计信息。 需要完成首次配置,才能使用同一区域中的存储帐户(按本文档其余部分命名收费站)。

Azure 流分析作业监视

还可通过作业仪表板的“设置”区域访问“活动日志” 。

清理 TollApp 资源

  1. 在 Azure 门户中停止流分析作业。

  2. 找到包含与 TollApp 模板相关的八个资源的资源组。

  3. 选择“删除资源组” 。 键入资源组名称以确认删除。

结论

本解决方案介绍了 Azure 流分析服务。 其中演示如何为流分析作业配置输入和输出。 通过使用收费站数据方案,该解决方案解释了动态数据空间会引发的常见问题类型,以及如何在 Azure 流分析中使用类似于 SQL 的简单查询来解决这些问题。 本解决方案介绍了用于处理时态数据的 SQL 扩展构造。 其中说明如何联接不同的数据流、如何使用静态参考数据来扩充数据流,以及如何扩大查询来获得更高的吞吐量。

尽管此解决方案提供了详细介绍,但它不可能面面俱到。 可通过在常用流分析使用模式的查询示例中使用 SAQL 语言,发现更多查询模式。