Sdílet prostřednictvím


Ověřování vstupu v dotazech Azure Stream Analytics

Ověřování vstupu je technika, která slouží k ochraně hlavní logiky dotazu před poškozenými nebo neočekávanými událostmi. Dotaz se upgraduje tak, aby explicitně zpracovával a kontroloval záznamy, aby nemohl přerušit hlavní logiku.

K implementaci ověřování vstupu přidáme do dotazu dva počáteční kroky. Nejprve se ujistěte, že schéma odeslané do základní obchodní logiky odpovídá očekávání. Potom provedeme třídění výjimek a volitelně směrujeme neplatné záznamy do sekundárního výstupu.

Dotaz s ověřováním vstupu bude strukturovaný takto:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Pokud chcete zobrazit komplexní příklad dotazu nastaveného s ověřováním vstupu, přečtěte si část: Příklad dotazu s ověřováním vstupu.

Tento článek ukazuje, jak tuto techniku implementovat.

Kontext

Úlohy Azure Stream Analytics (ASA) zpracovávají data přicházející ze streamů. Toky jsou sekvence nezpracovaných dat přenášených serializovanými daty (CSV, JSON, AVRO...). Ke čtení ze streamu bude aplikace potřebovat znát konkrétní použitý formát serializace. V ASA musí být při konfiguraci vstupu streamování definován formát serializace událostí.

Jakmile jsou data deserializována, je potřeba použít schéma, aby to znamenalo. Podle schématu znamenáme seznam polí v datovém proudu a jejich odpovídající datové typy. U ASA nemusí být schéma příchozích dat nastaveno na úrovni vstupu. ASA místo toho podporuje nativně dynamická vstupní schémata . Očekává, že se změní seznam polí (sloupců) a jejich typů mezi událostmi (řádky). ASA také odvodí datové typy, pokud nejsou explicitně poskytnuty žádné, a v případě potřeby se pokusí implicitně přetypovat typy.

Dynamické zpracování schématu je výkonná funkce, která je klíčem ke zpracování datových proudů. Datové proudy často obsahují data z více zdrojů s více typy událostí, z nichž každý má jedinečné schéma. Aby bylo možné směrovat, filtrovat a zpracovávat události na takových datových proudech, musí ASA ingestovat všechny události bez ohledu na jejich schéma.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

Možnosti nabízené dynamickým zpracováním schémat ale mají potenciální nevýhodu. Neočekávané události můžou protékat logikou hlavního dotazu a narušit je. Jako příklad můžeme použít funkci ROUND u pole typu NVARCHAR(MAX). ASA implicitně přetypuje plovoucí, aby odpovídala podpisu ROUND. Tady očekáváme nebo doufáme, že toto pole bude vždy obsahovat číselné hodnoty. Ale když obdržíme událost s polem nastaveným na "NaN", nebo pokud pole zcela chybí, může úloha selhat.

Při ověřování vstupu přidáme do dotazu předběžné kroky pro zpracování takových poškozených událostí. K implementaci použijeme především WITH a TRY_CAST .

Scénář: Ověřování vstupu pro nespolehlivé producenty událostí

Vytvoříme novou úlohu ASA, která bude ingestovat data z jednoho centra událostí. Jak je tomu nejčastěji, nejsme zodpovědní za producenty dat. Producenti jsou zařízení IoT prodaná několika dodavateli hardwaru.

Se zúčastněnými stranami se shodujeme s formátem serializace a schématem. Všechna zařízení budou tyto zprávy odesílat do společného centra událostí, vstup úlohy ASA.

Kontrakt schématu je definován takto:

Název pole Typ pole Popis pole
deviceId Celé číslo Jedinečný identifikátor zařízení
readingTimestamp Datum a čas Čas zprávy vygenerovaný centrální bránou
readingStr String
readingNum Číslo
readingArray Pole řetězce

To pak dává následující ukázkovou zprávu v rámci serializace JSON:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Už vidíme nesrovnalosti mezi kontraktem schématu a jeho implementací. Ve formátu JSON neexistuje žádný datový typ data a času. Bude přenášen jako řetězec (viz výše readingTimestamp ). ASA může problém snadno vyřešit, ale ukazuje potřebu ověřovat a explicitně přetypovat typy. Tím více pro data serializovaná ve sdíleném svazku clusteru, protože všechny hodnoty se pak přenesou jako řetězec.

Došlo k dalším nesrovnalostem. ASA používá vlastní systém typů, který neodpovídá příchozímu systému . Pokud má ASA předdefinované typy pro celé číslo (bigint), datetime, řetězec (nvarchar(max)) a pole, podporuje pouze číselnou hodnotu prostřednictvím float. Tato neshoda není problém u většiny aplikací. Ale v některých hraničních případech by to mohlo způsobit mírné odchylky přesnosti. V tomto případě bychom číselnou hodnotu převedli jako řetězec v novém poli. Potom bychom použili systém, který podporuje pevné desetinné číslo, abychom zjistili a opravili potenciální posuny.

