Bagikan melalui


Validasi input dalam kueri Azure Stream Analytics

Validasi input adalah teknik yang digunakan untuk melindungi logika kueri utama dari peristiwa yang salah bentuk atau tidak terduga. Kueri ditingkatkan untuk memproses dan memeriksa catatan secara eksplisit sehingga tidak dapat memecahkan logika utama.

Untuk menerapkan validasi input, kami menambahkan dua langkah awal ke kueri. Pertama-tama kami memastikan skema yang diajukan ke logika bisnis inti sesuai dengan harapannya. Kami kemudian melakukan triase pengecualian, dan secara opsional mengarahkan catatan yang tidak valid ke output sekunder.

Kueri dengan validasi input akan disusun sebagai berikut:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Untuk melihat contoh lengkap kueri yang disiapkan dengan validasi input, lihat bagian: Contoh kueri dengan validasi input.

Artikel ini mengilustrasikan cara menerapkan teknik ini.

Konteks

Pekerjaan Azure Stream Analytics (ASA) memproses data yang berasal dari aliran. Aliran adalah urutan data mentah yang ditransmisikan berseri (CSV, JSON, AVRO...). Untuk membaca dari aliran, aplikasi perlu mengetahui format serialisasi khusus yang digunakan. Di ASA, format serialisasi peristiwa harus ditentukan saat mengonfigurasi input streaming.

Setelah data dideserialisasi, skema perlu diterapkan untuk memberikan makna. Yang kami maksud dengan skema adalah daftar bidang dalam aliran, dan jenis datanya masing-masing. Dengan ASA, skema data yang masuk tidak perlu diatur pada level input. ASA malah mendukung skema masukan dinamis secara asli. Ini mengharapkan daftar bidang (kolom), dan jenisnya, berubah di antara peristiwa (baris). ASA juga akan menyimpulkan jenis data ketika tidak ada yang disediakan secara eksplisit, dan mencoba untuk secara implisit mentransmisikan jenis saat diperlukan.

Penanganan skema dinamis adalah fitur yang kuat, kunci untuk pemrosesan streaming. Aliran data sering kali berisi data dari berbagai sumber, dengan beberapa jenis peristiwa, masing-masing dengan skema unik. Untuk merutekan, memfilter, dan memproses peristiwa pada aliran tersebut, ASA harus menyerap semuanya apa pun skemanya.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

Tetapi kemampuan yang ditawarkan oleh penanganan skema dinamis disertai dengan potensi kerugian. Peristiwa tak terduga dapat mengalir melalui logika kueri utama dan memecahkannya. Sebagai contoh, kita dapat menggunakan ROUND pada bidang jenis NVARCHAR(MAX). ASA secara implisit akan mentransmisikannya ke float agar sesuai dengan tanda tangan ROUND. Di sini kita mengharapkan, atau berharap, bidang ini akan selalu berisi nilai numerik. Tetapi ketika kita menerima suatu peristiwa dengan bidang yang diatur ke "NaN", atau jika bidang sepenuhnya hilang, pekerjaan itu mungkin gagal.

Dengan validasi input, kami menambahkan langkah-langkah awal ke kueri kami untuk menangani peristiwa yang salah informasi tersebut. Kami terutama akan menggunakan WITH dan TRY_CAST untuk menerapkannya.

Skenario: validasi input untuk produsen peristiwa yang tidak dapat diandalkan

Kami akan membangun pekerjaan ASA baru yang akan menyerap data dari satu hub peristiwa. Seperti yang sering terjadi, kami tidak bertanggung jawab atas pembuat data. Di sini produsennya adalah perangkat IoT yang dijual oleh beberapa vendor perangkat keras.

Bertemu dengan para pemangku kepentingan, kami menyepakati format dan skema serialisasi. Semua perangkat akan mendorong pesan tersebut ke hub peristiwa umum, input dari pekerjaan ASA.

Kontrak skema didefinisikan sebagai berikut:

Nama bidang Jenis bidang Deskripsi bidang
deviceId Bilangan bulat Pengidentifikasi perangkat unik
readingTimestamp Tanggalwaktu Waktu pesan, yang dihasilkan oleh gateway pusat
readingStr String
readingNum Numerik
readingArray Array String

