共用方式為


Azure 串流分析查詢中的輸入驗證

輸入驗證是一種技術,用於保護主要查詢邏輯免於格式錯誤或非預期事件。 查詢會升級為明確處理並檢查記錄,使其無法中斷主要邏輯。

為了實作輸入驗證,我們會向查詢新增兩個初始步驟。 我們會先確定提交至核心商務邏輯的架構符合預期。 然後,我們會將例外狀況分類,並選擇性地將無效記錄路由傳送至次要輸出。

具有輸入驗證的查詢結構如下:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

若要查看使用輸入驗證的查詢設定完整範例,請參閱下列章節:使用輸入驗證查詢的範例

本文將說明如何實作這項技術。

上下文

Azure 串流分析 (ASA) 作業會處理來自串流的資料。 資料流是已傳送且序列化之未經處理資料的序列 (CSV、JSON、AVRO...)。若要從資料流讀取,應用程式必須知道所使用的特定序列化格式。 在 ASA 中設定串流輸入時,您必須定義事件序列化格式

資料還原序列化後,需要將架構進行套用以給予其含義。 架構是指串流的欄位清單,及其個別資料類型。 使用 ASA 時,傳入資料的架構不需要在輸入層級進行設定。 ASA 會改為原生支援動態輸入架構。 此功能預期欄位清單 (資料行),以及其類型,才能在事件 (資料列) 間進行變更。 ASA 也會在明確提供資料類型時推斷資料類型,並在必要時嘗試隱含轉換類型。

動態架構處理是功能強大的功能,也是串流處理的關鍵。 資料流程通常包含多個來源的資料,具有多個事件類型,而各個類型都有唯一的架構。 若要在此類串流上路由、篩選並處理事件,ASA 必須將事件全部內嵌,無論其架構為何。

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

但動態架構處理所提供的功能可能會存在潛在缺點。 非預期的事件可以流經主要查詢邏輯,並將其中斷。 例如,我們可以在類型 NVARCHAR(MAX) 欄位上使用 ROUND。 ASA 會將其隱含轉換為浮點數,以符合 ROUND 的簽章。 我們在這裡預期或希望此欄位一律包含數值。 但是當我們收到已將欄位設定 "NaN" 的事件時,或如果欄位完全遺失,則作業可能會失敗。

透過輸入驗證,我們會將初步步驟新增至查詢,以處理此類格式錯誤的事件。 我們主要使用 WITHTRY_CAST 來進行實作。

案例:不可靠事件產生者的輸入驗證

我們將組建新 ASA 作業,以從單一事件中樞內嵌資料。 如同最常見的情況,我們不負責資料產生者。 在這裡,生產者是由多個硬體廠商銷售的 IoT 裝置。

與利害關係人開會,我們會在序列化格式和架構上達成共識。 所有裝置都會將此類訊息推送至一般事件中樞、ASA 作業的輸入。

架構合約定義如下:

欄位名稱 欄位類型 欄位描述
deviceId 整數 唯一裝置識別碼
readingTimestamp Datetime 中央閘道所產生的訊息時間
readingStr String
readingNum 數值
readingArray 字串陣列

接著,這會在 JSON 序列化下提供下列範例訊息:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

我們已經可以看到架構合約和其實作間存在差異。 在 JSON 格式中,日期時間中沒有資料類型。 此格式會以字串形式傳輸 (請參閱上述的 readingTimestamp)。 ASA 可以輕鬆地解決問題,但此功能會顯示驗證和明確轉換類型的需求。 CSV 中序列化資料越多,因為所有值稍後都會以字串形式傳輸。

存在另一種差異。 ASA 會使用自身的類型系統,該系統不符合傳入的類型系統。 如果 ASA 具有整數 (Bigint)、日期時間、字串 (nvarchar (max)) 和陣列的內建類型,則此功能僅可透過浮點數支援數值。 對大部分的應用程式而言,此類不相符並非問題。 但在某些邊緣案例中,此類不相符可能會導致精確度稍微偏移。 在此情況下,我們會將數值轉換為新欄位中的字串。 然後下游會使用支援固定十進位的系統來偵測並更正潛在的偏移。

