Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Validasi input adalah teknik yang digunakan untuk melindungi logika kueri utama dari peristiwa yang salah bentuk atau tidak terduga. Kueri ditingkatkan untuk memproses dan memeriksa catatan secara eksplisit sehingga tidak dapat memecahkan logika utama.
Untuk menerapkan validasi input, kami menambahkan dua langkah awal ke kueri. Pertama-tama kami memastikan skema yang diajukan ke logika bisnis inti sesuai dengan harapannya. Kami kemudian melakukan triase pengecualian, dan secara opsional mengarahkan catatan yang tidak valid ke output sekunder.
Kueri dengan validasi input akan disusun sebagai berikut:
WITH preProcessingStage AS (
SELECT
-- Rename incoming fields, used for audit and debugging
field1 AS in_field1,
field2 AS in_field2,
...
-- Try casting fields in their expected type
TRY_CAST(field1 AS bigint) as field1,
TRY_CAST(field2 AS array) as field2,
...
FROM myInput TIMESTAMP BY myTimestamp
),
triagedOK AS (
SELECT -- Only fields in their new expected type
field1,
field2,
...
FROM preProcessingStage
WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),
triagedOut AS (
SELECT -- All fields to ease diagnostic
*
FROM preProcessingStage
WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)
-- Core business logic
SELECT
...
INTO myOutput
FROM triagedOK
...
-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut
Untuk melihat contoh lengkap kueri yang disiapkan dengan validasi input, lihat bagian: Contoh kueri dengan validasi input.
Artikel ini mengilustrasikan cara menerapkan teknik ini.
Konteks
Pekerjaan Azure Stream Analytics (ASA) memproses data yang berasal dari aliran. Aliran adalah urutan data mentah yang ditransmisikan berseri (CSV, JSON, AVRO...). Untuk membaca dari aliran, aplikasi perlu mengetahui format serialisasi khusus yang digunakan. Di ASA, format serialisasi peristiwa harus ditentukan saat mengonfigurasi input streaming.
Setelah data dideserialisasi, skema perlu diterapkan untuk memberikan makna. Yang kami maksud dengan skema adalah daftar bidang dalam aliran, dan jenis datanya masing-masing. Dengan ASA, skema data yang masuk tidak perlu diatur pada level input. ASA malah mendukung skema masukan dinamis secara asli. Ini mengharapkan daftar bidang (kolom), dan jenisnya, berubah di antara peristiwa (baris). ASA juga akan menyimpulkan jenis data ketika tidak ada yang disediakan secara eksplisit, dan mencoba untuk secara implisit mentransmisikan jenis saat diperlukan.
Penanganan skema dinamis adalah fitur yang kuat, kunci untuk pemrosesan streaming. Aliran data sering kali berisi data dari berbagai sumber, dengan beberapa jenis peristiwa, masing-masing dengan skema unik. Untuk merutekan, memfilter, dan memproses peristiwa pada aliran tersebut, ASA harus menyerap semuanya apa pun skemanya.

