Azure 串流分析查詢中的輸入驗證
輸入驗證是一種技術,用於保護主要查詢邏輯免於格式錯誤或非預期事件。 查詢會升級為明確處理並檢查記錄,使其無法中斷主要邏輯。
為了實作輸入驗證,我們會向查詢新增兩個初始步驟。 我們會先確定提交至核心商務邏輯的架構符合預期。 然後,我們會將例外狀況分類,並選擇性地將無效記錄路由傳送至次要輸出。
具有輸入驗證的查詢結構如下:
WITH preProcessingStage AS (
SELECT
-- Rename incoming fields, used for audit and debugging
field1 AS in_field1,
field2 AS in_field2,
...
-- Try casting fields in their expected type
TRY_CAST(field1 AS bigint) as field1,
TRY_CAST(field2 AS array) as field2,
...
FROM myInput TIMESTAMP BY myTimestamp
),
triagedOK AS (
SELECT -- Only fields in their new expected type
field1,
field2,
...
FROM preProcessingStage
WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),
triagedOut AS (
SELECT -- All fields to ease diagnostic
*
FROM preProcessingStage
WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)
-- Core business logic
SELECT
...
INTO myOutput
FROM triagedOK
...
-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut
若要查看使用輸入驗證的查詢設定完整範例,請參閱下列章節:使用輸入驗證查詢的範例。
本文將說明如何實作這項技術。
上下文
Azure 串流分析 (ASA) 作業會處理來自串流的資料。 資料流是已傳送且序列化之未經處理資料的序列 (CSV、JSON、AVRO...)。若要從資料流讀取,應用程式必須知道所使用的特定序列化格式。 在 ASA 中設定串流輸入時,您必須定義事件序列化格式。
資料還原序列化後,需要將架構進行套用以給予其含義。 架構是指串流的欄位清單,及其個別資料類型。 使用 ASA 時,傳入資料的架構不需要在輸入層級進行設定。 ASA 會改為原生支援動態輸入架構。 此功能預期欄位清單 (資料行),以及其類型,才能在事件 (資料列) 間進行變更。 ASA 也會在明確提供資料類型時推斷資料類型,並在必要時嘗試隱含轉換類型。
動態架構處理是功能強大的功能,也是串流處理的關鍵。 資料流程通常包含多個來源的資料,具有多個事件類型,而各個類型都有唯一的架構。 若要在此類串流上路由、篩選並處理事件,ASA 必須將事件全部內嵌,無論其架構為何。
但動態架構處理所提供的功能可能會存在潛在缺點。 非預期的事件可以流經主要查詢邏輯,並將其中斷。 例如,我們可以在類型 NVARCHAR(MAX)
欄位上使用 ROUND。 ASA 會將其隱含轉換為浮點數,以符合 ROUND
的簽章。 我們在這裡預期或希望此欄位一律包含數值。 但是當我們收到已將欄位設定 "NaN"
的事件時,或如果欄位完全遺失,則作業可能會失敗。
透過輸入驗證,我們會將初步步驟新增至查詢,以處理此類格式錯誤的事件。 我們主要使用 WITH 和 TRY_CAST 來進行實作。
案例:不可靠事件產生者的輸入驗證
我們將組建新 ASA 作業,以從單一事件中樞內嵌資料。 如同最常見的情況,我們不負責資料產生者。 在這裡,生產者是由多個硬體廠商銷售的 IoT 裝置。
與利害關係人開會,我們會在序列化格式和架構上達成共識。 所有裝置都會將此類訊息推送至一般事件中樞、ASA 作業的輸入。
架構合約定義如下:
欄位名稱 | 欄位類型 | 欄位描述 |
---|---|---|
deviceId |
整數 | 唯一裝置識別碼 |
readingTimestamp |
Datetime | 中央閘道所產生的訊息時間 |
readingStr |
String | |
readingNum |
數值 | |
readingArray |
字串陣列 |
接著,這會在 JSON 序列化下提供下列範例訊息:
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7,
"readingArray" : ["A","B"]
}
我們已經可以看到架構合約和其實作間存在差異。 在 JSON 格式中,日期時間中沒有資料類型。 此格式會以字串形式傳輸 (請參閱上述的 readingTimestamp
)。 ASA 可以輕鬆地解決問題,但此功能會顯示驗證和明確轉換類型的需求。 CSV 中序列化資料越多,因為所有值稍後都會以字串形式傳輸。
存在另一種差異。 ASA 會使用自身的類型系統,該系統不符合傳入的類型系統。 如果 ASA 具有整數 (Bigint)、日期時間、字串 (nvarchar (max)) 和陣列的內建類型,則此功能僅可透過浮點數支援數值。 對大部分的應用程式而言,此類不相符並非問題。 但在某些邊緣案例中,此類不相符可能會導致精確度稍微偏移。 在此情況下,我們會將數值轉換為新欄位中的字串。 然後下游會使用支援固定十進位的系統來偵測並更正潛在的偏移。
返回查詢,我們在這裡要執行下列操作:
- 傳遞
readingStr
至 JavaScript UDF - 計算陣列中的記錄數目
- 四捨五入
readingNum
至第二位小數 - 將資料插入 SQL 資料表
目的地 SQL 資料表具有下列架構:
CREATE TABLE [dbo].[readings](
[Device_Id] int NULL,
[Reading_Timestamp] datetime2(7) NULL,
[Reading_String] nvarchar(200) NULL,
[Reading_Num] decimal(18,2) NULL,
[Array_Count] int NULL
) ON [PRIMARY]
最佳最法是在進行作業時將各個欄位的事件進行對應:
欄位 | 輸入 (JSON) | 繼承類型 (ASA) | 輸出 (Azure SQL) | 註解 |
---|---|---|---|---|
deviceId |
數值 | bigint | 整數 | |
readingTimestamp |
string | nvarchar(MAX) | datetime2 | |
readingStr |
string | nvarchar(MAX) | nvarchar(200) | 由 UDF 使用 |
readingNum |
數值 | float | decimal(18,2) | 要進行四捨五入 |
readingArray |
array(string) | nvarchar(MAX) 陣列 | 整數 | 需進行計算 |
必要條件
我們將使用 ASA 工具延伸模組,在 Visual Studio Code 中開發查詢功能。 此教學課程的第一個步驟將引導您安裝必要元件。
在 VS Code 中,我們會將本機執行與本機輸入/輸出結果搭配使用,此動作不會產生任何產成本,且能加速偵錯迴圈。 我們不需要設定事件中樞或 Azure SQL Database。
基礎查詢
讓我們從基本實作開始進行,無須輸入驗證。 我們會在下一章節用到此元件。
在 VS Code 中,我們將建立新的 ASA 專案
在 input
資料夾中,我們將建立名為 data_readings.json
的新 JSON 檔案,並將下列記錄新增至該檔案:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingStr" : "Another String",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : -4.85436,
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : ["G","G"]
}
]
接著我們將定義稱為 readings
本機輸入,並參考我們上面建立的 JSON 檔案。
一旦設定完成應顯示如下:
{
"InputAlias": "readings",
"Type": "Data Stream",
"Format": "Json",
"FilePath": "data_readings.json",
"ScriptType": "InputMock"
}
透過預覽資料功能,我們可以觀察記錄是否已正確載入。
我們將以滑鼠右鍵按一下 Functions
資料夾並選取 ASA: Add Function
,來建立名為 udfLen
新的 JavaScript UDF。 我們使用的程式碼如下:
// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
return arg1.length;
}
在本機執行中,我們無須定義輸出。 除非有多個輸出,否則我們甚至無須使用 INTO
。 在 .asaql
檔案中,我們可以下方項目取代現有查詢:
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
讓我們快速瀏覽提交的查詢:
- 若要計算每個陣列中的記錄數目,我們必須先將其解除封裝。 我們將使用 CROSS APPLY 和 GetArrayElements() (此處有更多範例)
- 我們也必須
GROUP BY
所有欄位,並將其全部投影在SELECT
中。 明確投影欄位是較佳的作法,因為SELECT *
會讓錯誤從輸入流向輸出- 如果我們定義時間範圍,我們可能會想要使用 TIMESTAMP BY 來定義時間戳記。 在這裡我們還無須讓邏輯運作。 針對本機執行,若無
TIMESTAMP BY
,所有記錄均會載入單一時間戳記,並執行開始時間。
- 如果我們定義時間範圍,我們可能會想要使用 TIMESTAMP BY 來定義時間戳記。 在這裡我們還無須讓邏輯運作。 針對本機執行,若無
- 我們會使用 UDF 來篩選
readingStr
少於兩個字元的讀數。 我們應該在此處使用 LEN。 我們使用 UDF 僅供示範用途
我們可以啟動執行,並觀察正在處理的資料:
deviceId | readingTimestamp | readingStr | readingNum | arrayCount |
---|---|---|---|---|
1 | 2021-12-10T10:00:00 | 字串 | 1.71 | 2 |
2 | 2021-12-10T10:01:00 | 另一個字串 | 2.38 | 1 |
3 | 2021-12-10T10:01:20 | 第三個字串 | -4.85 | 3 |
1 | 2021-12-10T10:02:10 | 第四個字串 | 1.21 | 2 |
既然知道查詢正常運作,讓我們針對更多資料進行測試。 我們以下列記錄取代 data_readings.json
的內容:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : "NaN",
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : {}
}
]
我們在這裡可以看到下列問題:
- 裝置 #1 已正確執行所有作業
- 裝置 #2 忘記包含
readingStr
- 裝置 #3 以數字傳送
NaN
- 裝置 #4 傳送空記錄,而非陣列
執行作業現在不應順利結束。 我們會收到下列其中一個錯誤訊息:
裝置 2 會提供我們:
[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM : at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.
裝置 3 會提供我們:
[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)
裝置 4 會提供我們:
[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)
每次均需允許格式錯誤的記錄從輸入流向主要查詢邏輯,而無需驗證。 現在,我們了解輸入驗證的值。
實作輸入驗證
讓我們擴充查詢以驗證輸入。
輸入驗證的第一個步驟是定義核心商務邏輯的架構預期結果。 回顧原始需求,我們的主要邏輯是要執行下列行為:
- 傳遞
readingStr
至 JavaScript UDF 以測量其長度 - 計算陣列中的記錄數目
- 四捨五入
readingNum
至第二位小數 - 將資料插入 SQL 資料表
針對每個點,我們可以列出預期結果:
- UDF 需要類型字串的引數 (此處為 nvarchar(max)),該引數不可為 Null
GetArrayElements()
需要類型陣列的引數或 Null 值Round
需要類型 Bigint 或浮點數的引數,或 Null 值- 我們不應該依賴 ASA 的隱含轉換,而是自行執行,並在查詢中處理類型衝突
其中一種方式是調整主要邏輯來處理這些例外狀況。 但在此案例中,我們相信主要邏輯完美。 因此,讓我們改為驗證傳入的資料。
首先,讓我們使用 WITH,將輸入驗證層新增為查詢的第一個步驟。 我們會使用 TRY_CAST 將欄位轉換成其預期類型,並在轉換失敗時將其設定為 NULL
:
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
)
-- For debugging only
SELECT * FROM readingsValidated
在最後一個我們所使用的輸入檔案中 (該檔案包含錯誤),此查詢會傳回下列集合:
in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|---|---|---|---|---|
1 | 2021-12-10T10:00:00 | 字串 | 1.7145 | ["A","B"] | 1 | 2021-12-10T10:00:00.0000000Z | 字串 | 1.7145 | ["A","B"] |
2 | 2021-12-10T10:01:00 | NULL | 2.378 | ["C"] | 2 | 2021-12-10T10:01:00.0000000Z | NULL | 2.378 | ["C"] |
3 | 2021-12-10T10:01:20 | 第三個字串 | NaN | ["D","E","F"] | 3 | 2021-12-10T10:01:20.0000000Z | 第三個字串 | NULL | ["D","E","F"] |
4 | 2021-12-10T10:02:10 | 第四個字串 | 1.2126 | {} | 4 | 2021-12-10T10:02:10.0000000Z | 第四個字串 | 1.2126 | NULL |
我們已經可以看到兩個錯誤已進行處理。 我們已將 NaN
和 {}
轉換為 NULL
。 我們現在確信上述記錄也正確插入至目的地 SQL 資料表中。
我們現在必須決定如何處理遺漏或具有無效值的記錄。 經歷討論後,我們決定拒絕空白/具無效 readingArray
或遺漏 readingStr
的記錄。
因此我們會新增第二層,將驗證記錄與主要邏輯之間分類:
WITH readingsValidated AS (
...
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected
最好同時針對輸出撰寫單一 WHERE
子句,並在第二個輸出中使用 NOT (...)
。 如此一來才不會在輸入中同時排除任何記錄,並造成遺失。
現在我們會收到兩個輸出。 Debug1 具有將傳送至主要邏輯的記錄:
deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|
1 | 2021-12-10T10:00:00.0000000Z | 字串 | 1.7145 | ["A","B"] |
3 | 2021-12-10T10:01:20.0000000Z | 第三個字串 | NULL | ["D","E","F"] |
Debug2 具有將遭拒絕的記錄:
in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|---|---|---|---|---|
2 | 2021-12-10T10:01:00 | NULL | 2.378 | ["C"] | 2 | 2021-12-10T10:01:00.0000000Z | NULL | 2.378 | ["C"] |
4 | 2021-12-10T10:02:10 | 第四個字串 | 1.2126 | {} | 4 | 2021-12-10T10:02:10.0000000Z | 第四個字串 | 1.2126 | NULL |
最後一個步驟是將主要邏輯加回。 我們也會新增收集拒絕的輸出。 在這裡,最好使用不會強制執行強類型的輸出配接器,例如儲存體帳戶。
您可以在最後的一個章節中找到完整查詢。
WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
SELECT
*
INTO BlobOutput
FROM readingsToBeRejected
這會為 SQLOutput 提供下列集合,且不會發生任何錯誤:
deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|
1 | 2021-12-10T10:00:00.0000000Z | 字串 | 1.7145 | 2 |
3 | 2021-12-10T10:01:20.0000000Z | 第三個字串 | NULL | 3 |
其他兩筆記錄會傳送至 BlobOutput,以供人為檢閱和後置處理。 我們的查詢現在安全無虞。
具有輸入驗證的查詢範例
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- Core business logic
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected
擴充輸入驗證
GetType 可用來明確檢查類型。 此功能適用於投影中的 CASE,或在設定層級的 WHERE。 GetType
也可用於根據中繼資料存放庫動態檢查傳入架構。 您可以透過參考資料集載入存放庫。
單元測試 是確保查詢具有復原性的良好方式。 我們將組建一系列測試,其中包含輸入檔案及其預期的輸出。 我們的查詢必須符合所產生的輸出才能進行傳遞。 在 ASA 中,單元測試是透過 asa-streamanalytics-cicd npm 模組來完成。 應該在部署管線中建立及測試具有各種格式錯誤事件的測試案例。
最後,我們可以在 VS Code 中執行一些輕量型整合測試。 我們可以透過本機執行將記錄插入 SQL 資料表,以進行即時輸出。
取得支援
如需進一步的協助,請嘗試 Azure 串流分析的 Microsoft 問與答頁面。