Vraťte se k našemu dotazu, tady máme v úmyslu:

  • Předání readingStr do uživatelem definovaného kódu JavaScriptu
  • Spočítat počet záznamů v poli
  • Zaokrouhlení readingNum na druhé desetinné místo
  • Vložení dat do tabulky SQL

Cílová tabulka SQL má následující schéma:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

Je vhodné namapovat, co se stane s každým polem při procházení úlohou:

Pole Vstup (JSON) Zděděný typ (ASA) Výstup (Azure SQL) Komentář
deviceId Číslo bigint integer
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) nvarchar(200) používané uživatelem definované uživatelem
readingNum Číslo float (číslo s plovoucí řádovou čárkou) decimal(18;2) zaokrouhlit
readingArray array(řetězec) pole nvarchar(MAX) integer počítáno

Požadavky

Dotaz budeme vyvíjet v editoru Visual Studio Code pomocí rozšíření ASA Tools . První kroky tohoto kurzu vás provedou instalací požadovaných komponent.

V editoru VS Code použijeme místní spuštění s místním vstupem a výstupem, aby se neúčtovaly žádné náklady, a urychlili jsme ladicí smyčku. Nebudeme muset nastavit centrum událostí ani Azure SQL Database.

Základní dotaz

Začněme základní implementací bez ověření vstupu. Přidáme ho v další části.

Ve VS Code vytvoříme nový projekt ASA.

input Ve složce vytvoříme nový soubor JSON s názvem data_readings.json a přidáme do ní následující záznamy:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Pak nadefinujeme místní vstup s názvem readings, který odkazuje na soubor JSON, který jsme vytvořili výše.

Po nakonfigurování by měl vypadat takto:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

Při zobrazení náhledu dat můžeme pozorovat, že se záznamy správně načítají.

Vytvoříme novou uživatelem definovanou uživatelem JavaScriptu , kterou zavoláme udfLen tak, že klikneme pravým tlačítkem myši na Functions složku a vybereme ASA: Add Function. Kód, který použijeme, je:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

V místních spuštěních nemusíme definovat výstupy. Ani nemusíme používat INTO , pokud není více než jeden výstup. .asaql V souboru můžeme existující dotaz nahradit:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Pojďme rychle projít dotaz, který jsme odeslali:

  • Abychom mohli spočítat počet záznamů v jednotlivých polích, musíme je napřed rozbalit. Použijeme CROSS APPLY a GetArrayElements() (další ukázky zde).
    • Tím v dotazu zobrazíme dvě datové sady: původní vstup a maticové hodnoty. Abychom se ujistili, že pole nekombinujeme, definujeme aliasy (AS r) a použijeme je všude.
    • Abychom pak mohli hodnoty pole skutečně COUNT agregovat pomocí funkce GROUP BY, musíme je agregovat.
    • Pro to musíme definovat časové období. Vzhledem k tomu, že pro naši logiku ho nepotřebujeme, je okno snímku správnou volbou.
  • Musíme také mít všechna pole a promítat GROUP BY je všechny do SELECT. Explicitní promítání polí je dobrým postupem, protože SELECT * chyby procházejí ze vstupu do výstupu.
    • Pokud definujeme časové okno, můžeme definovat časové razítko pomocí timesTAMP BY. Tady není potřeba, aby naše logika fungovala. U místních spuštění se bez TIMESTAMP BY všech záznamů načtou do jednoho časového razítka čas spuštění.
  • Funkci definovanou uživatelem používáme k filtrování čtení, u kterých readingStr je méně než dva znaky. Měli jsme tady použít LEN . K demonstračnímu účelu používáme funkci definovanou uživatelem.

Můžeme spustit spuštění a sledovat zpracovávaná data:

deviceId readingTimestamp readingStr readingNum arrayCount
0 2021-12-10T10:00:00 Řetězec 1.71 2
2 2021-12-10T10:01:00 Jiný řetězec 2.38 1
3 2021-12-10T10:01:20 Třetí řetězec -4.85 3
0 2021-12-10T10:02:10 A Forth String 1.21 2

Když teď víme, že náš dotaz funguje, otestujeme ho na více datech. Pojďme obsah nahradit data_readings.json následujícími záznamy:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Tady vidíme následující problémy:

  • Zařízení č. 1 udělalo všechno správně
  • Zařízení č. 2 zapomnělo zahrnout readingStr
  • Zařízení č. 3 odesláno NaN jako číslo
  • Zařízení č. 4 odeslalo prázdný záznam místo pole.

Spuštění úlohy by teď nemělo končit dobře. Zobrazí se jedna z následujících chybových zpráv:

Zařízení 2 nám poskytne:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

Zařízení 3 nám poskytne:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

Zařízení 4 nám poskytne:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

Při každém chybném záznamu bylo povoleno tok ze vstupu do hlavní logiky dotazu bez ověření. Teď si uvědomujeme hodnotu ověření vstupu.

Implementace ověřování vstupu

Pojďme náš dotaz rozšířit, abychom ověřili vstup.

