你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure 流分析查询中的输入验证
输入验证是用于防范格式错误的事件或意外事件影响主要查询逻辑的一种技术。 查询已升级为显式处理和检查记录,因此它们不会破坏主要逻辑。
为了实现输入验证,我们向查询添加了两个初始步骤。 首先确保提交给核心业务逻辑的架构符合其预期。 然后,会审异常,并可以选择将无效记录路由到辅助输出中。
带有输入验证的查询的结构如下:
WITH preProcessingStage AS (
SELECT
-- Rename incoming fields, used for audit and debugging
field1 AS in_field1,
field2 AS in_field2,
...
-- Try casting fields in their expected type
TRY_CAST(field1 AS bigint) as field1,
TRY_CAST(field2 AS array) as field2,
...
FROM myInput TIMESTAMP BY myTimestamp
),
triagedOK AS (
SELECT -- Only fields in their new expected type
field1,
field2,
...
FROM preProcessingStage
WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),
triagedOut AS (
SELECT -- All fields to ease diagnostic
*
FROM preProcessingStage
WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)
-- Core business logic
SELECT
...
INTO myOutput
FROM triagedOK
...
-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut
若要查看设置了输入验证的查询综合示例,请参阅以下部分:具有输入验证的查询示例。
本文演示了如何实现此技术。
上下文
Azure 流分析 (ASA) 作业处理来自流的数据。 流是传输的已序列化原始数据序列(CSV、JSON、AVRO...)。若要从流中读取数据,应用程序需要知道所用的特定序列化格式。 在 ASA 中,必须在配置流输入时定义事件序列化格式。
反序列化数据后,需要应用一种架构,以赋予数据的意义。 架构是指流中字段的列表及其各自的数据类型。 使用 ASA 时,无需在输入级别设置传入数据的架构。 ASA 原生支持动态输入架构。 它预期字段(列)的列表及其类型在事件(行)之间会发生更改。 当未显式提供数据类型的情况下,ASA 还会推断数据类型,并根据需要尝试隐式强制转换类型。
动态架构处理是一项强大的功能,在流处理中发挥了关键的作用。 数据流通常包含来自多个源的数据,这些源具有多种事件类型,其中每种类型都有唯一的架构。 若要路由、筛选和处理此类流上的事件,不管这些事件的架构如何,ASA 都必须引入所有这些事件。
但是,动态架构处理提供的功能有一个潜在的缺点。 意外的事件可能会流经主要查询逻辑并破坏该逻辑。 例如,我们可以对 NVARCHAR(MAX)
类型的字段使用 ROUND。 ASA 会将其隐式转换为浮点值以匹配 ROUND
的签名。 在此处,我们预期或希望此字段始终包含数值。 但是,当我们收到字段设置为 "NaN"
的事件时,或者完全缺少该字段时,作业可能会失败。
通过输入验证,我们可将初步步骤添加到查询,以处理这种格式错误的事件。 我们主要使用 WITH 和 TRY_CAST 来实现此方法。
场景:对不可靠事件生成者进行输入验证
我们将生成一个新的 ASA 作业,用于从单个事件中心引入数据。 与大多数情况下一样,我们无需对数据生成方负责。 此处的生成方是指多个硬件供应商销售的 IoT 设备。
与利益干系人面谈后,我们就序列化格式和架构达成了一致。 所有设备将此类消息推送到通用事件中心,即 ASA 作业的输入。
架构协定的定义如下:
字段名称 | 字段类型 | 字段说明 |
---|---|---|
deviceId |
Integer | 唯一的设备标识符 |
readingTimestamp |
datetime | 由中心网关生成的消息时间 |
readingStr |
字符串 | |
readingNum |
Numeric | |
readingArray |
字符串数组 |
这种定义在 JSON 序列化下提供了以下示例消息:
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7,
"readingArray" : ["A","B"]
}
我们已经可以看到架构协定与其实现之间的差异。 在 JSON 格式中,日期/时间没有数据类型。 日期/时间将作为字符串传输(参阅前面的 readingTimestamp
)。 ASA 可以轻松解决该问题,但会显示需要验证和显式强制转换类型。 对于在 CSV 中序列化的数据而言更是如此,因为所有值随后将作为字符串传输。
另外还有一种差异。 ASA 使用自身的类型系统,而该系统与传入的类型不匹配。 如果 ASA 对整数 (bigint)、日期/时间、字符串 (nvarchar(max)) 和数组使用内置类型,则它仅通过浮点值支持数值。 对于大多数应用程序而言,这种不匹配并不是问题。 但在某些特殊情况下,它可能会导致精度略有偏差。 在这种情况下,我们会在新字段中将数值转换为字符串。 然后在下游,我们使用支持定点小数的系统来检测并纠正潜在的偏差。
回到我们的查询,在此处我们打算:
- 将
readingStr
传递给 JavaScript UDF - 统计数组中的记录数
- 将
readingNum
舍入到小数点后的第二位 - 将数据插入 SQL 表中
目标 SQL 表具有以下架构:
CREATE TABLE [dbo].[readings](
[Device_Id] int NULL,
[Reading_Timestamp] datetime2(7) NULL,
[Reading_String] nvarchar(200) NULL,
[Reading_Num] decimal(18,2) NULL,
[Array_Count] int NULL
) ON [PRIMARY]
良好的做法是在作业执行过程中将发生的情况映射到每个字段:
字段 | 输入 (JSON) | 继承的类型 (ASA) | 输出 (Azure SQL) | 评论 |
---|---|---|---|---|
deviceId |
数字 | bigint | integer | |
readingTimestamp |
字符串 | nvarchar(MAX) | datetime2 | |
readingStr |
字符串 | nvarchar(MAX) | nvarchar(200) | 由 UDF 使用 |
readingNum |
数字 | FLOAT | decimal(18,2) | 有待舍入 |
readingArray |
array(string) | array of nvarchar(MAX) | integer | 有待计数 |
先决条件
我们将使用“ASA 工具”扩展在 Visual Studio Code 中开发查询。 此教程的前几个步骤将引导你安装所需的组件。
在 VS Code 中,我们将本地运行和本地输入/输出结合使用,以避免产生成本,并加速调试循环。 不需要设置事件中心或 Azure SQL 数据库。
基础查询
让我们从一个不使用输入验证的基本实现开始。 我们将在下一部分添加输入验证。
我们将在 VS Code 中创建一个新的 ASA 项目
在 input
文件夹中创建一个名为 data_readings.json
的新 JSON 文件,并在其中添加以下记录:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingStr" : "Another String",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : -4.85436,
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : ["G","G"]
}
]
然后定义一个名为 readings
的本地输入,并在其中引用上面创建的 JSON 文件。
配置后,该本地输入应如下所示:
{
"InputAlias": "readings",
"Type": "Data Stream",
"Format": "Json",
"FilePath": "data_readings.json",
"ScriptType": "InputMock"
}
通过预览数据,可以看到我们的记录已正确加载。
右键单击 Functions
文件夹并选择 ASA: Add Function
,创建名为 udfLen
的新 JavaScript UDF。 使用的代码是:
// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
return arg1.length;
}
在本地运行中不需要定义输出。 除非有多个输出,否则甚至不需要使用 INTO
。 在 .asaql
文件中,可将现有查询替换为:
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
让我们快速浏览一下提交的查询:
- 若要统计每个数组中的记录数,首先需要将记录解包。 我们将使用 CROSS APPLY 和 GetArrayElements()(此处提供了更多示例)
- 我们还必须对所有字段运行
GROUP BY
,并将其全部投影到SELECT
中。 显式投影字段是良好的做法,因为SELECT *
会让错误从输入流向输出- 如果我们定义了时间窗口,可能需要使用 TIMESTAMP BY 定义时间戳。 在此处,无需定义时间戳即可使逻辑正常工作。 对于本地运行,如果不使用
TIMESTAMP BY
,所有记录将加载到单个时间戳,即运行开始时间。
- 如果我们定义了时间窗口,可能需要使用 TIMESTAMP BY 定义时间戳。 在此处,无需定义时间戳即可使逻辑正常工作。 对于本地运行,如果不使用
- 使用 UDF 来筛选
readingStr
少于两个字符的读数。 在此处我们应已使用 LEN。 我们只是出于演示目的使用了 UDF
可以启动运行并观察正在处理的数据:
deviceId | readingTimestamp | readingStr | readingNum | arrayCount |
---|---|---|---|---|
1 | 2021-12-10T10:00:00 | 一个字符串。 | 1.71 | 2 |
2 | 2021-12-10T10:01:00 | 另一个字符串 | 2.38 | 1 |
3 | 2021-12-10T10:01:20 | 第三个字符串 | -4.85 | 3 |
1 | 2021-12-10T10:02:10 | 第四个字符串 | 1.21 | 2 |
现在我们知道查询可正常运行,接下来让我们针对更多数据测试查询。 将 data_readings.json
的内容替换为以下记录:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : "NaN",
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : {}
}
]
在此处我们发现了以下问题:
- 设备 #1 一切正常
- 设备 #2 忘记了包含
readingStr
- 设备 #3 将
NaN
作为数字发送 - 设备 #4 发送了空记录而不是数组
现在运行作业不会获得正常的结果。 我们会收到以下错误消息之一:
设备 2 将提供:
[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM : at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.
设备 3 将提供:
[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)
设备 4 将提供:
[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)
每次都允许格式错误的记录在未经验证的情况下从输入流向主要查询逻辑。 现在我们认识到了输入验证的价值。
实现输入验证
让我们扩展上述查询以验证输入。
输入验证的第一步是定义核心业务逻辑的架构预期。 回顾原始要求,我们的主要逻辑是:
- 将
readingStr
传递给 JavaScript UDF 以度量其长度 - 统计数组中的记录数
- 将
readingNum
舍入到小数点后的第二位 - 将数据插入 SQL 表中
对于每一点,我们可以列出预期:
- UDF 需要一个字符串类型的不能为 null 的参数(此处为 nvarchar(max))
GetArrayElements()
需要一个数组类型的参数,或 null 值Round
需要一个 bigint 或浮点类型的参数,或 null 值- 我们不应依赖 ASA 执行隐式强制转换,而应该自行执行这种强制转换,并在查询中处理类型冲突
一种方法是调整主要逻辑来处理这些异常。 但在本例中,我们相信主要逻辑是完美的。 因此让我们改为验证传入的数据。
首先,使用 WITH 添加一个输入验证层作为查询的第一步。 我们将使用 TRY_CAST 将字段转换为其预期类型,并在转换失败时将其设置为 NULL
:
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
)
-- For debugging only
SELECT * FROM readingsValidated
对于我们使用的最后一个输入文件(包含错误的文件),此查询将返回以下集:
in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|---|---|---|---|---|
1 | 2021-12-10T10:00:00 | 一个字符串。 | 1.7145 | ["A","B"] | 1 | 2021-12-10T10:00:00Z | 一个字符串。 | 1.7145 | ["A","B"] |
2 | 2021-12-10T10:01:00 | Null | 2.378 | ["C"] | 2 | 2021-12-10T10:01:00Z | Null | 2.378 | ["C"] |
3 | 2021-12-10T10:01:20 | 第三个字符串 | NaN | ["D","E","F"] | 3 | 2021-12-10T10:01:20Z | 第三个字符串 | NULL | ["D","E","F"] |
4 | 2021-12-10T10:02:10 | 第四个字符串 | 1.2126 | {} | 4 | 2021-12-10T10:02:10Z | 第四个字符串 | 1.2126 | NULL |
可以看到,上述两个错误正在得到解决。 我们已将 NaN
和 {}
转换为 NULL
。 现在,我们确信这些记录已正确插入到目标 SQL 表中。
现在必须确定如何处理存在缺失值或无效值的记录。 经过一番讨论,我们决定拒绝存在空的/无效 readingArray
或缺失 readingStr
的记录。
因此我们添加了另一个层,用于会审验证输出与主要逻辑之间的记录:
WITH readingsValidated AS (
...
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected
良好的做法是为两个输出编写一个 WHERE
子句,并在第二个输出中使用 NOT (...)
。 这样,任何记录都不能从两个输出中排除并丢失。
现在我们获得了两个输出。 Debug1 包含要发送到主要逻辑的记录:
deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|
1 | 2021-12-10T10:00:00Z | 一个字符串。 | 1.7145 | ["A","B"] |
3 | 2021-12-10T10:01:20Z | 第三个字符串 | Null | ["D","E","F"] |
Debug2 包含要拒绝的记录:
in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|---|---|---|---|---|
2 | 2021-12-10T10:01:00 | Null | 2.378 | ["C"] | 2 | 2021-12-10T10:01:00Z | NULL | 2.378 | ["C"] |
4 | 2021-12-10T10:02:10 | 第四个字符串 | 1.2126 | {} | 4 | 2021-12-10T10:02:10Z | 第四个字符串 | 1.2126 | NULL |
最后一步是将主要逻辑添加回来。 我们还将添加用于收集拒绝记录的输出。 此处最好是使用不强制执行强类型化的输出适配器,例如存储帐户。
在最后一部分可以找到完整查询。
WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
SELECT
*
INTO BlobOutput
FROM readingsToBeRejected
这样,我们将获得以下 SQLOutput 集,并且不会出错:
deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|
1 | 2021-12-10T10:00:00Z | 一个字符串。 | 1.7145 | 2 |
3 | 2021-12-10T10:01:20Z | 第三个字符串 | Null | 3 |
其他两条记录已发送到 BlobOutput 供人工审查和后期处理。 我们的查询现在是安全的。
具有输入验证的查询示例
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- Core business logic
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected
扩展输入验证
GetType 可用于显式检查类型。 它能够很好地处理投影中的 CASE 或设置级别的 WHERE。 GetType
还可用于根据元数据存储库动态检查传入的架构。 可以通过引用数据集加载存储库。
单元测试是确保查询具有复原能力的良好做法。 我们将构建一系列由输入文件及其预期输出组成的测试。 我们的查询必须与它生成的输出相匹配才能通过测试。 在 ASA 中,单元测试是通过 asa-streamanalytics-cicd npm 模块完成的。 应在部署管道中创建并测试具有各种格式错误事件的测试用例。
最后,可以在 VS Code 中进行一些小规模的集成测试。 可以通过在本地运行以生成实时输出将记录插入 SQL 表中。
获取支持
如需获取进一步的帮助,可前往 Azure 流分析的 Microsoft 问答页面。