Tetapi kemampuan yang ditawarkan oleh penanganan skema dinamis disertai dengan potensi kerugian. Peristiwa tak terduga dapat mengalir melalui logika kueri utama dan memecahkannya. Sebagai contoh, kita dapat menggunakan ROUND pada bidang jenis NVARCHAR(MAX). ASA secara implisit akan mentransmisikannya ke float agar sesuai dengan tanda tangan ROUND. Di sini kita mengharapkan, atau berharap, bidang ini akan selalu berisi nilai numerik. Tetapi ketika kita menerima suatu peristiwa dengan bidang yang diatur ke "NaN", atau jika bidang sepenuhnya hilang, pekerjaan itu mungkin gagal.
Dengan validasi input, kami menambahkan langkah-langkah awal ke kueri kami untuk menangani peristiwa yang salah informasi tersebut. Kami terutama akan menggunakan WITH dan TRY_CAST untuk menerapkannya.
Skenario: validasi input untuk produsen peristiwa yang tidak dapat diandalkan
Kami akan membangun pekerjaan ASA baru yang akan menyerap data dari satu hub peristiwa. Seperti yang sering terjadi, kami tidak bertanggung jawab atas pembuat data. Di sini produsennya adalah perangkat IoT yang dijual oleh beberapa vendor perangkat keras.
Bertemu dengan para pemangku kepentingan, kami menyepakati format dan skema serialisasi. Semua perangkat akan mendorong pesan tersebut ke hub peristiwa umum, input dari pekerjaan ASA.
Kontrak skema didefinisikan sebagai berikut:
| Nama bidang | Jenis bidang | Deskripsi bidang |
|---|---|---|
deviceId |
Bilangan bulat | Pengidentifikasi perangkat unik |
readingTimestamp |
Tanggalwaktu | Waktu pesan, yang dihasilkan oleh gateway pusat |
readingStr |
String | |
readingNum |
Numerik | |
readingArray |
Array String |
Yang pada gilirannya memberi kita contoh pesan berikut di bawah serialisasi JSON:
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7,
"readingArray" : ["A","B"]
}
Kita sudah bisa melihat perbedaan antara kontrak skema dan implementasinya. Dalam format JSON, tidak ada jenis data untuk tanggalwaktu. Ini akan ditransmisikan sebagai string (lihat readingTimestamp di atas). ASA dapat dengan mudah mengatasi masalah ini, tetapi ini menunjukkan perlunya memvalidasi dan secara eksplisit mentransmisikan jenis. Terlebih lagi untuk data yang diserialisasi dalam CSV, karena semua nilai kemudian ditransmisikan sebagai string.
Ada perbedaan lain. ASA menggunakan sistem jenisnya sendiri yang tidak cocok dengan sistem yang masuk. Jika ASA memiliki jenis bawaan untuk bilangan bulat (bigint), tanggalwaktu, string (nvarchar(max)) dan array, ASA hanya mendukung numerik melalui float. Ketidakcocokan ini tidak menjadi masalah bagi sebagian besar aplikasi. Tetapi dalam kasus edge tertentu, itu bisa menyebabkan sedikit penyimpangan dalam presisi. Dalam hal ini, kita akan mengonversi nilai numerik sebagai string di bidang baru. Kemudian di hilir, kita akan menggunakan sistem yang mendukung desimal tetap untuk mendeteksi dan memperbaiki potensi penyimpangan.
Kembali ke kueri, di sini kami bermaksud untuk:
- Teruskan
readingStrke JavaScript UDF - Hitung jumlah rekaman dalam array
- Bulatkan
readingNumke tempat desimal kedua - Menyisipkan data ke dalam tabel SQL
Tabel SQL tujuan memiliki skema berikut:
CREATE TABLE [dbo].[readings](
[Device_Id] int NULL,
[Reading_Timestamp] datetime2(7) NULL,
[Reading_String] nvarchar(200) NULL,
[Reading_Num] decimal(18,2) NULL,
[Array_Count] int NULL
) ON [PRIMARY]
Ini adalah praktik yang baik untuk memetakan apa yang terjadi pada setiap bidang saat melewati pekerjaan:
| Bidang | Input (JSON) | Jenis yang diwariskan (ASA) | Output (Azure SQL) | Komentar |
|---|---|---|---|---|
deviceId |
number | bigint | Integer | |
readingTimestamp |
string | nvarchar(MAX) | tanggalwaktu2 | |
readingStr |
string | nvarchar(MAX) | nvarchar(200) | digunakan oleh UDF |
readingNum |
number | float | desimal(18,2) | untuk dibulatkan |
readingArray |
array(string) | array dari nvarchar(MAX) | Integer | untuk dihitung |
Prasyarat
Kami akan mengembangkan kueri dalam Visual Studio Code menggunakan ekstensi Alat ASA. Langkah pertama dari tutorial ini akan memandu Anda dalam memasang komponen yang diperlukan.
Dalam Visual Studio Code, kami akan menggunakan penjalanan lokal dengan I/O lokal agar tidak dikenakan biaya apa pun, dan mempercepat perulangan debug. Kami tidak perlu menyiapkan hub peristiwa atau Azure SQL Database.
Kueri dasar
Mari kita mulai dengan implementasi dasar, tanpa validasi input. Kita akan menambahkannya di bagian berikutnya.
Di Visual Studio Code, kita akan membuat proyek ASA baru
Di folder input, kita akan membuat file JSON baru bernama data_readings.json dan menambahkan catatan berikut ke dalamnya:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingStr" : "Another String",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : -4.85436,
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : ["G","G"]
}
]
Kemudian kita akan menentukan input lokal, yang disebut readings, yang merujuk pada file JSON yang kita buat di atas.
Setelah dikonfigurasi akan terlihat seperti ini:
{
"InputAlias": "readings",
"Type": "Data Stream",
"Format": "Json",
"FilePath": "data_readings.json",
"ScriptType": "InputMock"
}
Dengan data pratinjau, kita dapat mengamati bahwa catatan kita dimuat dengan benar.
Kita akan membuat UDF JavaScript baru yang dipanggil udfLen dengan mengeklik kanan folder Functions dan memilih ASA: Add Function. Kode yang akan kita gunakan adalah:
// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
return arg1.length;
}
Dalam eksekusi lokal, kita tidak perlu menentukan output. Kita bahkan tidak perlu menggunakan INTO kecuali ada lebih dari satu output. Dalam file .asaql, kita dapat mengganti kueri yang ada dengan:
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
Mari kita segera menelusuri kueri yang kita kirimkan:
- Untuk menghitung jumlah catatan di setiap array, pertama-tama kita perlu membongkarnya. Kita akan menggunakan CROSS APPLY dan GetArrayElements() (lebih banyak sampel di sini)
- Dengan demikian, kita memunculkan dua set data dalam kueri: input asli dan nilai array. Untuk memastikan tidak mencampuradukkan bidang, kita menentukan alias (
AS r) dan menggunakannya di mana-mana - Kemudian untuk benar-benar
COUNTnilai array, kita perlu mengagregasi dengan GROUP BY - Untuk itu kita harus menentukan jendela waktu. Di sini karena kita tidak memerlukannya untuk logika kita, jendela snapshot adalah pilihan yang tepat
- Dengan demikian, kita memunculkan dua set data dalam kueri: input asli dan nilai array. Untuk memastikan tidak mencampuradukkan bidang, kita menentukan alias (
- Kita juga harus
GROUP BYsemua bidang, dan memproyeksikan semuanya diSELECT. Secara eksplisit memproyeksikan bidang adalah praktik yang baik, karenaSELECT *akan membiarkan kesalahan mengalir dari input ke output- Jika kita menentukan jendela waktu, kita mungkin ingin menentukan stempel waktu dengan TIMESTAMP BY. Di sini logika kita tidak perlu bekerja. Untuk eksekusi lokal, tanpa
TIMESTAMP BYrekaman catatan dimuat pada satu stempel waktu, waktu mulai eksekusi.
- Jika kita menentukan jendela waktu, kita mungkin ingin menentukan stempel waktu dengan TIMESTAMP BY. Di sini logika kita tidak perlu bekerja. Untuk eksekusi lokal, tanpa
- Kita menggunakan UDF untuk memfilter pembacaan di mana
readingStrmemiliki kurang dari dua karakter. Kita seharusnya menggunakan LEN di sini. Kita menggunakan UDF hanya untuk tujuan demonstrasi
Kita dapat memulai eksekusi dan mengamati data yang sedang diproses:
| deviceId | readingTimestamp | readingStr | readingNum | arrayCount |
|---|---|---|---|---|
| 1 | 2021-12-10T10:00:00 | String | 1,71 | 2 |
| 2 | 2021-12-10T10:01:00 | String Lain | 2,38 | 1 |
| 3 | 2021-12-10T10:01:20 | String Ketiga | -4,85 | 3 |
| 1 | 2021-12-10T10:02:10 | String Keempat | 1.21 | 2 |
Sekarang setelah kita mengetahui bahwa kueri kita berfungsi, mari kita uji dengan lebih banyak data. Mari kita ganti konten data_readings.json dengan rekaman berikut:
[
{
"deviceId" : 1,
"readingTimestamp" : "2021-12-10T10:00:00",
"readingStr" : "A String",
"readingNum" : 1.7145,
"readingArray" : ["A","B"]
},
{
"deviceId" : 2,
"readingTimestamp" : "2021-12-10T10:01:00",
"readingNum" : 2.378,
"readingArray" : ["C"]
},
{
"deviceId" : 3,
"readingTimestamp" : "2021-12-10T10:01:20",
"readingStr" : "A Third String",
"readingNum" : "NaN",
"readingArray" : ["D","E","F"]
},
{
"deviceId" : 4,
"readingTimestamp" : "2021-12-10T10:02:10",
"readingStr" : "A Forth String",
"readingNum" : 1.2126,
"readingArray" : {}
}
]
Di sini kita dapat melihat masalah berikut:
- Perangkat #1 melakukan segalanya dengan benar
- Perangkat #2 lupa menyertakan
readingStr - Perangkat #3 mengirim
NaNsebagai angka - Perangkat #4 mengirim rekaman kosong, bukan array
Menjalankan pekerjaan sekarang seharusnya tidak berakhir dengan baik. Kita akan mendapatkan salah satu pesan kesalahan berikut:
Perangkat 2 akan memberi kita:
[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM : at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.
Perangkat 3 akan memberi kita:
[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)
Perangkat 4 akan memberi kita:
[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM : at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)
Setiap kali catatan yang salah informasi diizinkan mengalir dari input ke logika kueri utama tanpa divalidasi. Sekarang kita menyadari nilai validasi input.
Menerapkan validasi input
Mari kita perpanjang kueri kita untuk memvalidasi input.
Langkah pertama validasi input adalah menentukan harapan skema logika bisnis inti. Melihat kembali persyaratan asli, logika utama kami adalah:
- Teruskan
readingStrke JavaScript UDF untuk mengukur panjangnya - Hitung jumlah rekaman dalam array
- Bulatkan
readingNumke tempat desimal kedua - Menyisipkan data ke dalam tabel SQL
Untuk setiap poin kita dapat membuat daftar harapan:
- UDF memerlukan argumen jenis string (nvarchar(max) di sini) yang tidak bisa null
GetArrayElements()memerlukan argumen jenis array, atau nilai nullRoundmemerlukan argumen jenis bigint atau float, atau nilai nol- Daripada mengandalkan transmisi implisit ASA, kita harus melakukannya sendiri dan menangani konflik jenis dalam kueri
Salah satu cara yang harus dilakukan adalah mengadaptasi logika utama untuk menangani pengecualian ini. Namun dalam hal ini, kami percaya logika utama kami sempurna. Jadi mari kita validasi data yang masuk.
Pertama, mari gunakan WITH untuk menambahkan lapisan validasi input sebagai langkah pertama kueri. Kita akan menggunakan TRY_CAST untuk mengonversi bidang ke jenis yang diharapkan, dan mengaturnya ke NULL jika konversi gagal:
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
)
-- For debugging only
SELECT * FROM readingsValidated
Dengan file input terakhir yang kita gunakan (yang memiliki kesalahan), kueri ini akan mengembalikan set berikut:
| in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 2021-12-10T10:00:00 | String | 1,7145 | ["A","B"] | 1 | 2021-12-10T10:00:00.0000000Z | String | 1,7145 | ["A","B"] |
| 2 | 2021-12-10T10:01:00 | NULL | 2,378 | ["C"] | 2 | 2021-12-10T10:01:00.0000000Z | NULL | 2,378 | ["C"] |
| 3 | 2021-12-10T10:01:20 | String Ketiga | NaN | ["D","E","F"] | 3 | 2021-12-10T10:01:20.0000000Z | String Ketiga | NULL | ["D","E","F"] |
| 4 | 2021-12-10T10:02:10 | String Keempat | 1,2126 | {} | 4 | 2021-12-10T10:02:10.0000000Z | String Keempat | 1,2126 | NULL |
Kita sudah dapat melihat dua kesalahan kita sedang ditangani. Kita mengubah NaN dan {} menjadi NULL. Kita sekarang yakin rekaman ini akan dimasukkan dengan benar di tabel SQL tujuan.
Kita sekarang harus memutuskan bagaimana mengatasi rekaman dengan nilai yang hilang atau tidak valid. Setelah beberapa diskusi, kita memutuskan untuk menolak catatan dengan readingArray kosong/tidak valid atau readingStr yang hilang.
Jadi kita menambahkan lapisan kedua yang akan melakukan triase catatan antara yang validasi dan logika utama:
WITH readingsValidated AS (
...
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected
Merupakan praktik yang baik untuk menulis satu klausul WHERE untuk kedua output, dan menggunakan NOT (...) di yang kedua. Dengan cara itu tidak ada catatan yang dapat dikecualikan baik dari output maupun yang hilang.
Sekarang kita mendapatkan dua output. Debug1 memiliki rekaman yang akan dikirim ke logika utama:
| deviceId | readingTimestamp | readingStr | readingNum | readingArray |
|---|---|---|---|---|
| 1 | 2021-12-10T10:00:00.0000000Z | String | 1,7145 | ["A","B"] |
| 3 | 2021-12-10T10:01:20.0000000Z | String Ketiga | NULL | ["D","E","F"] |
Debug2 memiliki rekaman yang akan ditolak:
| in_deviceId | in_readingTimestamp | in_readingStr | in_readingNum | in_readingArray | deviceId | readingTimestamp | readingStr | readingNum | readingArray |
|---|---|---|---|---|---|---|---|---|---|
| 2 | 2021-12-10T10:01:00 | NULL | 2,378 | ["C"] | 2 | 2021-12-10T10:01:00.0000000Z | NULL | 2,378 | ["C"] |
| 4 | 2021-12-10T10:02:10 | String Keempat | 1,2126 | {} | 4 | 2021-12-10T10:02:10.0000000Z | String Keempat | 1,2126 | NULL |
Langkah terakhir adalah menambahkan logika utama kita kembali. Kita juga akan menambahkan output yang mengumpulkan penolakan. Di sini yang terbaik adalah menggunakan adapter output yang tidak memaksakan pengetikan yang kuat, seperti akun penyimpanan.
Kueri lengkap dapat ditemukan di bagian terakhir.
WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
SELECT
*
INTO BlobOutput
FROM readingsToBeRejected
Yang akan memberi kita set berikut untuk SQLOutput, tanpa kemungkinan kesalahan:
| deviceId | readingTimestamp | readingStr | readingNum | readingArray |
|---|---|---|---|---|
| 1 | 2021-12-10T10:00:00.0000000Z | String | 1,7145 | 2 |
| 3 | 2021-12-10T10:01:20.0000000Z | String Ketiga | NULL | 3 |
Dua catatan lainnya dikirim ke BlobOutput untuk tinjauan manusia dan pemrosesan pasca. Kueri kita sekarang aman.
Contoh kueri dengan validasi input
WITH readingsValidated AS (
SELECT
-- Rename incoming fields, used for audit and debugging
deviceId AS in_deviceId,
readingTimestamp AS in_readingTimestamp,
readingStr AS in_readingStr,
readingNum AS in_readingNum,
readingArray AS in_readingArray,
-- Try casting fields in their expected type
TRY_CAST(deviceId AS bigint) as deviceId,
TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
TRY_CAST(readingNum AS float) as readingNum,
TRY_CAST(readingArray AS array) as readingArray
FROM readings TIMESTAMP BY readingTimestamp
),
readingsToBeProcessed AS (
SELECT
deviceId,
readingTimestamp,
readingStr,
readingNum,
readingArray
FROM readingsValidated
WHERE
readingStr IS NOT NULL
AND readingArray IS NOT NULL
),
readingsToBeRejected AS (
SELECT
*
FROM readingsValidated
WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
NOT (
readingStr IS NOT NULL
AND readingArray IS NOT NULL
)
)
-- Core business logic
SELECT
r.deviceId,
r.readingTimestamp,
SUBSTRING(r.readingStr,1,200) AS readingStr,
ROUND(r.readingNum,2) AS readingNum,
COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
System.Timestamp(), --snapshot window
r.deviceId,
r.readingTimestamp,
r.readingStr,
r.readingNum
-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected
Memperluas validasi input
GetType dapat digunakan untuk secara eksplisit memeriksa jenis. Ini bekerja dengan baik dengan CASE dalam proyeksi, atau WHERE pada tingkat yang ditetapkan. GetType juga dapat digunakan untuk memeriksa skema masuk secara dinamis terhadap repositori metadata. Repositori dapat dimuat melalui himpunan data referensi.
Pengujian unit adalah praktik yang baik untuk memastikan kueri bersifat tangguh. Kita akan membangun serangkaian tes yang terdiri dari file input dan output yang diharapkan. Kueri harus cocok dengan output yang dihasilkannya untuk diteruskan. Di ASA, pengujian unit dilakukan melalui modul npm asa-streamanalytics-cicd. Kasus pengujian dengan berbagai peristiwa yang salah bentuk harus dibuat dan diuji dalam alur penyebaran.
Akhirnya, kita dapat melakukan beberapa pengujian integrasi ringan di Visual Studio Code. Kita dapat menyisipkan rekaman ke dalam tabel SQL melalui eksekusi lokal ke output langsung.
Dapatkan dukungan
Untuk bantuan lebih lanjut, coba halaman pertanyaan Microsoft Q&A untuk Azure Stream Analytics.
Langkah berikutnya
- Menyiapkan alur CI/CD menggunakan paket npm
- Gambaran Umum Analisis Aliran lokal berjalan di Visual Studio Code dengan Alat ASA
- Menguji kueri Azure Stream Analytics secara lokal dengan sampel data menggunakan Visual Studio Code
- Uji kueri Azure Stream Analytics secara lokal terhadap input streaming langsung menggunakan Visual Studio Code
- Menjelajahi pekerjaan Azure Stream Analytics dengan Visual Studio Code (Pratinjau)