Yang pada gilirannya memberi kita contoh pesan berikut di bawah serialisasi JSON:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Kita sudah bisa melihat perbedaan antara kontrak skema dan implementasinya. Dalam format JSON, tidak ada jenis data untuk tanggalwaktu. Ini akan ditransmisikan sebagai string (lihat readingTimestamp di atas). ASA dapat dengan mudah mengatasi masalah ini, tetapi ini menunjukkan perlunya memvalidasi dan secara eksplisit mentransmisikan jenis. Terlebih lagi untuk data yang diserialisasi dalam CSV, karena semua nilai kemudian ditransmisikan sebagai string.

Ada perbedaan lain. ASA menggunakan sistem jenisnya sendiri yang tidak cocok dengan sistem yang masuk. Jika ASA memiliki jenis bawaan untuk bilangan bulat (bigint), tanggalwaktu, string (nvarchar(max)) dan array, ASA hanya mendukung numerik melalui float. Ketidakcocokan ini tidak menjadi masalah bagi sebagian besar aplikasi. Tetapi dalam kasus edge tertentu, itu bisa menyebabkan sedikit penyimpangan dalam presisi. Dalam hal ini, kita akan mengonversi nilai numerik sebagai string di bidang baru. Kemudian di hilir, kita akan menggunakan sistem yang mendukung desimal tetap untuk mendeteksi dan memperbaiki potensi penyimpangan.

Kembali ke kueri, di sini kami bermaksud untuk:

  • Teruskan readingStr ke JavaScript UDF
  • Hitung jumlah rekaman dalam array
  • Bulatkan readingNum ke tempat desimal kedua
  • Menyisipkan data ke dalam tabel SQL

Tabel SQL tujuan memiliki skema berikut:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

Ini adalah praktik yang baik untuk memetakan apa yang terjadi pada setiap bidang saat melewati pekerjaan:

Bidang Input (JSON) Jenis yang diwariskan (ASA) Output (Azure SQL) Komentar
deviceId number bigint Integer
readingTimestamp string nvarchar(MAX) tanggalwaktu2
readingStr string nvarchar(MAX) nvarchar(200) digunakan oleh UDF
readingNum number float desimal(18,2) untuk dibulatkan
readingArray array(string) array dari nvarchar(MAX) Integer untuk dihitung

Prasyarat

Kami akan mengembangkan kueri dalam Visual Studio Code menggunakan ekstensi Alat ASA. Langkah pertama dari tutorial ini akan memandu Anda dalam memasang komponen yang diperlukan.

Dalam Visual Studio Code, kami akan menggunakan penjalanan lokal dengan I/O lokal agar tidak dikenakan biaya apa pun, dan mempercepat perulangan debug. Kami tidak perlu menyiapkan hub peristiwa atau Azure SQL Database.

Kueri dasar

Mari kita mulai dengan implementasi dasar, tanpa validasi input. Kita akan menambahkannya di bagian berikutnya.

Di Visual Studio Code, kita akan membuat proyek ASA baru

Di folder input, kita akan membuat file JSON baru bernama data_readings.json dan menambahkan catatan berikut ke dalamnya:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Kemudian kita akan menentukan input lokal, yang disebut readings, yang merujuk pada file JSON yang kita buat di atas.

Setelah dikonfigurasi akan terlihat seperti ini:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

Dengan data pratinjau, kita dapat mengamati bahwa catatan kita dimuat dengan benar.

Kita akan membuat UDF JavaScript baru yang dipanggil udfLen dengan mengeklik kanan folder Functions dan memilih ASA: Add Function. Kode yang akan kita gunakan adalah:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

