Walidacja danych wejściowych w zapytaniach usługi Azure Stream Analytics

Walidacja danych wejściowych to technika, która służy do ochrony głównej logiki zapytań przed źle sformułowanym lub nieoczekiwanymi zdarzeniami. Zapytanie jest uaktualniane do jawnego przetwarzania i sprawdzania rekordów, aby nie mogły przerwać głównej logiki.

Aby zaimplementować walidację danych wejściowych, do zapytania dodamy dwa początkowe kroki. Najpierw upewniamy się, że schemat przesłany do podstawowej logiki biznesowej jest zgodny z oczekiwaniami. Następnie klasyfikujemy wyjątki i opcjonalnie kierujemy nieprawidłowe rekordy do pomocniczych danych wyjściowych.

Zapytanie z weryfikacją danych wejściowych będzie ustrukturyzowane w następujący sposób:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Aby zapoznać się z kompleksowym przykładem zapytania skonfigurowanego przy użyciu walidacji danych wejściowych, zobacz sekcję: Przykład zapytania z walidacją danych wejściowych.

W tym artykule pokazano, jak zaimplementować tę technikę.

Kontekst

Zadania usługi Azure Stream Analytics (ASA) przetwarzają dane pochodzące ze strumieni. Strumienie to sekwencje nieprzetworzonych danych, które są przesyłane serializowane (CSV, JSON, AVRO...). Aby odczytać ze strumienia, aplikacja musi znać używany konkretny format serializacji. W usłudze ASA należy zdefiniować format serializacji zdarzeń podczas konfigurowania danych wejściowych przesyłania strumieniowego.

Gdy dane zostaną zdeserializowane, należy zastosować schemat, aby nadać mu znaczenie. Według schematu oznaczamy listę pól w strumieniu i ich odpowiednie typy danych. W przypadku usługi ASA schemat danych przychodzących nie musi być ustawiany na poziomie danych wejściowych. Zamiast tego usługa ASA obsługuje dynamiczne schematy wejściowe natywnie. Oczekuje się, że lista pól (kolumn) i ich typów zmieni się między zdarzeniami (wierszami). Usługa ASA będzie również wywnioskować typy danych, gdy żadne nie zostanie podane jawnie, i spróbuje niejawnie rzutować typy w razie potrzeby.

Obsługa schematu dynamicznego to zaawansowana funkcja, klucz do przetwarzania strumieniowego. Strumienie danych często zawierają dane z wielu źródeł z wieloma typami zdarzeń, z których każdy ma unikatowy schemat. Aby kierować, filtrować i przetwarzać zdarzenia w takich strumieniach, usługa ASA musi pozyskiwać je wszystkie niezależnie od ich schematu.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

Jednak możliwości oferowane przez obsługę schematu dynamicznego mają potencjalną wadę. Nieoczekiwane zdarzenia mogą przepływać przez główną logikę zapytania i przerywać je. Na przykład możemy użyć funkcji ROUND w polu typu NVARCHAR(MAX). Usługa ASA będzie niejawnie rzutować ją, aby dopasować ją do podpisu ROUND. W tym miejscu oczekujemy lub mamy nadzieję, że to pole zawsze będzie zawierać wartości liczbowe. Jeśli jednak otrzymamy zdarzenie z polem ustawionym na "NaN", lub jeśli pole jest całkowicie brakujące, zadanie może zakończyć się niepowodzeniem.

W przypadku walidacji danych wejściowych dodajemy wstępne kroki do zapytania w celu obsługi takich źle sformułowanych zdarzeń. Użyjemy przede wszystkim funkcji WITH i TRY_CAST , aby ją zaimplementować.

Scenariusz: walidacja danych wejściowych dla zawodnych producentów zdarzeń

Utworzymy nowe zadanie usługi ASA, które będzie pozyskiwać dane z jednego centrum zdarzeń. Tak jak najczęściej, nie jesteśmy odpowiedzialni za producentów danych. W tym miejscu producenci są urządzeniami IoT sprzedawanymi przez wielu dostawców sprzętu.

Na spotkaniu z uczestnikami projektu zgadzamy się na format serializacji i schemat. Wszystkie urządzenia będą wypychać takie komunikaty do wspólnego centrum zdarzeń, dane wejściowe zadania asa.

Kontrakt schematu jest definiowany w następujący sposób:

Nazwa pola Typ pola Opis pola
deviceId Integer Unikatowy identyfikator urządzenia
readingTimestamp Datetime Czas komunikatu generowany przez bramę centralną
readingStr String
readingNum Liczbowe
readingArray Tablica ciągów

Co z kolei daje nam następujący przykładowy komunikat w ramach serializacji JSON:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Możemy już zobaczyć rozbieżność między kontraktem schematu a jego implementacją. W formacie JSON nie ma typu danych daty/godziny. Zostanie on przesłany jako ciąg (patrz readingTimestamp powyżej). Usługa ASA może łatwo rozwiązać ten problem, ale pokazuje potrzebę weryfikacji i jawnego rzutowania typów. Tym bardziej dane serializowane w pliku CSV, ponieważ wszystkie wartości są następnie przesyłane jako ciąg.

Istnieje kolejna rozbieżność. Usługa ASA używa własnego systemu typów, który nie jest zgodny z systemem przychodzącym. Jeśli usługa ASA ma wbudowane typy dla liczb całkowitych (bigint), datetime, string (nvarchar(max)) i tablic, obsługuje tylko liczbowe przez zmiennik zmiennoprzecinkowy. Ta niezgodność nie jest problemem dla większości aplikacji. Ale w niektórych przypadkach krawędzi może to spowodować niewielkie dryfy w precyzji. W takim przypadku przekonwertujemy wartość liczbową jako ciąg w nowym polu. Następnie użyjemy systemu obsługującego stałe liczby dziesiętne do wykrywania i poprawiania potencjalnych dryfów.

Wróć do naszego zapytania, w tym miejscu zamierzamy:

  • Przekazywanie readingStr do funkcji zdefiniowanej przez użytkownika języka JavaScript
  • Zlicz liczbę rekordów w tablicy
  • Zaokrąglaj readingNum do drugiego miejsca dziesiętnego
  • Wstawianie danych do tabeli SQL

Docelowa tabela SQL ma następujący schemat:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

Dobrym rozwiązaniem jest mapowanie tego, co dzieje się z każdym polem podczas wykonywania zadania:

Pole Dane wejściowe (JSON) Typ dziedziczony (ASA) Dane wyjściowe (Azure SQL) Komentarz
deviceId Liczba bigint integer
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) nvarchar(200) używane przez funkcję zdefiniowanej przez użytkownika
readingNum Liczba liczba zmiennoprzecinkowa dziesiętne (18,2) zaokrąglone
readingArray array(string) tablica nvarchar(MAX) integer do zliczenia

Wymagania wstępne

Utworzymy zapytanie w programie Visual Studio Code przy użyciu rozszerzenia NARZĘDZIA ASA. Pierwsze kroki tego samouczka przeprowadzą Cię przez proces instalowania wymaganych składników.

W programie VS Code użyjemy lokalnych przebiegów z lokalnymi danymi wejściowymi/wyjściowymi, aby nie ponosić żadnych kosztów i przyspieszyć pętlę debugowania. Nie musimy konfigurować centrum zdarzeń ani usługi Azure SQL Database.

Zapytanie podstawowe

Zacznijmy od podstawowej implementacji bez walidacji danych wejściowych. Dodamy go w następnej sekcji.

W programie VS Code utworzymy nowy projekt ASA

W folderze input utworzymy nowy plik JSON o nazwie data_readings.json i dodamy do niego następujące rekordy:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Następnie zdefiniujemy lokalne dane wejściowe o nazwie readings, odwołując się do utworzonego powyżej pliku JSON.

Po skonfigurowaniu powinien wyglądać następująco:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

W przypadku danych podglądu możemy zaobserwować, że nasze rekordy są prawidłowo ładowane.

Utworzymy nową funkcję zdefiniowaną przez użytkownika języka JavaScript wywoływaną udfLen przez kliknięcie prawym przyciskiem myszy Functions folderu i wybranie pozycji ASA: Add Function. Kod, który użyjemy, to:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

