Dela via


Indataverifiering i Azure Stream Analytics-frågor

Indataverifiering är en teknik som används för att skydda huvudfrågelogik från felaktiga eller oväntade händelser. Frågan uppgraderas för att uttryckligen bearbeta och kontrollera poster så att de inte kan bryta huvudlogik.

För att implementera indataverifiering lägger vi till två inledande steg i en fråga. Vi ser först till att schemat som skickas till kärnaffärslogik matchar förväntningarna. Sedan sorterar vi undantag och dirigerar ogiltiga poster till sekundära utdata.

En fråga med indataverifiering kommer att struktureras på följande sätt:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Ett omfattande exempel på en fråga som konfigurerats med indataverifiering finns i avsnittet: Exempel på fråga med indataverifiering.

Den här artikeln visar hur du implementerar den här tekniken.

Kontext

Azure Stream Analytics-jobb (ASA) bearbetar data som kommer från strömmar. Flöden är sekvenser av rådata som överförs serialiserade (CSV, JSON, AVRO...). Om du vill läsa från en dataström måste ett program känna till det specifika serialiseringsformat som används. I ASA måste händelse serialiseringsformatet definieras när du konfigurerar en strömmande indata.

När data har deserialiserats måste ett schema tillämpas för att ge dem innebörd. Med schema menar vi listan över fält i dataströmmen och deras respektive datatyper. Med ASA behöver schemat för inkommande data inte anges på indatanivå. ASA stöder i stället dynamiska indatascheman internt. Den förväntar sig att listan över fält (kolumner) och deras typer ändras mellan händelser (rader). ASA härleder också datatyper när ingen anges explicit och försöker implicit omvandla typer när det behövs.

Dynamisk schemahantering är en kraftfull funktion, nyckeln till dataströmbearbetning. Dataströmmar innehåller ofta data från flera källor, med flera händelsetyper, var och en med ett unikt schema. För att dirigera, filtrera och bearbeta händelser i sådana strömmar måste ASA mata in dem alla oavsett schema.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

Men de funktioner som erbjuds av dynamisk schemahantering har en potentiell nackdel. Oväntade händelser kan flöda genom huvudfrågelogik och bryta den. Vi kan till exempel använda ROUND i ett fält av typen NVARCHAR(MAX). ASA omvandlar den implicit så att den matchar signaturen ROUNDför . Här förväntar vi oss, eller hoppas, att det här fältet alltid innehåller numeriska värden. Men när vi får en händelse med fältet inställt på "NaN", eller om fältet saknas helt, kan jobbet misslyckas.

Med indataverifiering lägger vi till preliminära steg i vår fråga för att hantera sådana felaktiga händelser. Vi använder främst WITH och TRY_CAST för att implementera det.

Scenario: validering av indata för otillförlitliga händelseproducenter

Vi kommer att skapa ett nytt ASA-jobb som matar in data från en enda händelsehubb. Som oftast är vi inte ansvariga för dataproducenterna. Här är producenterna IoT-enheter som säljs av flera maskinvaruleverantörer.

Möte med intressenterna, vi är överens om ett serialiseringsformat och ett schema. Alla enheter skickar sådana meddelanden till en vanlig händelsehubb, indata för ASA-jobbet.

Schemakontraktet definieras på följande sätt:

Fältnamn Fälttyp Fältbeskrivning
deviceId Integer Unik enhetsidentifierare
readingTimestamp Datetime Meddelandetid, genererad av en central gateway
readingStr String
readingNum Numerisk
readingArray Strängmatris

Vilket i sin tur ger oss följande exempelmeddelande under JSON-serialisering:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Vi kan redan se en avvikelse mellan schemakontraktet och dess implementering. I JSON-format finns det ingen datatyp för datetime. Den överförs som en sträng (se readingTimestamp ovan). ASA kan enkelt åtgärda problemet, men det visar behovet av att verifiera och uttryckligen konvertera typer. Desto mer för data som serialiseras i CSV, eftersom alla värden sedan överförs som sträng.

Det finns en annan avvikelse. ASA använder sitt eget typsystem som inte matchar den inkommande. Om ASA har inbyggda typer för heltal (bigint), datetime, sträng (nvarchar(max)) och matriser stöder det bara numeriskt via flyttal. Det här matchningsfelet är inte ett problem för de flesta program. Men i vissa kantfall kan det orsaka små drifter i precision. I det här fallet konverterar vi det numeriska värdet som sträng i ett nytt fält. Sedan använder vi ett system som stöder fasta decimaler för att identifiera och korrigera potentiella drifter.