Dalam eksekusi lokal, kita tidak perlu menentukan output. Kita bahkan tidak perlu menggunakan INTO kecuali ada lebih dari satu output. Dalam file .asaql, kita dapat mengganti kueri yang ada dengan:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Mari kita segera menelusuri kueri yang kita kirimkan:

  • Untuk menghitung jumlah catatan di setiap array, pertama-tama kita perlu membongkarnya. Kita akan menggunakan CROSS APPLY dan GetArrayElements() (lebih banyak sampel di sini)
    • Dengan demikian, kita memunculkan dua set data dalam kueri: input asli dan nilai array. Untuk memastikan tidak mencampuradukkan bidang, kita menentukan alias (AS r) dan menggunakannya di mana-mana
    • Kemudian untuk benar-benar COUNT nilai array, kita perlu mengagregasi dengan GROUP BY
    • Untuk itu kita harus menentukan jendela waktu. Di sini karena kita tidak memerlukannya untuk logika kita, jendela snapshot adalah pilihan yang tepat
  • Kita juga harus GROUP BY semua bidang, dan memproyeksikan semuanya di SELECT. Secara eksplisit memproyeksikan bidang adalah praktik yang baik, karena SELECT * akan membiarkan kesalahan mengalir dari input ke output
    • Jika kita menentukan jendela waktu, kita mungkin ingin menentukan stempel waktu dengan TIMESTAMP BY. Di sini logika kita tidak perlu bekerja. Untuk eksekusi lokal, tanpa TIMESTAMP BY rekaman catatan dimuat pada satu stempel waktu, waktu mulai eksekusi.
  • Kita menggunakan UDF untuk memfilter pembacaan di mana readingStr memiliki kurang dari dua karakter. Kita seharusnya menggunakan LEN di sini. Kita menggunakan UDF hanya untuk tujuan demonstrasi

Kita dapat memulai eksekusi dan mengamati data yang sedang diproses:

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 String 1,71 2
2 2021-12-10T10:01:00 String Lain 2,38 1
3 2021-12-10T10:01:20 String Ketiga -4,85 3
1 2021-12-10T10:02:10 String Keempat 1.21 2

Sekarang setelah kita mengetahui bahwa kueri kita berfungsi, mari kita uji dengan lebih banyak data. Mari kita ganti konten data_readings.json dengan rekaman berikut:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Di sini kita dapat melihat masalah berikut:

  • Perangkat #1 melakukan segalanya dengan benar
  • Perangkat #2 lupa menyertakan readingStr
  • Perangkat #3 mengirim NaN sebagai angka
  • Perangkat #4 mengirim rekaman kosong, bukan array

Menjalankan pekerjaan sekarang seharusnya tidak berakhir dengan baik. Kita akan mendapatkan salah satu pesan kesalahan berikut:

Perangkat 2 akan memberi kita:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

Perangkat 3 akan memberi kita:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

Perangkat 4 akan memberi kita:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

Setiap kali catatan yang salah informasi diizinkan mengalir dari input ke logika kueri utama tanpa divalidasi. Sekarang kita menyadari nilai validasi input.

Menerapkan validasi input

Mari kita perpanjang kueri kita untuk memvalidasi input.

Langkah pertama validasi input adalah menentukan harapan skema logika bisnis inti. Melihat kembali persyaratan asli, logika utama kami adalah:

  • Teruskan readingStr ke JavaScript UDF untuk mengukur panjangnya
  • Hitung jumlah rekaman dalam array
  • Bulatkan readingNum ke tempat desimal kedua
  • Menyisipkan data ke dalam tabel SQL

Untuk setiap poin kita dapat membuat daftar harapan:

  • UDF memerlukan argumen jenis string (nvarchar(max) di sini) yang tidak bisa null
  • GetArrayElements() memerlukan argumen jenis array, atau nilai null
  • Round memerlukan argumen jenis bigint atau float, atau nilai nol
  • Daripada mengandalkan transmisi implisit ASA, kita harus melakukannya sendiri dan menangani konflik jenis dalam kueri

Salah satu cara yang harus dilakukan adalah mengadaptasi logika utama untuk menangani pengecualian ini. Namun dalam hal ini, kami percaya logika utama kami sempurna. Jadi mari kita validasi data yang masuk.

Pertama, mari gunakan WITH untuk menambahkan lapisan validasi input sebagai langkah pertama kueri. Kita akan menggunakan TRY_CAST untuk mengonversi bidang ke jenis yang diharapkan, dan mengaturnya ke NULL jika konversi gagal:

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

