Walidacja danych wejściowych w zapytaniach usługi Azure Stream Analytics
Walidacja danych wejściowych to technika, która służy do ochrony głównej logiki zapytań przed źle sformułowanym lub nieoczekiwanymi zdarzeniami. Zapytanie jest uaktualniane do jawnego przetwarzania i sprawdzania rekordów, aby nie mogły przerwać głównej logiki.
Aby zaimplementować walidację danych wejściowych, do zapytania dodamy dwa początkowe kroki. Najpierw upewniamy się, że schemat przesłany do podstawowej logiki biznesowej jest zgodny z oczekiwaniami. Następnie klasyfikujemy wyjątki i opcjonalnie kierujemy nieprawidłowe rekordy do pomocniczych danych wyjściowych.
Zapytanie z weryfikacją danych wejściowych będzie ustrukturyzowane w następujący sposób:
WITH preProcessingStage AS (
SELECT
-- Rename incoming fields, used for audit and debugging
field1 AS in_field1,
field2 AS in_field2,
...
-- Try casting fields in their expected type
TRY_CAST(field1 AS bigint) as field1,
TRY_CAST(field2 AS array) as field2,
...
FROM myInput TIMESTAMP BY myTimestamp
),
triagedOK AS (
SELECT -- Only fields in their new expected type
field1,
field2,
...
FROM preProcessingStage
WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),
triagedOut AS (
SELECT -- All fields to ease diagnostic
*
FROM preProcessingStage
WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)
-- Core business logic
SELECT
...
INTO myOutput
FROM triagedOK
...
-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut
Aby zapoznać się z kompleksowym przykładem zapytania skonfigurowanego przy użyciu walidacji danych wejściowych, zobacz sekcję: Przykład zapytania z walidacją danych wejściowych.
W tym artykule pokazano, jak zaimplementować tę technikę.
Kontekst
Zadania usługi Azure Stream Analytics (ASA) przetwarzają dane pochodzące ze strumieni. Strumienie to sekwencje nieprzetworzonych danych, które są przesyłane serializowane (CSV, JSON, AVRO...). Aby odczytać ze strumienia, aplikacja musi znać używany konkretny format serializacji. W usłudze ASA należy zdefiniować format serializacji zdarzeń podczas konfigurowania danych wejściowych przesyłania strumieniowego.
Gdy dane zostaną zdeserializowane, należy zastosować schemat, aby nadać mu znaczenie. Według schematu oznaczamy listę pól w strumieniu i ich odpowiednie typy danych. W przypadku usługi ASA schemat danych przychodzących nie musi być ustawiany na poziomie danych wejściowych. Zamiast tego usługa ASA obsługuje dynamiczne schematy wejściowe natywnie. Oczekuje się, że lista pól (kolumn) i ich typów zmieni się między zdarzeniami (wierszami). Usługa ASA będzie również wywnioskować typy danych, gdy żadne nie zostanie podane jawnie, i spróbuje niejawnie rzutować typy w razie potrzeby.
Obsługa schematu dynamicznego to zaawansowana funkcja, klucz do przetwarzania strumieniowego. Strumienie danych często zawierają dane z wielu źródeł z wieloma typami zdarzeń, z których każdy ma unikatowy schemat. Aby kierować, filtrować i przetwarzać zdarzenia w takich strumieniach, usługa ASA musi pozyskiwać je wszystkie niezależnie od ich schematu.
Jednak możliwości oferowane przez obsługę schematu dynamicznego mają potencjalną wadę. Nieoczekiwane zdarzenia mogą przepływać przez główną logikę zapytania i przerywać je. Na przykład możemy użyć funkcji ROUND w polu typu NVARCHAR(MAX)
. Usługa ASA będzie niejawnie rzutować ją, aby dopasować ją do podpisu ROUND
. W tym miejscu oczekujemy lub mamy nadzieję, że to pole zawsze będzie zawierać wartości liczbowe. Jeśli jednak otrzymamy zdarzenie z polem ustawionym na "NaN"
, lub jeśli pole jest całkowicie brakujące, zadanie może zakończyć się niepowodzeniem.
W przypadku walidacji danych wejściowych dodajemy wstępne kroki do zapytania w celu obsługi takich źle sformułowanych zdarzeń. Użyjemy przede wszystkim funkcji WITH i TRY_CAST , aby ją zaimplementować.
Scenariusz: walidacja danych wejściowych dla zawodnych producentów zdarzeń
Utworzymy nowe zadanie usługi ASA, które będzie pozyskiwać dane z jednego centrum zdarzeń. Tak jak najczęściej, nie jesteśmy odpowiedzialni za producentów danych. W tym miejscu producenci są urządzeniami IoT sprzedawanymi przez wielu dostawców sprzętu.
Na spotkaniu z uczestnikami projektu zgadzamy się na format serializacji i schemat. Wszystkie urządzenia będą wypychać takie komunikaty do wspólnego centrum zdarzeń, dane wejściowe zadania asa.
Kontrakt schematu jest definiowany w następujący sposób:
Nazwa pola | Typ pola | Opis pola |
---|---|---|
deviceId |
Integer | Unikatowy identyfikator urządzenia |
readingTimestamp |
Datetime | Czas komunikatu generowany przez bramę centralną |
readingStr |
String | |
readingNum |
Liczbowe | |
readingArray |
Tablica ciągów |
Co z kolei daje nam następujący przykładowy komunikat w ramach serializacji JSON:
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7,
"readingArray" : ["A","B"]
}
Możemy już zobaczyć rozbieżność między kontraktem schematu a jego implementacją. W formacie JSON nie ma typu danych daty/godziny. Zostanie on przesłany jako ciąg (patrz readingTimestamp
powyżej). Usługa ASA może łatwo rozwiązać ten problem, ale pokazuje potrzebę weryfikacji i jawnego rzutowania typów. Tym bardziej dane serializowane w pliku CSV, ponieważ wszystkie wartości są następnie przesyłane jako ciąg.
Istnieje kolejna rozbieżność. Usługa ASA używa własnego systemu typów, który nie jest zgodny z systemem przychodzącym. Jeśli usługa ASA ma wbudowane typy dla liczb całkowitych (bigint), datetime, string (nvarchar(max)) i tablic, obsługuje tylko liczbowe przez zmiennik zmiennoprzecinkowy. Ta niezgodność nie jest problemem dla większości aplikacji. Ale w niektórych przypadkach krawędzi może to spowodować niewielkie dryfy w precyzji. W takim przypadku przekonwertujemy wartość liczbową jako ciąg w nowym polu. Następnie użyjemy systemu obsługującego stałe liczby dziesiętne do wykrywania i poprawiania potencjalnych dryfów.
Wróć do naszego zapytania, w tym miejscu zamierzamy:
- Przekazywanie
readingStr
do funkcji zdefiniowanej przez użytkownika języka JavaScript - Zlicz liczbę rekordów w tablicy
- Zaokrąglaj
readingNum
do drugiego miejsca dziesiętnego - Wstawianie danych do tabeli SQL
Docelowa tabela SQL ma następujący schemat:
CREATE TABLE [dbo].[readings](
[Device_Id] int NULL,
[Reading_Timestamp] datetime2(7) NULL,
[Reading_String] nvarchar(200) NULL,
[Reading_Num] decimal(18,2) NULL,
[Array_Count] int NULL
) ON [PRIMARY]
Dobrym rozwiązaniem jest mapowanie tego, co dzieje się z każdym polem podczas wykonywania zadania:
Pole | Dane wejściowe (JSON) | Typ dziedziczony (ASA) | Dane wyjściowe (Azure SQL) | Komentarz |
---|---|---|---|---|
deviceId |
Liczba | bigint | integer | |
readingTimestamp |
string | nvarchar(MAX) | datetime2 | |
readingStr |
string | nvarchar(MAX) | nvarchar(200) | używane przez funkcję zdefiniowanej przez użytkownika |
readingNum |
Liczba | liczba zmiennoprzecinkowa | dziesiętne (18,2) | zaokrąglone |
readingArray |
array(string) | tablica nvarchar(MAX) | integer | do zliczenia |
Wymagania wstępne
Utworzymy zapytanie w programie Visual Studio Code przy użyciu rozszerzenia NARZĘDZIA ASA. Pierwsze kroki tego samouczka przeprowadzą Cię przez proces instalowania wymaganych składników.
W programie VS Code użyjemy lokalnych przebiegów z lokalnymi danymi wejściowymi/wyjściowymi, aby nie ponosić żadnych kosztów i przyspieszyć pętlę debugowania. Nie musimy konfigurować centrum zdarzeń ani usługi Azure SQL Database.
Zapytanie podstawowe
Zacznijmy od podstawowej implementacji bez walidacji danych wejściowych. Dodamy go w następnej sekcji.
W programie VS Code utworzymy nowy projekt ASA
W folderze input
utworzymy nowy plik JSON o nazwie data_readings.json
i dodamy do niego następujące rekordy:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingStr" : "Another String",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : -4.85436,
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : ["G","G"]
}
]
Następnie zdefiniujemy lokalne dane wejściowe o nazwie readings
, odwołując się do utworzonego powyżej pliku JSON.
Po skonfigurowaniu powinien wyglądać następująco:
{
"InputAlias": "readings",
"Type": "Data Stream",
"Format": "Json",
"FilePath": "data_readings.json",
"ScriptType": "InputMock"
}
W przypadku danych podglądu możemy zaobserwować, że nasze rekordy są prawidłowo ładowane.
Utworzymy nową funkcję zdefiniowaną przez użytkownika języka JavaScript wywoływaną udfLen
przez kliknięcie prawym przyciskiem myszy Functions
folderu i wybranie pozycji ASA: Add Function
. Kod, który użyjemy, to:
// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
return arg1.length;
}
W przypadku przebiegów lokalnych nie musimy definiować danych wyjściowych. Nie musimy nawet używać INTO
, chyba że istnieje więcej niż jedno dane wyjściowe. .asaql
W pliku możemy zastąpić istniejące zapytanie:
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
Szybko przejmijmy przesłane zapytanie:
- Aby zliczyć liczbę rekordów w każdej tablicy, najpierw musimy je rozpakować. Użyjemy metody CROSS APPLY i GetArrayElements() (więcej przykładów tutaj)
- W tym celu w zapytaniu są wyświetlane dwa zestawy danych: oryginalne dane wejściowe i wartości tablicy. Aby upewnić się, że nie mieszamy pól, definiujemy aliasy (
AS r
) i używamy ich wszędzie - Następnie, aby rzeczywiście
COUNT
wartości tablicy, musimy agregować za pomocą funkcji GROUP BY - W tym celu musimy zdefiniować przedział czasu. Ponieważ nie potrzebujemy jednej dla naszej logiki, okno migawki jest właściwym wyborem
- W tym celu w zapytaniu są wyświetlane dwa zestawy danych: oryginalne dane wejściowe i wartości tablicy. Aby upewnić się, że nie mieszamy pól, definiujemy aliasy (
- Musimy również umieścić
GROUP BY
wszystkie pola i projektować je wszystkie w obiekcieSELECT
. Jawne projekcje pól jest dobrym rozwiązaniem, ponieważSELECT *
błędy będą przepływać z danych wejściowych do danych wyjściowych- Jeśli zdefiniujemy przedział czasu, możemy zdefiniować znacznik czasu z sygnaturą czasową TIMESTAMP BY. W tym miejscu nie jest konieczne, aby nasza logika działała. W przypadku przebiegów lokalnych bez
TIMESTAMP BY
wszystkich rekordów są ładowane na jeden znacznik czasu uruchomienia.
- Jeśli zdefiniujemy przedział czasu, możemy zdefiniować znacznik czasu z sygnaturą czasową TIMESTAMP BY. W tym miejscu nie jest konieczne, aby nasza logika działała. W przypadku przebiegów lokalnych bez
- Używamy funkcji zdefiniowanej przez użytkownika do filtrowania odczytów, w których
readingStr
jest mniej niż dwa znaki. W tym miejscu powinniśmy użyć len . Używamy funkcji zdefiniowanej przez użytkownika tylko do celów demonstracyjnych
Możemy uruchomić przebieg i obserwować przetwarzane dane:
deviceId | readingTimestamp | readingStr | readingNum | arrayCount |
---|---|---|---|---|
1 | 2021-12-10T10:00:00 | Ciąg | 1,71 | 2 |
2 | 2021-12-10T10:01:00 | Inny ciąg | 2.38 | 1 |
3 | 2021-12-10T10:01:20 | Trzeci ciąg | -4.85 | 3 |
1 | 2021-12-10T10:02:10 | Ciąg forth | 1.21 | 2 |
Teraz, gdy wiemy, że nasze zapytanie działa, przetestujmy je pod kątem większej ilości danych. Zastąpmy zawartość następujących data_readings.json
rekordów:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : "NaN",
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : {}
}
]
Poniżej przedstawiono następujące problemy:
- Urządzenie nr 1 zrobiło wszystko dobrze
- Urządzenie nr 2 nie może zawierać
readingStr
- Urządzenie nr 3 wysłane
NaN
jako liczba - Urządzenie nr 4 wysłało pusty rekord zamiast tablicy
Uruchomienie zadania nie powinno teraz zakończyć się dobrze. Zostanie wyświetlony jeden z następujących komunikatów o błędach:
Urządzenie 2 da nam:
[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM : at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.
Urządzenie 3 da nam:
[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)
Urządzenie 4 da nam:
[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)
Za każdym razem, gdy źle sformułowane rekordy mogły przepływać z danych wejściowych do głównej logiki zapytania bez sprawdzania poprawności. Teraz zdajemy sobie sprawę z wartości walidacji danych wejściowych.
Implementowanie walidacji danych wejściowych
Rozszerzmy nasze zapytanie, aby zweryfikować dane wejściowe.
Pierwszym krokiem weryfikacji danych wejściowych jest zdefiniowanie oczekiwań schematu podstawowej logiki biznesowej. Patrząc wstecz na pierwotne wymaganie, naszą główną logiką jest:
- Przekazywanie
readingStr
do funkcji zdefiniowanej przez użytkownika języka JavaScript w celu pomiaru jego długości - Zlicz liczbę rekordów w tablicy
- Zaokrąglaj
readingNum
do drugiego miejsca dziesiętnego - Wstawianie danych do tabeli SQL
Dla każdego punktu możemy wymienić oczekiwania:
- Funkcja zdefiniowanej przez użytkownika wymaga argumentu ciągu typu (nvarchar(max), który nie może mieć wartości null
GetArrayElements()
wymaga argumentu typu tablicy lub wartości nullRound
wymaga argumentu typu bigint lub float albo wartości null- Zamiast polegać na niejawnych rzutowaniu usługi ASA, powinniśmy to zrobić samodzielnie i obsługiwać konflikty typów w zapytaniu
Jednym ze sposobów jest dostosowanie głównej logiki do obsługi tych wyjątków. Ale w tym przypadku uważamy, że nasza główna logika jest idealna. Zweryfikujmy więc dane przychodzące.
Najpierw użyjmy funkcji WITH , aby dodać warstwę weryfikacji danych wejściowych jako pierwszy krok zapytania. Użyjemy TRY_CAST, aby przekonwertować pola na ich oczekiwany typ i ustawimy je na NULL
wartość , jeśli konwersja zakończy się niepowodzeniem:
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
)
-- For debugging only
SELECT * FROM readingsValidated
W przypadku ostatniego użytego pliku wejściowego (z błędami) to zapytanie zwróci następujący zestaw:
in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|---|---|---|---|---|
1 | 2021-12-10T10:00:00 | Ciąg | 1.7145 | ["A","B"] | 1 | 2021-12-10T10:00:00.000000Z | Ciąg | 1.7145 | ["A","B"] |
2 | 2021-12-10T10:01:00 | NULL | 2.378 | ["C"] | 2 | 2021-12-10T10:01:00.000000Z | NULL | 2.378 | ["C"] |
3 | 2021-12-10T10:01:20 | Trzeci ciąg | Nan | ["D","E","F"] | 3 | 2021-12-10T10:01:20.000000Z | Trzeci ciąg | NULL | ["D","E","F"] |
100 | 2021-12-10T10:02:10 | Ciąg forth | 1.2126 | {} | 100 | 2021-12-10T10:02:10.000000Z | Ciąg forth | 1.2126 | NULL |
Widzimy już dwa nasze błędy, które są rozwiązywane. Przekształciliśmy element NaN
i {}
w NULL
. Teraz mamy pewność, że te rekordy zostaną poprawnie wstawione w docelowej tabeli SQL.
Teraz musimy zdecydować, jak adresować rekordy z brakującymi lub nieprawidłowymi wartościami. Po zakończeniu dyskusji decydujemy się odrzucić rekordy z pustym/nieprawidłowym readingArray
lub brakującym readingStr
elementem .
Dlatego dodamy drugą warstwę, która będzie klasyfikować rekordy między jedną i główną logiką weryfikacji:
WITH readingsValidated AS (
...
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected
Dobrym rozwiązaniem jest napisanie pojedynczej WHERE
klauzuli dla danych wyjściowych i użycie NOT (...)
ich w drugim. W ten sposób nie można wykluczyć żadnych rekordów zarówno z danych wyjściowych, jak i utraconych.
Teraz otrzymujemy dwa dane wyjściowe. Debug1 zawiera rekordy, które zostaną wysłane do głównej logiki:
deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|
1 | 2021-12-10T10:00:00.000000Z | Ciąg | 1.7145 | ["A","B"] |
3 | 2021-12-10T10:01:20.000000Z | Trzeci ciąg | NULL | ["D","E","F"] |
Debug2 zawiera rekordy, które zostaną odrzucone:
in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|---|---|---|---|---|
2 | 2021-12-10T10:01:00 | NULL | 2.378 | ["C"] | 2 | 2021-12-10T10:01:00.000000Z | NULL | 2.378 | ["C"] |
100 | 2021-12-10T10:02:10 | Ciąg forth | 1.2126 | {} | 100 | 2021-12-10T10:02:10.000000Z | Ciąg forth | 1.2126 | NULL |
Ostatnim krokiem jest dodanie głównej logiki z powrotem. Dodamy również dane wyjściowe, które zbierają odrzucenia. W tym miejscu najlepiej użyć karty wyjściowej, która nie wymusza silnego pisania, takiego jak konto magazynu.
Pełne zapytanie można znaleźć w ostatniej sekcji.
WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
SELECT
*
INTO BlobOutput
FROM readingsToBeRejected
Co da nam następujący zestaw dla sqlOutput, bez możliwego błędu:
deviceId | readingTimestamp | readingStr | readingNum | readingArray |
---|---|---|---|---|
1 | 2021-12-10T10:00:00.000000Z | Ciąg | 1.7145 | 2 |
3 | 2021-12-10T10:01:20.000000Z | Trzeci ciąg | NULL | 3 |
Pozostałe dwa rekordy są wysyłane do obiektu BlobOutput w celu przeglądu przez człowieka i przetwarzania końcowego. Nasze zapytanie jest teraz bezpieczne.
Przykład zapytania z walidacją danych wejściowych
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- Core business logic
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected
Rozszerzanie walidacji danych wejściowych
Funkcja GetType może służyć do jawnego sprawdzania typu. Działa dobrze w projekcji CASE lub WHERE na poziomie ustawionym. GetType
można również użyć do dynamicznego sprawdzania schematu przychodzącego względem repozytorium metadanych. Repozytorium można załadować za pomocą zestawu danych referencyjnych.
Testowanie jednostkowe jest dobrym rozwiązaniem, aby upewnić się, że nasze zapytanie jest odporne. Skompilujemy serię testów, które składają się z plików wejściowych i ich oczekiwanych danych wyjściowych. Nasze zapytanie będzie musiało być zgodne z danymi wyjściowymi, które generuje, aby przekazać. W usłudze ASA testowanie jednostkowe odbywa się za pośrednictwem modułu npm asa-streamanalytics-cicd . Przypadki testowe z różnymi źle sformułowanymi zdarzeniami powinny być tworzone i testowane w potoku wdrażania.
Na koniec możemy przeprowadzić testy integracji lekkiej w programie VS Code. Możemy wstawić rekordy do tabeli SQL za pośrednictwem lokalnego uruchomienia do danych wyjściowych na żywo.
Uzyskiwanie pomocy technicznej
Aby uzyskać dalszą pomoc, wypróbuj stronę pytań i odpowiedzi firmy Microsoft dotyczącą usługi Azure Stream Analytics.
Następne kroki
- Konfigurowanie potoków ciągłej integracji/ciągłego wdrażania i testowania jednostkowego przy użyciu pakietu npm
- Omówienie lokalnych przebiegów usługi Stream Analytics w programie Visual Studio Code za pomocą narzędzi ASA
- Testowanie zapytań usługi Stream Analytics lokalnie przy użyciu przykładowych danych przy użyciu programu Visual Studio Code
- Testowanie zapytań usługi Stream Analytics lokalnie względem danych wejściowych strumienia na żywo przy użyciu programu Visual Studio Code
- Eksplorowanie zadań usługi Azure Stream Analytics za pomocą programu Visual Studio Code (wersja zapoznawcza)