Share via


Azure Stream Analytics 쿼리의 입력 유효성 검사

입력 유효성 검사는 형식이 잘못되었거나 예기치 않은 이벤트로부터 주 쿼리 논리를 보호하는 데 사용하는 기술입니다. 쿼리는 주 논리를 중단할 수 없도록 레코드를 명시적으로 처리하고 검사하도록 업그레이드됩니다.

입력 유효성 검사를 구현하기 위해 쿼리에 두 개의 초기 단계를 추가합니다. 먼저 핵심 비즈니스 논리에 제출된 스키마가 예상과 일치하는지 확인합니다. 그런 다음, 예외를 심사하고 필요에 따라 잘못된 레코드를 보조 출력으로 라우팅합니다.

입력 유효성 검사가 포함된 쿼리는 다음과 같이 구조화됩니다.

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

입력 유효성 검사를 사용하여 설정된 쿼리의 포괄적인 예제를 보려면 입력 유효성 검사가 포함된 쿼리 예제 섹션을 참조하세요.

이 문서에서는 이 기술을 구현하는 방법을 보여줍니다.

Context

ASA(Azure Stream Analytics) 작업은 스트림에서 오는 데이터를 처리합니다. 스트림은 직렬화되어 전송되는 원시 데이터(CSV, JSON, AVRO...)의 시퀀스입니다. 스트림에서 읽으려면 사용되는 직렬화 형식을 애플리케이션이 알고 있어야 합니다. ASA에서 스트리밍 입력을 구성할 때 이벤트 직렬화 형식을 정의해야 합니다.

데이터가 역직렬화되면 스키마를 적용하여 의미를 부여해야 합니다. 스키마를 통해 스트림의 필드 목록과 해당 데이터 형식에 의미를 부여합니다. ASA를 사용하면 들어오는 데이터의 스키마를 입력 수준에서 설정할 필요가 없습니다. 대신 ASA는 동적 입력 스키마를 기본적으로 지원합니다. ASA는 필드(열) 목록과 해당 형식이 이벤트(행) 간에 변경되어야 합니다. 또한 ASA는 데이터 형식이 명시적으로 제공되지 않으면 데이터 형식을 유추하고 필요할 때 형식을 암시적으로 캐스팅하려고 합니다.

동적 스키마 처리는 강력한 기능이며 스트림 처리의 핵심입니다. 데이터 스트림에는 여러 원본의 데이터가 포함되는 경우가 많으며, 여라 가지 이벤트 유형이 있고 이벤트마다 고유한 스키마가 있습니다. 이러한 스트림에서 이벤트를 라우팅, 필터링 및 처리하려면 ASA에서 모든 스키마를 수집해야 합니다.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

그러나 동적 스키마 처리에서 제공하는 기능에는 잠재적인 단점이 있습니다. 예기치 않은 이벤트가 주 쿼리 논리를 따라 이동하다가 쿼리를 중단시킬 수 있습니다. 예를 들어 NVARCHAR(MAX) 형식 필드에 ROUND를 사용할 수 있습니다. ASA는 이를 암시적으로 부동 소수로 캐스팅하여 ROUND의 서명과 매칭합니다. 여기서 우리는 이 필드에 항상 숫자 값이 포함되기를 예상하거나 희망합니다. 그러나 필드가 "NaN"으로 설정된 이벤트를 수신하거나 필드가 완전히 누락된 경우 작업이 실패할 수 있습니다.

입력 유효성 검사를 통해 이처럼 형식이 잘못된 이벤트를 처리하는 예비 단계를 쿼리에 추가합니다. 주로 WITHTRY_CAST를 사용하여 구현할 것입니다.

시나리오: 신뢰할 수 없는 이벤트 생산자에 대한 입력 유효성 검사

단일 이벤트 허브에서 데이터를 수집할 새 ASA 작업을 빌드하겠습니다. 대부분 그렇듯이, 데이터 생산자는 저희 책임이 아닙니다. 여기서 말하는 생산자는 여러 하드웨어 공급업체에서 판매하는 IoT 디바이스입니다.

저희는 관련자와 만나 직렬화 형식 및 스키마에 대해 합의합니다. 모든 디바이스는 이러한 메시지를 ASA 작업의 입력인 공통 이벤트 허브로 푸시합니다.

스키마 계약은 다음과 같이 정의됩니다.