返回查詢,我們在這裡要執行下列操作:

  • 傳遞 readingStrJavaScript UDF
  • 計算陣列中的記錄數目
  • 四捨五入 readingNum 至第二位小數
  • 將資料插入 SQL 資料表

目的地 SQL 資料表具有下列架構:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

最佳最法是在進行作業時將各個欄位的事件進行對應:

欄位 輸入 (JSON) 繼承類型 (ASA) 輸出 (Azure SQL) 註解
deviceId 數值 bigint 整數
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) nvarchar(200) 由 UDF 使用
readingNum 數值 float decimal(18,2) 要進行四捨五入
readingArray array(string) nvarchar(MAX) 陣列 整數 需進行計算

必要條件

我們將使用 ASA 工具延伸模組,在 Visual Studio Code 中開發查詢功能。 此教學課程的第一個步驟將引導您安裝必要元件。

在 VS Code 中,我們會將本機執行本機輸入/輸出結果搭配使用,此動作不會產生任何產成本,且能加速偵錯迴圈。 我們不需要設定事件中樞或 Azure SQL Database。

基礎查詢

讓我們從基本實作開始進行,無須輸入驗證。 我們會在下一章節用到此元件。

在 VS Code 中,我們將建立新的 ASA 專案

input 資料夾中,我們將建立名為 data_readings.json 的新 JSON 檔案,並將下列記錄新增至該檔案:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

接著我們將定義稱為 readings 本機輸入,並參考我們上面建立的 JSON 檔案。

一旦設定完成應顯示如下:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

透過預覽資料功能,我們可以觀察記錄是否已正確載入。

我們將以滑鼠右鍵按一下 Functions 資料夾並選取 ASA: Add Function,來建立名為 udfLen 新的 JavaScript UDF。 我們使用的程式碼如下:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

本機執行中,我們無須定義輸出。 除非有多個輸出,否則我們甚至無須使用 INTO。 在 .asaql 檔案中,我們可以下方項目取代現有查詢:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

讓我們快速瀏覽提交的查詢:

  • 若要計算每個陣列中的記錄數目,我們必須先將其解除封裝。 我們將使用 CROSS APPLYGetArrayElements() (此處有更多範例)
    • 如此一來,我們會在查詢中呈現兩個資料集:原始輸入和陣列值。 為了確保我們不會混用欄位,我們會定義別名 (AS r),並在他處使用此別名
    • 然後,若要實際 COUNT 陣列值,我們需要使用 GROUP BY 進行彙總
    • 為此,我們必須定義時間範圍。 在這裡我們無須針對邏輯選取此項目,快照集視窗就是正確選擇
  • 我們也必須 GROUP BY 所有欄位,並將其全部投影在 SELECT 中。 明確投影欄位是較佳的作法,因為 SELECT * 會讓錯誤從輸入流向輸出
    • 如果我們定義時間範圍,我們可能會想要使用 TIMESTAMP BY 來定義時間戳記。 在這裡我們還無須讓邏輯運作。 針對本機執行,若無 TIMESTAMP BY,所有記錄均會載入單一時間戳記,並執行開始時間。
  • 我們會使用 UDF 來篩選 readingStr 少於兩個字元的讀數。 我們應該在此處使用 LEN。 我們使用 UDF 僅供示範用途

我們可以啟動執行,並觀察正在處理的資料:

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 字串 1.71 2
2 2021-12-10T10:01:00 另一個字串 2.38 1
3 2021-12-10T10:01:20 第三個字串 -4.85 3
1 2021-12-10T10:02:10 第四個字串 1.21 2

既然知道查詢正常運作,讓我們針對更多資料進行測試。 我們以下列記錄取代 data_readings.json 的內容:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

我們在這裡可以看到下列問題:

  • 裝置 #1 已正確執行所有作業
  • 裝置 #2 忘記包含 readingStr
  • 裝置 #3 以數字傳送 NaN
  • 裝置 #4 傳送空記錄,而非陣列

執行作業現在不應順利結束。 我們會收到下列其中一個錯誤訊息:

裝置 2 會提供我們:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

裝置 3 會提供我們:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

裝置 4 會提供我們:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

每次均需允許格式錯誤的記錄從輸入流向主要查詢邏輯,而無需驗證。 現在,我們了解輸入驗證的值。

實作輸入驗證

讓我們擴充查詢以驗證輸入。