W przypadku przebiegów lokalnych nie musimy definiować danych wyjściowych. Nie musimy nawet używać INTO , chyba że istnieje więcej niż jedno dane wyjściowe. .asaql W pliku możemy zastąpić istniejące zapytanie:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Szybko przejmijmy przesłane zapytanie:

  • Aby zliczyć liczbę rekordów w każdej tablicy, najpierw musimy je rozpakować. Użyjemy metody CROSS APPLY i GetArrayElements() (więcej przykładów tutaj)
    • W tym celu w zapytaniu są wyświetlane dwa zestawy danych: oryginalne dane wejściowe i wartości tablicy. Aby upewnić się, że nie mieszamy pól, definiujemy aliasy (AS r) i używamy ich wszędzie
    • Następnie, aby rzeczywiście COUNT wartości tablicy, musimy agregować za pomocą funkcji GROUP BY
    • W tym celu musimy zdefiniować przedział czasu. Ponieważ nie potrzebujemy jednej dla naszej logiki, okno migawki jest właściwym wyborem
  • Musimy również umieścić GROUP BY wszystkie pola i projektować je wszystkie w obiekcie SELECT. Jawne projekcje pól jest dobrym rozwiązaniem, ponieważ SELECT * błędy będą przepływać z danych wejściowych do danych wyjściowych
    • Jeśli zdefiniujemy przedział czasu, możemy zdefiniować znacznik czasu z sygnaturą czasową TIMESTAMP BY. W tym miejscu nie jest konieczne, aby nasza logika działała. W przypadku przebiegów lokalnych bez TIMESTAMP BY wszystkich rekordów są ładowane na jeden znacznik czasu uruchomienia.
  • Używamy funkcji zdefiniowanej przez użytkownika do filtrowania odczytów, w których readingStr jest mniej niż dwa znaki. W tym miejscu powinniśmy użyć len . Używamy funkcji zdefiniowanej przez użytkownika tylko do celów demonstracyjnych

Możemy uruchomić przebieg i obserwować przetwarzane dane:

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 Ciąg 1,71 2
2 2021-12-10T10:01:00 Inny ciąg 2.38 1
3 2021-12-10T10:01:20 Trzeci ciąg -4.85 3
1 2021-12-10T10:02:10 Ciąg forth 1.21 2

Teraz, gdy wiemy, że nasze zapytanie działa, przetestujmy je pod kątem większej ilości danych. Zastąpmy zawartość następujących data_readings.json rekordów:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Poniżej przedstawiono następujące problemy:

  • Urządzenie nr 1 zrobiło wszystko dobrze
  • Urządzenie nr 2 nie może zawierać readingStr
  • Urządzenie nr 3 wysłane NaN jako liczba
  • Urządzenie nr 4 wysłało pusty rekord zamiast tablicy

Uruchomienie zadania nie powinno teraz zakończyć się dobrze. Zostanie wyświetlony jeden z następujących komunikatów o błędach:

Urządzenie 2 da nam:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

Urządzenie 3 da nam:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

Urządzenie 4 da nam:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

Za każdym razem, gdy źle sformułowane rekordy mogły przepływać z danych wejściowych do głównej logiki zapytania bez sprawdzania poprawności. Teraz zdajemy sobie sprawę z wartości walidacji danych wejściowych.

Implementowanie walidacji danych wejściowych

Rozszerzmy nasze zapytanie, aby zweryfikować dane wejściowe.

Pierwszym krokiem weryfikacji danych wejściowych jest zdefiniowanie oczekiwań schematu podstawowej logiki biznesowej. Patrząc wstecz na pierwotne wymaganie, naszą główną logiką jest:

  • Przekazywanie readingStr do funkcji zdefiniowanej przez użytkownika języka JavaScript w celu pomiaru jego długości
  • Zlicz liczbę rekordów w tablicy
  • Zaokrąglaj readingNum do drugiego miejsca dziesiętnego
  • Wstawianie danych do tabeli SQL