필드 이름 필드 유형 필드 설명
deviceId 정수 고유한 디바이스 식별자
readingTimestamp DateTime 중앙 게이트웨이에서 생성된 메시지 시간
readingStr 문자열
readingNum 숫자
readingArray 문자열의 배열

그러면 JSON 직렬화 아래에 다음과 같은 샘플 메시지가 제공됩니다.

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

스키마 계약과 해당 구현 간의 불일치를 이미 확인할 수 있습니다. JSON 형식에는 날짜/시간에 대한 데이터 형식이 없습니다. 문자열로 전송됩니다(위의 readingTimestamp 참조). ASA는 이 문제를 쉽게 해결할 수 있지만, 형식의 유효성을 검사하고 명시적으로 캐스팅해야 합니다. 모든 값이 문자열로 전송되기 때문에 훨씬 많은 데이터가 CSV로 직렬화됩니다.

또 다른 불일치가 있습니다. ASA는 들어오는 시스템과 일치하지 않는 자체적인 형식 시스템을 사용합니다. ASA에 정수(bigint), 날짜/시간, 문자열(nvarchar(max)) 및 배열에 대한 기본 형식이 있는 경우 부동 소수를 통한 숫자만 지원합니다. 이 불일치는 대부분의 애플리케이션에서 문제가 되지 않습니다. 그러나 특정 에지의 경우 정밀도에서 약간의 드리프트가 발생할 수 있습니다. 이 경우 숫자 값을 새 필드의 문자열로 변환합니다. 그 후 다운스트림합니다. 고정 소수점을 지원하는 시스템을 사용하여 잠재적인 드리프트를 탐지하고 수정합니다.

쿼리로 돌아가서, 다음을 수행합니다.

  • readingStrJavaScript UDF에 전달
  • 배열의 레코드 수 계산
  • readingNum을 두 번째 소수 자릿수로 반올림
  • SQL 테이블에 데이터 삽입

대상 SQL 테이블에는 다음과 같은 스키마가 있습니다.

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

작업을 수행할 때 각 필드에 발생하는 작업을 매핑하는 것이 좋습니다.

필드 입력(JSON) 상속된 형식(ASA) 출력(Azure SQL) Comment(설명)
deviceId number bigint 정수
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) nvarchar(200) UDF에서 사용
readingNum number float 10진수(18,2) 반올림
readingArray array(string) nvarchar(MAX) 배열 정수 계산

필수 조건

Visual Studio Code에서 ASA Tools 확장을 사용하여 쿼리를 개발할 것입니다. 이 자습서의 첫 번째 단계에서는 필요한 구성 요소를 설치하는 단계를 안내합니다.

VS Code에서 로컬 입력/출력과 함께 로컬 실행을 사용하여 비용을 발생시키지 않고, 디버깅 루프 속도를 높이겠습니다. 이벤트 허브 또는 Azure SQL Database를 설정할 필요가 없습니다.

기본 쿼리

입력 유효성 검사 없이 기본 구현부터 시작하겠습니다. 다음 섹션에서 추가할 것입니다.

VS Code에서 새 ASA 프로젝트를 만듭니다.

input 폴더에 data_readings.json이라는 새 JSON 파일을 만들고 이 파일에 다음 레코드를 추가합니다.

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

그런 다음, 위에서 만든 JSON 파일을 참조하는 readings라는 로컬 입력을 정의합니다.

구성이 끝나면 다음과 비슷한 형태입니다.

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

미리 보기 데이터를 사용하여 레코드가 제대로 로드되는지 확인할 수 있습니다.

Functions 폴더를 마우스 오른쪽 단추로 클릭하고 ASA: Add Function을 선택하여 udfLen이라는 새 JavaScript UDF를 만듭니다. 사용할 코드는 다음과 같습니다.

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

로컬 실행에서는 출력을 정의할 필요가 없습니다. 출력이 하나뿐인 경우에는 INTO를 사용할 필요도 없습니다. .asaql 파일에서 기존 쿼리를 다음으로 바꿀 수 있습니다.

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