Dengan file input terakhir yang kita gunakan (yang memiliki kesalahan), kueri ini akan mengembalikan set berikut:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 String 1,7145 ["A","B"] 1 2021-12-10T10:00:00.0000000Z String 1,7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2,378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2,378 ["C"]
3 2021-12-10T10:01:20 String Ketiga NaN ["D","E","F"] 3 2021-12-10T10:01:20.0000000Z String Ketiga NULL ["D","E","F"]
4 2021-12-10T10:02:10 String Keempat 1,2126 {} 4 2021-12-10T10:02:10.0000000Z String Keempat 1,2126 NULL

Kita sudah dapat melihat dua kesalahan kita sedang ditangani. Kita mengubah NaN dan {} menjadi NULL. Kita sekarang yakin rekaman ini akan dimasukkan dengan benar di tabel SQL tujuan.

Kita sekarang harus memutuskan bagaimana mengatasi rekaman dengan nilai yang hilang atau tidak valid. Setelah beberapa diskusi, kita memutuskan untuk menolak catatan dengan readingArray kosong/tidak valid atau readingStr yang hilang.

Jadi kita menambahkan lapisan kedua yang akan melakukan triase catatan antara yang validasi dan logika utama:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

Merupakan praktik yang baik untuk menulis satu klausul WHERE untuk kedua output, dan menggunakan NOT (...) di yang kedua. Dengan cara itu tidak ada catatan yang dapat dikecualikan baik dari output maupun yang hilang.

Sekarang kita mendapatkan dua output. Debug1 memiliki rekaman yang akan dikirim ke logika utama:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z String 1,7145 ["A","B"]
3 2021-12-10T10:01:20.0000000Z String Ketiga NULL ["D","E","F"]

Debug2 memiliki rekaman yang akan ditolak:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2,378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2,378 ["C"]
4 2021-12-10T10:02:10 String Keempat 1,2126 {} 4 2021-12-10T10:02:10.0000000Z String Keempat 1,2126 NULL

Langkah terakhir adalah menambahkan logika utama kita kembali. Kita juga akan menambahkan output yang mengumpulkan penolakan. Di sini yang terbaik adalah menggunakan adapter output yang tidak memaksakan pengetikan yang kuat, seperti akun penyimpanan.

Kueri lengkap dapat ditemukan di bagian terakhir.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

Yang akan memberi kita set berikut untuk SQLOutput, tanpa kemungkinan kesalahan:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z String 1,7145 2
3 2021-12-10T10:01:20.0000000Z String Ketiga NULL 3

Dua catatan lainnya dikirim ke BlobOutput untuk tinjauan manusia dan pemrosesan pasca. Kueri kita sekarang aman.

Contoh kueri dengan validasi input

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Memperluas validasi input

GetType dapat digunakan untuk secara eksplisit memeriksa jenis. Ini bekerja dengan baik dengan CASE dalam proyeksi, atau WHERE pada tingkat yang ditetapkan. GetType juga dapat digunakan untuk memeriksa skema masuk secara dinamis terhadap repositori metadata. Repositori dapat dimuat melalui himpunan data referensi.

Pengujian unit adalah praktik yang baik untuk memastikan kueri bersifat tangguh. Kita akan membangun serangkaian tes yang terdiri dari file input dan output yang diharapkan. Kueri harus cocok dengan output yang dihasilkannya untuk diteruskan. Di ASA, pengujian unit dilakukan melalui modul npm asa-streamanalytics-cicd. Kasus pengujian dengan berbagai peristiwa yang salah bentuk harus dibuat dan diuji dalam alur penyebaran.

Akhirnya, kita dapat melakukan beberapa pengujian integrasi ringan di Visual Studio Code. Kita dapat menyisipkan rekaman ke dalam tabel SQL melalui eksekusi lokal ke output langsung.

Dapatkan dukungan

Untuk bantuan lebih lanjut, coba halaman pertanyaan Microsoft Q&A untuk Azure Stream Analytics.

Langkah berikutnya