Dla każdego punktu możemy wymienić oczekiwania:

  • Funkcja zdefiniowanej przez użytkownika wymaga argumentu ciągu typu (nvarchar(max), który nie może mieć wartości null
  • GetArrayElements() wymaga argumentu typu tablicy lub wartości null
  • Round wymaga argumentu typu bigint lub float albo wartości null
  • Zamiast polegać na niejawnych rzutowaniu usługi ASA, powinniśmy to zrobić samodzielnie i obsługiwać konflikty typów w zapytaniu

Jednym ze sposobów jest dostosowanie głównej logiki do obsługi tych wyjątków. Ale w tym przypadku uważamy, że nasza główna logika jest idealna. Zweryfikujmy więc dane przychodzące.

Najpierw użyjmy funkcji WITH , aby dodać warstwę weryfikacji danych wejściowych jako pierwszy krok zapytania. Użyjemy TRY_CAST, aby przekonwertować pola na ich oczekiwany typ i ustawimy je na NULL wartość , jeśli konwersja zakończy się niepowodzeniem:

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

W przypadku ostatniego użytego pliku wejściowego (z błędami) to zapytanie zwróci następujący zestaw:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 Ciąg 1.7145 ["A","B"] 1 2021-12-10T10:00:00.000000Z Ciąg 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.000000Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 Trzeci ciąg Nan ["D","E","F"] 3 2021-12-10T10:01:20.000000Z Trzeci ciąg NULL ["D","E","F"]
100 2021-12-10T10:02:10 Ciąg forth 1.2126 {} 100 2021-12-10T10:02:10.000000Z Ciąg forth 1.2126 NULL

Widzimy już dwa nasze błędy, które są rozwiązywane. Przekształciliśmy element NaN i {} w NULL. Teraz mamy pewność, że te rekordy zostaną poprawnie wstawione w docelowej tabeli SQL.

Teraz musimy zdecydować, jak adresować rekordy z brakującymi lub nieprawidłowymi wartościami. Po zakończeniu dyskusji decydujemy się odrzucić rekordy z pustym/nieprawidłowym readingArray lub brakującym readingStrelementem .

Dlatego dodamy drugą warstwę, która będzie klasyfikować rekordy między jedną i główną logiką weryfikacji:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

Dobrym rozwiązaniem jest napisanie pojedynczej WHERE klauzuli dla danych wyjściowych i użycie NOT (...) ich w drugim. W ten sposób nie można wykluczyć żadnych rekordów zarówno z danych wyjściowych, jak i utraconych.

Teraz otrzymujemy dwa dane wyjściowe. Debug1 zawiera rekordy, które zostaną wysłane do głównej logiki:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.000000Z Ciąg 1.7145 ["A","B"]
3 2021-12-10T10:01:20.000000Z Trzeci ciąg NULL ["D","E","F"]

Debug2 zawiera rekordy, które zostaną odrzucone:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.000000Z NULL 2.378 ["C"]
100 2021-12-10T10:02:10 Ciąg forth 1.2126 {} 100 2021-12-10T10:02:10.000000Z Ciąg forth 1.2126 NULL

Ostatnim krokiem jest dodanie głównej logiki z powrotem. Dodamy również dane wyjściowe, które zbierają odrzucenia. W tym miejscu najlepiej użyć karty wyjściowej, która nie wymusza silnego pisania, takiego jak konto magazynu.

Pełne zapytanie można znaleźć w ostatniej sekcji.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

Co da nam następujący zestaw dla sqlOutput, bez możliwego błędu:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.000000Z Ciąg 1.7145 2
3 2021-12-10T10:01:20.000000Z Trzeci ciąg NULL 3

Pozostałe dwa rekordy są wysyłane do obiektu BlobOutput w celu przeglądu przez człowieka i przetwarzania końcowego. Nasze zapytanie jest teraz bezpieczne.

Przykład zapytania z walidacją danych wejściowych

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Rozszerzanie walidacji danych wejściowych

Funkcja GetType może służyć do jawnego sprawdzania typu. Działa dobrze w projekcji CASE lub WHERE na poziomie ustawionym. GetType można również użyć do dynamicznego sprawdzania schematu przychodzącego względem repozytorium metadanych. Repozytorium można załadować za pomocą zestawu danych referencyjnych.

Testowanie jednostkowe jest dobrym rozwiązaniem, aby upewnić się, że nasze zapytanie jest odporne. Skompilujemy serię testów, które składają się z plików wejściowych i ich oczekiwanych danych wyjściowych. Nasze zapytanie będzie musiało być zgodne z danymi wyjściowymi, które generuje, aby przekazać. W usłudze ASA testowanie jednostkowe odbywa się za pośrednictwem modułu npm asa-streamanalytics-cicd . Przypadki testowe z różnymi źle sformułowanymi zdarzeniami powinny być tworzone i testowane w potoku wdrażania.

Na koniec możemy przeprowadzić testy integracji lekkiej w programie VS Code. Możemy wstawić rekordy do tabeli SQL za pośrednictwem lokalnego uruchomienia do danych wyjściowych na żywo.

Uzyskiwanie pomocy technicznej

Aby uzyskać dalszą pomoc, wypróbuj stronę pytań i odpowiedzi firmy Microsoft dotyczącą usługi Azure Stream Analytics.

Następne kroki