제출한 쿼리를 빠르게 살펴보겠습니다.

  • 각 배열의 레코드 수를 계산하려면 먼저 압축을 풀어야 합니다. CROSS APPLYGetArrayElements()를 사용하겠습니다(더 많은 샘플은 여기).
    • 이렇게 하면 쿼리에 원래 입력과 배열 값이라는 두 개의 데이터 세트가 표시됩니다. 필드를 혼합하지 않도록 별칭(AS r)을 정의하고 모든 곳에서 사용합니다.
    • 그런 다음, 실제로 배열 값을 COUNT하려면 GROUP BY를 사용하여 집계해야 합니다.
    • 이렇게 하려면 시간 범위를 정의해야 합니다. 여기서는 논리에 시간 범위가 필요 없으므로 스냅샷 창이 올바른 선택입니다.
  • 또한 모든 필드를 GROUP BY하고, SELECT에서 모든 필드를 프로젝션해야 합니다. SELECT *는 입력에서 출력으로 오류가 전달되는 것을 허용하므로 필드를 명시적으로 프로젝션하는 것이 좋습니다.
    • 기간을 정의하는 경우 TIMESTAMP BY를 사용하여 타임스탬프를 정의할 수 있습니다. 여기서는 논리가 작동할 필요가 없습니다. 로컬 실행의 경우 TIMESTAMP BY가 없으면 모든 레코드가 실행 시작 시간인 단일 타임스탬프에 로드됩니다.
  • 여기서는 UDF를 사용하여 readingStr이 2자 미만인 판독값을 필터링합니다. LEN을 사용했어야 합니다. 여기서는 UDF를 데모용으로만 사용하고 있습니다.

실행을 시작하고 처리되는 데이터를 관찰할 수 있습니다.

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 문자열 1.71 2
2 2021-12-10T10:01:00 다른 문자열 2.38 1
3 2021-12-10T10:01:20 세 번째 문자열 -4.85 3
1 2021-12-10T10:02:10 네 번째 문자열 1.21 2

이제 쿼리가 작동한다는 것을 알고 있으므로, 더 많은 데이터를 대상으로 쿼리를 테스트하겠습니다. data_readings.json의 내용을 다음 레코드로 바꾸겠습니다.

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

여기서는 다음과 같은 문제를 확인할 수 있습니다.

  • #1 디바이스는 모든 것을 올바르게 처리했습니다.
  • #2 디바이스는 readingStr을 포함하는 것을 잊었습니다.
  • #3 디바이스는 NaN을 숫자로 보냈습니다.
  • #4 디바이스는 배열 대신 빈 레코드를 보냈습니다.

지금 작업을 실행하면 정상적으로 종료되지 않습니다. 다음 오류 메시지 중 하나가 표시됩니다.

디바이스 2는 다음 오류를 표시합니다.

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

디바이스 3은 다음 오류를 표시합니다.

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

디바이스 4는 다음 오류를 표시합니다.

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

매번 잘못된 형식의 레코드가 유효성 검사 없이 입력에서 주 쿼리 논리로 흐르도록 허용되었습니다. 이제 우리는 입력 유효성 검사의 가치를 알게 되었습니다.

입력 유효성 검사 구현

쿼리를 확장하여 입력의 유효성을 검사해 보겠습니다.

입력 유효성 검사의 첫 번째 단계는 핵심 비즈니스 논리의 스키마 기대치를 정의하는 것입니다. 원래 요구 사항을 되돌아보면, 기본 논리는 다음과 같습니다.

  • readingStrJavaScript UDF에 전달하여 길이 측정
  • 배열의 레코드 수 계산
  • readingNum을 두 번째 소수 자릿수로 반올림
  • SQL 테이블에 데이터 삽입

다음과 같은 각 지점의 기대치를 나열할 수 있습니다.

  • UDF에는 null이면 안 되는 문자열 형식의 인수(여기서는 nvarchar(max))가 필요합니다.
  • GetArrayElements()에는 배열 형식의 인수 또는 null 값이 필요합니다.
  • Round에는 bigint 또는 float 형식의 인수나 null 값이 필요합니다.
  • ASA의 암시적 캐스팅을 사용하지 말고, 직접 수행하고 쿼리에서 형식 충돌을 처리해야 합니다.

이렇게 하는 한 가지 방법은 이러한 예외를 처리하도록 주 논리를 조정하는 것입니다. 하지만 여기서는 우리의 주 논리가 완벽하다고 믿습니다. 따라서 들어오는 데이터의 유효성을 검사해 보겠습니다.