Tillbaka till vår fråga har vi för avsikt att:

  • Skicka readingStr till en JavaScript UDF
  • Räkna antalet poster i matrisen
  • Avrunda readingNum till den andra decimalen
  • Infoga data i en SQL-tabell

Sql-måltabellen har följande schema:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

Det är en bra idé att mappa vad som händer med varje fält när det går igenom jobbet:

Fält Indata (JSON) Ärvd typ (ASA) Utdata (Azure SQL) Kommentar
deviceId Nummer bigint integer
readingTimestamp sträng nvarchar(MAX) datetime2
readingStr sträng nvarchar(MAX) nvarchar(200) används av UDF
readingNum Nummer flyttal decimal(18,2) som ska avrundas
readingArray array(string) matris med nvarchar(MAX) integer som ska räknas

Förutsättningar

Vi utvecklar frågan i Visual Studio Code med hjälp av ASA Tools-tillägget . De första stegen i den här självstudien vägleder dig genom att installera nödvändiga komponenter.

I VS Code använder vi lokala körningar med lokala indata/utdata för att inte medföra några kostnader och påskynda felsökningsloopen. Vi behöver inte konfigurera en händelsehubb eller en Azure SQL Database.

Grundläggande fråga

Vi börjar med en grundläggande implementering utan indataverifiering. Vi lägger till den i nästa avsnitt.

I VS Code skapar vi ett nytt ASA-projekt

input I mappen skapar vi en ny JSON-fil med namnet data_readings.json och lägger till följande poster i den:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Sedan definierar vi en lokal indata, kallad readings, som refererar till JSON-filen som vi skapade ovan.

När den har konfigurerats bör den se ut så här:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

Med förhandsversionsdata kan vi se att våra poster läses in korrekt.

Vi skapar en ny JavaScript UDF med namnet udfLen genom att högerklicka på Functions mappen och välja ASA: Add Function. Koden vi använder är:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

I lokala körningar behöver vi inte definiera utdata. Vi behöver inte ens använda INTO om det inte finns fler än en utdata. .asaql I filen kan vi ersätta den befintliga frågan med:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Vi går snabbt igenom frågan vi skickade:

  • För att räkna antalet poster i varje matris måste vi först packa upp dem. Vi använder CROSS APPLY och GetArrayElements() (fler exempel här)
    • När vi gör det visas två datauppsättningar i frågan: den ursprungliga indatan och matrisvärdena. För att se till att vi inte blandar ihop fält definierar vi alias (AS r) och använder dem överallt
    • För att kunna aggregera COUNT matrisvärdena måste vi sedan aggregera med GROUP BY
    • För det måste vi definiera ett tidsfönster. Eftersom vi inte behöver någon för vår logik är ögonblicksbildsfönstret rätt val
  • Vi måste också alla GROUP BY fält och projicera dem alla i SELECT. Det är en bra idé att uttryckligen projicera fält, vilket SELECT * gör att felen flödar från indata till utdata
    • Om vi definierar ett tidsfönster kanske vi vill definiera en tidsstämpel med TIMESTAMP BY. Här är det inte nödvändigt att vår logik fungerar. För lokala körningar, utan TIMESTAMP BY att alla poster läses in på en enda tidsstämpel, körs starttiden.
  • Vi använder UDF för att filtrera avläsningar där readingStr det finns färre än två tecken. Vi borde ha använt LEN här. Vi använder endast en UDF i demonstrationssyfte

Vi kan starta en körning och observera de data som bearbetas:

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 En sträng 1,71 2
2 2021-12-10T10:01:00 En annan sträng 2,38 1
3 2021-12-10T10:01:20 En tredje sträng -4.85 3
1 2021-12-10T10:02:10 En framåtsträng 1.21 2

Nu när vi vet att vår fråga fungerar ska vi testa den mot mer data. Nu ska vi ersätta innehållet data_readings.json i med följande poster:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Här kan vi se följande problem:

  • Enhet nr 1 gjorde allt rätt
  • Enhet nr 2 har glömt att inkludera en readingStr
  • Enhet nr 3 skickad NaN som ett nummer
  • Enhet nr 4 skickade en tom post i stället för en matris

Att köra jobbet nu borde inte sluta bra. Vi får något av följande felmeddelanden:

Enhet 2 ger oss:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

Enhet 3 ger oss:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

Enhet 4 ger oss:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

Varje gång felformade poster tilläts flöda från indata till huvudfrågelogik utan att verifieras. Nu inser vi värdet för indatavalidering.

Implementera indataverifiering

Nu ska vi utöka vår fråga för att verifiera indata.