Prvním krokem ověření vstupu je definování očekávání schémat základní obchodní logiky. Když se podíváme na původní požadavek, naší hlavní logikou je:

  • Předání readingStr do uživatelem definovaného kódu JavaScriptu pro měření jeho délky
  • Spočítat počet záznamů v poli
  • Zaokrouhlení readingNum na druhé desetinné místo
  • Vložení dat do tabulky SQL

Pro každý bod můžeme uvést očekávání:

  • Funkce definovaná uživatelem vyžaduje argument typu string (nvarchar(max) zde), který nemůže mít hodnotu null.
  • GetArrayElements() vyžaduje argument typu matice nebo hodnotu null.
  • Round vyžaduje argument typu bigint nebo float nebo hodnotu null.
  • Místo toho, abychom se museli spoléhat na implicitní přetypování ASA, měli bychom to udělat sami a zpracovávat konflikty typů v dotazu.

Jedním ze způsobů, jak přejít, je přizpůsobit hlavní logiku pro řešení těchto výjimek. V tomto případě ale věříme, že naše hlavní logika je dokonalá. Pojďme tedy místo toho ověřit příchozí data.

Nejprve pomocí funkce WITH přidáme vstupní ověřovací vrstvu jako první krok dotazu. Pomocí TRY_CAST převedeme pole na očekávaný typ a nastavíme je, NULL pokud převod selže:

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

S posledním vstupním souborem, který jsme použili (ten s chybami), tento dotaz vrátí následující sadu:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
0 2021-12-10T10:00:00 Řetězec 1.7145 ["A","B"] 0 2021-12-10T10:00:00.0000000Z Řetězec 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 Třetí řetězec Nan ["D","E","F"] 3 2021-12-10T10:01:20.0000000Z Třetí řetězec NULL ["D","E","F"]
4 2021-12-10T10:02:10 A Forth String 1.2126 {} 4 2021-12-10T10:02:10.000000Z A Forth String 1.2126 NULL

Už vidíme, že se řeší dvě chyby. Změnili NaN jsme a {} do NULL. Teď jsme si jisti, že se tyto záznamy správně vloží do cílové tabulky SQL.

Teď se musíme rozhodnout, jak řešit záznamy s chybějícími nebo neplatnými hodnotami. Po nějaké diskusi se rozhodneme odmítnout záznamy s prázdným nebo neplatným readingArray nebo chybějícím readingStr.

Proto přidáme druhou vrstvu, která bude záznamy pro třídění mezi ověřovací vrstvou a hlavní logikou:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

Je vhodné napsat jednu WHERE klauzuli pro oba výstupy a použít NOT (...) ji ve druhém. Tímto způsobem nelze vyloučit žádné záznamy z výstupů i ztrát.

Teď získáme dva výstupy. Debug1 obsahuje záznamy, které se odešlou do hlavní logiky:

deviceId readingTimestamp readingStr readingNum readingArray
0 2021-12-10T10:00:00.0000000Z Řetězec 1.7145 ["A","B"]
3 2021-12-10T10:01:20.0000000Z Třetí řetězec NULL ["D","E","F"]

Debug2 obsahuje záznamy, které budou odmítnuty:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 A Forth String 1.2126 {} 4 2021-12-10T10:02:10.000000Z A Forth String 1.2126 NULL

Posledním krokem je přidání hlavní logiky zpět. Přidáme také výstup, který shromažďuje zamítnutí. Tady je nejlepší použít výstupní adaptér, který nevynucuje silné psaní, jako je účet úložiště.

Úplný dotaz najdete v poslední části.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

To nám poskytne následující sadu pro SQLOutput bez možné chyby:

deviceId readingTimestamp readingStr readingNum readingArray
0 2021-12-10T10:00:00.0000000Z Řetězec 1.7145 2
3 2021-12-10T10:01:20.0000000Z Třetí řetězec NULL 3

Další dva záznamy se posílají do objektu BlobOutput pro lidské zpracování a následné zpracování. Náš dotaz je teď bezpečný.

Příklad dotazu s ověřováním vstupu

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Rozšíření ověřování vstupu

GetType lze použít k explicitní kontrole typu. Funguje dobře s CASE v projekci nebo WHERE na nastavené úrovni. GetType lze také použít k dynamické kontrole příchozího schématu v úložišti metadat. Úložiště lze načíst prostřednictvím referenční datové sady.

Testování jednotek je dobrým postupem, jak zajistit odolnost našeho dotazu. Vytvoříme řadu testů, které se skládají ze vstupních souborů a jejich očekávaného výstupu. Náš dotaz bude muset odpovídat výstupu, který vygeneruje, aby se předal. V ASA se testování jednotek provádí prostřednictvím modulu npm asa-streamanalytics-cicd . Testovací případy s různými poškozenými událostmi by se měly vytvořit a otestovat v kanálu nasazení.

Nakonec můžeme provést testování light integrace ve VS Code. Záznamy můžeme do tabulky SQL vložit prostřednictvím místního spuštění do živého výstupu.

Získání podpory

Pokud potřebujete další pomoc, vyzkoušejte naši stránku pro otázky Microsoftu pro Azure Stream Analytics.

Další kroky