먼저 WITH를 사용하여 입력 유효성 검사 계층을 쿼리의 첫 번째 단계로 추가해 보겠습니다. TRY_CAST를 사용하여 필드를 필요한 형식으로 변환하고, 변환이 실패하는 경우 NULL로 설정하겠습니다.

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

마지막으로 사용한 입력 파일(오류가 있는 입력 파일)의 경우 이 쿼리가 다음 세트를 반환합니다.

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 문자열 1.7145 ["A","B"] 1 2021-12-10T10:00:00.0000000Z 문자열 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 세 번째 문자열 NaN ["D","E","F"] 3 2021-12-10T10:01:20.0000000Z 세 번째 문자열 NULL ["D","E","F"]
4 2021-12-10T10:02:10 네 번째 문자열 1.2126 {} 4 2021-12-10T10:02:10.0000000Z 네 번째 문자열 1.2126 NULL

두 가지 오류가 해결되는 것을 볼 수 있습니다. 우리는 NaN{}NULL로 변환했습니다. 이제 이러한 레코드가 대상 SQL 테이블에 제대로 삽입될 것을 확신할 수 있습니다.

이제 값이 누락되었거나 값이 잘못된 레코드를 처리하는 방법을 결정해야 합니다. 약간의 논의 후, 우리는 readingArray가 비어 있거나/잘못되었거나 readingStr이 누락된 레코드를 거부하기로 결정합니다.

따라서 유효성 검사 1과 주 논리 간에 레코드를 심사하는 두 번째 계층을 추가합니다.

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

두 출력 모두에 대해 단일 WHERE 절을 작성하고 두 번째 절에서 NOT (...)을 사용하는 것이 좋습니다. 이렇게 하면 출력과 손실 모두에서 레코드를 제외할 수 없습니다.

이제 두 가지 출력이 표시됩니다. Debug1에는 다음과 같이 주 논리로 전송될 레코드가 있습니다.

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z 문자열 1.7145 ["A","B"]
3 2021-12-10T10:01:20.0000000Z 세 번째 문자열 NULL ["D","E","F"]

Debug2에는 다음과 같이 거부될 레코드가 있습니다.

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 네 번째 문자열 1.2126 {} 4 2021-12-10T10:02:10.0000000Z 네 번째 문자열 1.2126 NULL

마지막 단계는 주 논리를 다시 추가하는 것입니다. 거부를 수집하는 출력도 추가하겠습니다. 여기서는 스토리지 계정과 같이 강력한 입력을 적용하지 않는 출력 어댑터를 사용하는 것이 가장 좋습니다.

전체 쿼리는 마지막 섹션에서 찾을 수 있습니다.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

오류 발생 가능성 없이 SQLOutput에 대한 다음 세트를 제공합니다.

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00.0000000Z 문자열 1.7145 2
3 2021-12-10T10:01:20.0000000Z 세 번째 문자열 NULL 3

다른 두 레코드는 사람의 검토와 사후 처리를 위해 BlobOutput으로 전송됩니다. 우리 쿼리는 이제 안전합니다.

입력 유효성 검사가 있는 쿼리의 예

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

입력 유효성 검사 확장

GetType을 사용하여 형식을 명시적으로 확인할 수 있습니다. GetType은 프로젝션의 CASE 또는 설정 수준의 WHERE와 잘 작동합니다. GetType은 들어오는 스키마를 메타데이터 리포지토리와 비교하여 동적으로 확인하는 데 사용할 수도 있습니다. 리포지토리는 참조 데이터 세트를 통해 로드할 수 있습니다.

단위 테스트는 쿼리의 복원력을 보장하는 좋은 방법입니다. 입력 파일과 예상 출력으로 구성된 일련의 테스트를 빌드하겠습니다. 쿼리는 전달하기 위해 생성하는 출력과 일치해야 합니다. ASA에서 단위 테스트는 asa-streamanalytics-cicd npm 모듈을 통해 수행됩니다. 형식이 잘못된 다양한 이벤트가 있는 테스트 사례는 배포 파이프라인에서 만들고 테스트해야 합니다.

마지막으로, VS Code에서 몇 가지 가벼운 통합 테스트를 수행할 수 있습니다. 라이브 출력에 대한 로컬 실행을 통해 SQL 테이블에 레코드를 삽입할 수 있습니다.

지원 받기

추가 지원이 필요한 경우 Azure Stream Analytics용 Microsoft Q&A 질문 페이지를 참조하세요.

다음 단계