Det första steget i indatavalidering är att definiera schemaförväntningarna för kärnaffärslogik. När vi ser tillbaka på det ursprungliga kravet är vår huvudlogik att:

  • Skicka readingStr till en JavaScript UDF för att mäta dess längd
  • Räkna antalet poster i matrisen
  • Avrunda readingNum till den andra decimalen
  • Infoga data i en SQL-tabell

För varje punkt kan vi lista förväntningarna:

  • UDF kräver ett argument av typen sträng (nvarchar(max) här) som inte kan vara null
  • GetArrayElements() kräver ett argument av typen matris, eller ett null-värde
  • Round kräver ett argument av typen bigint eller float, eller ett null-värde
  • I stället för att förlita oss på implicit gjutning av ASA bör vi göra det själva och hantera typkonflikter i frågan

Ett sätt att gå är att anpassa huvudlogik för att hantera dessa undantag. Men i det här fallet anser vi att vår huvudlogik är perfekt. Så låt oss validera inkommande data i stället.

Först ska vi använda WITH för att lägga till ett indataverifieringslager som det första steget i frågan. Vi använder TRY_CAST för att konvertera fält till deras förväntade typ och ange dem till NULL om konverteringen misslyckas:

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

Med den sista indatafilen som vi använde (den med fel) returnerar den här frågan följande uppsättning:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 En sträng 1.7145 ["A","B"] 1 2021-12-10T10:00:00.0000000Z En sträng 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 En tredje sträng Nan ["D","E","F"] 3 2021-12-10T10:01:20.0000000Z En tredje sträng NULL ["D","E","F"]
4 2021-12-10T10:02:10 En framåtsträng 1.2126 {} 4 2021-12-10T10:02:10.0000000Z En framåtsträng 1.2126 NULL

Vi kan redan se två av våra fel åtgärdas. Vi förvandlade NaN och {} till NULL. Vi är nu övertygade om att dessa poster kommer att infogas korrekt i SQL-måltabellen.

Nu måste vi bestämma hur posterna ska hanteras med saknade eller ogiltiga värden. Efter en diskussion bestämmer vi oss för att avvisa poster med en tom/ogiltig readingArray eller en saknad readingStr.

Så vi lägger till ett andra lager som kommer att sortera poster mellan validering ett och huvudlogik:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

Det är bra att skriva en enda WHERE sats för båda utdata och använda NOT (...) i den andra. På så sätt kan inga poster undantas från både utdata och förlorade.

Nu får vi två utdata. Felsökning1 har de poster som ska skickas till huvudlogik:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z En sträng 1.7145 ["A","B"]
3 2021-12-10T10:01:20.0000000Z En tredje sträng NULL ["D","E","F"]

Felsökning2 har de poster som kommer att avvisas:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 En framåtsträng 1.2126 {} 4 2021-12-10T10:02:10.0000000Z En framåtsträng 1.2126 NULL

Det sista steget är att lägga tillbaka vår huvudlogik. Vi lägger också till utdata som samlar in avslag. Här är det bäst att använda ett utdatakort som inte framtvingar stark skrivning, som ett lagringskonto.

Den fullständiga frågan finns i det sista avsnittet.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

Vilket ger oss följande uppsättning för SQLOutput, utan något möjligt fel:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z En sträng 1.7145 2
3 2021-12-10T10:01:20.0000000Z En tredje sträng NULL 3

De andra två posterna skickas till ett BlobOutput för mänsklig granskning och efter bearbetning. Vår fråga är nu säker.

Exempel på fråga med indataverifiering

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Utöka indataverifiering

GetType kan användas för att uttryckligen söka efter en typ. Det fungerar bra med CASE i projektionen eller WHERE på den angivna nivån. GetType kan också användas för att dynamiskt kontrollera det inkommande schemat mot en metadatalagringsplats. Lagringsplatsen kan läsas in via en referensdatauppsättning.

Enhetstestning är en bra metod för att säkerställa att vår fråga är motståndskraftig. Vi skapar en serie tester som består av indatafiler och deras förväntade utdata. Vår fråga måste matcha de utdata som den genererar för att skickas. I ASA utförs enhetstestning via modulen asa-streamanalytics-cicd npm. Testfall med olika felaktiga händelser bör skapas och testas i distributionspipelinen.

Slutligen kan vi utföra några lätta integreringstester i VS Code. Vi kan infoga poster i SQL-tabellen via en lokal körning till liveutdata.

Få support

Om du vill ha mer hjälp kan du prova vår frågesida för Microsoft Q&A för Azure Stream Analytics.

Nästa steg