輸入驗證的第一個步驟是定義核心商務邏輯的架構預期結果。 回顧原始需求,我們的主要邏輯是要執行下列行為:

  • 傳遞 readingStrJavaScript UDF 以測量其長度
  • 計算陣列中的記錄數目
  • 四捨五入 readingNum 至第二位小數
  • 將資料插入 SQL 資料表

針對每個點,我們可以列出預期結果:

  • UDF 需要類型字串的引數 (此處為 nvarchar(max)),該引數不可為 Null
  • GetArrayElements() 需要類型陣列的引數或 Null 值
  • Round 需要類型 Bigint 或浮點數的引數,或 Null 值
  • 我們不應該依賴 ASA 的隱含轉換,而是自行執行,並在查詢中處理類型衝突

其中一種方式是調整主要邏輯來處理這些例外狀況。 但在此案例中,我們相信主要邏輯完美。 因此,讓我們改為驗證傳入的資料。

首先,讓我們使用 WITH,將輸入驗證層新增為查詢的第一個步驟。 我們會使用 TRY_CAST 將欄位轉換成其預期類型,並在轉換失敗時將其設定為 NULL

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

在最後一個我們所使用的輸入檔案中 (該檔案包含錯誤),此查詢會傳回下列集合:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 字串 1.7145 ["A","B"] 1 2021-12-10T10:00:00.0000000Z 字串 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 第三個字串 NaN ["D","E","F"] 3 2021-12-10T10:01:20.0000000Z 第三個字串 NULL ["D","E","F"]
4 2021-12-10T10:02:10 第四個字串 1.2126 {} 4 2021-12-10T10:02:10.0000000Z 第四個字串 1.2126 NULL

我們已經可以看到兩個錯誤已進行處理。 我們已將 NaN{} 轉換為 NULL。 我們現在確信上述記錄也正確插入至目的地 SQL 資料表中。

我們現在必須決定如何處理遺漏或具有無效值的記錄。 經歷討論後,我們決定拒絕空白/具無效 readingArray 或遺漏 readingStr 的記錄。

因此我們會新增第二層,將驗證記錄與主要邏輯之間分類:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

最好同時針對輸出撰寫單一 WHERE 子句,並在第二個輸出中使用 NOT (...)。 如此一來才不會在輸入中同時排除任何記錄,並造成遺失。

現在我們會收到兩個輸出。 Debug1 具有將傳送至主要邏輯的記錄:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z 字串 1.7145 ["A","B"]
3 2021-12-10T10:01:20.0000000Z 第三個字串 NULL ["D","E","F"]

Debug2 具有將遭拒絕的記錄:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 第四個字串 1.2126 {} 4 2021-12-10T10:02:10.0000000Z 第四個字串 1.2126 NULL

最後一個步驟是將主要邏輯加回。 我們也會新增收集拒絕的輸出。 在這裡,最好使用不會強制執行強類型的輸出配接器,例如儲存體帳戶。

您可以在最後的一個章節中找到完整查詢。

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

這會為 SQLOutput 提供下列集合,且不會發生任何錯誤:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z 字串 1.7145 2
3 2021-12-10T10:01:20.0000000Z 第三個字串 NULL 3

其他兩筆記錄會傳送至 BlobOutput,以供人為檢閱和後置處理。 我們的查詢現在安全無虞。

具有輸入驗證的查詢範例

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

擴充輸入驗證

GetType 可用來明確檢查類型。 此功能適用於投影中的 CASE,或在設定層級的 WHEREGetType 也可用於根據中繼資料存放庫動態檢查傳入架構。 您可以透過參考資料集載入存放庫。

單元測試 是確保查詢具有復原性的良好方式。 我們將組建一系列測試,其中包含輸入檔案及其預期的輸出。 我們的查詢必須符合所產生的輸出才能進行傳遞。 在 ASA 中,單元測試是透過 asa-streamanalytics-cicd npm 模組來完成。 應該在部署管線中建立及測試具有各種格式錯誤事件的測試案例。

最後,我們可以在 VS Code 中執行一些輕量型整合測試。 我們可以透過本機執行將記錄插入 SQL 資料表,以進行即時輸出。

取得支援

如需進一步的協助,請嘗試 Azure 串流分析的 Microsoft 問與答頁面

下一步