자습서: 테이블 업데이트 정책을 사용하여 데이터 라우팅
원본 데이터에 간단하고 빠른 변환이 포함되는 경우 이벤트 스트림을 사용하여 파이프라인에서 업스트림 수행하는 것이 가장 좋습니다. 그러나 이 방법은 복잡하거나 특수한 기능이 필요한 다른 변환에서는 제대로 작동하지 않을 수 있습니다.
이 자습서에서는 다음 작업 방법을 알아봅니다.
이 자습서의 예제에서는 데이터 라우팅 에 대한 업데이트 정책을 사용하여 복잡한 변환을 수행하여 수집 시 데이터를 보강, 정리 및 변환하는 방법을 보여 줍니다. 다른 일반적인 사용 사례 목록은 테이블 업데이트 정책에 대한 일반적인 사용 사례를 참조하세요.
사전 요구 사항
- Microsoft 계정 또는 Microsoft Entra 사용자 ID. Azure 구독이 필요하지 않습니다.
- Azure Data Explorer 클러스터 및 데이터베이스. Microsoft Fabric의 Real-Time Analytics에서 클러스터 및 데이터베이스 또는 KQL 데이터베이스를만듭니다.
1 - 테이블 만들기 및 정책 업데이트
다음 단계에서는 원본 테이블, 변환 함수, 대상 테이블 및 업데이트 정책을 만드는 방법에 대해 설명합니다. 이 자습서에서는 테이블 업데이트 정책을 사용하여 복잡한 변환을 수행하고 결과를 하나 이상의 대상 테이블에 저장하는 방법을 보여 줍니다. 이 예제에서는 Raw_Table 라는 단일 원본 테이블과 Device_Telemetry, Device_Alarms및Error_Log라는 세 개의 대상 테이블을 사용합니다.
다음 명령을 실행하여 Raw_Table 테이블을 만듭니다.
.create table Raw_Table (RawData: dynamic)
원본 테이블은 수집된 데이터가 저장되는 위치입니다. 테이블에는 동적 형식의 RawData 라는 단일 열이 있습니다. 동적 형식은 스키마 없이 원시 데이터를 있는 그대로 저장하는 데 사용됩니다. 자세한 내용은 .create table 명령을 참조하세요.
다음 명령을 실행하여 Get_Telemetry, Get_Alarms 및 Log_Error 함수라는 함수를 만듭니다.
.execute database script <| .create-or-alter function Get_Telemetry() { Raw_Table | where todynamic(RawData).MessageType == 'Telemetry' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceType), SensorName = tostring(RawData.SensorName), SensorValue = toreal(RawData.SensorValue), SensorUnit = tostring(RawData.SensorUnit) | project-away RawData } .create-or-alter function Get_Alarms() { Raw_Table | where RawData.MessageType == 'Alarms' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceTpe) , AlarmType = tostring(RawData.AlarmType) | project-away RawData } .create-or-alter function Log_Error() { Raw_Table | where RawData.MessageType !in ('Telemetry', 'Alarms') | extend TimeStamp = datetime(now), ErrorType = 'Unknown MessageType' | project TimeStamp, RawData, ErrorType }
업데이트 정책을 만들 때 실행을 위한 인라인 스크립트를 지정할 수 있습니다. 그러나 변환 논리를 함수로 캡슐화하는 것이 좋습니다. 함수를 사용하면 코드 유지 관리가 향상됩니다. 새 데이터가 도착하면 함수가 실행되어 데이터를 변환합니다. 함수는 여러 업데이트 정책에서 다시 사용할 수 있습니다. 자세한 내용은 .create 함수 명령을 참조하세요.
다음 명령을 실행하여 대상 테이블을 만듭니다.
.execute database script <| .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string) .set-or-append Device_Alarms <| Get_Alarms | take 0 .set-or-append Error_Log <| Log_Error | take 0
대상 테이블에는 변환 함수의 출력과 동일한 스키마가 있어야 합니다. 다음과 같은 방법으로 대상 테이블을 만들 수 있습니다.
.create table
명령을 사용하고 Device_Telemetry 테이블을 만드는 것과 같이 스키마를 수동으로 지정합니다. 그러나 이 방법은 오류가 발생하기 쉽고 시간이 많이 걸릴 수 있습니다..set-or-append
이미 데이터를 변환하는 함수를 만든 경우 명령을 사용합니다. 이 메서드는 함수의 출력과 동일한 스키마를 사용하여 새 테이블을 만들고, 를 사용하여take 0
함수가 스키마만 반환하는지 확인합니다. 자세한 내용은 .set-or-append 명령을 참조하세요.
다음 명령을 실행하여 대상 테이블에 대한 업데이트 정책을 만듭니다.
.execute database script <| .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
명령은
.alter table policy update
원본 테이블, 변환 함수 및 대상 테이블을 연결하는 데 사용됩니다. 업데이트 정책은 대상 테이블에 만들어지고 원본 테이블 및 변환 함수에 지정됩니다. 자세한 내용은 .alter table policy update 명령을 참조하세요.
2 - 샘플 데이터 수집
업데이트 정책을 테스트하려면 명령을 사용하여 샘플 데이터를 원본 테이블에 수집할 .set-or-append
수 있습니다. 자세한 내용은 쿼리에서 데이터 수집을 참조하세요.
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
3 - 결과 확인
결과의 유효성을 검사하기 위해 쿼리를 실행하여 데이터가 변환되어 대상 테이블로 라우팅되었는지 확인할 수 있습니다. 다음 예제에서는 연산자를 union
사용하여 원본과 대상 테이블의 결과를 단일 결과 집합으로 결합합니다.
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
출력
Raw_Table 3개의 행이 있고 대상 테이블에 각각 하나의 행이 있는 다음 출력이 표시됩니다.
TableName | 행 |
---|---|
Raw_Table | 3 |
Error_Log | 1 |
Device_Alarms | 1 |
Device_Telemetry | 1 |
리소스 정리
데이터베이스에서 다음 명령을 실행하여 이 자습서에서 만든 테이블과 함수를 클린.
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error
관련 콘텐츠
피드백
https://aka.ms/ContentUserFeedback
출시 예정: 2024년 내내 콘텐츠에 대한 피드백 메커니즘으로 GitHub 문제를 단계적으로 폐지하고 이를 새로운 피드백 시스템으로 바꿀 예정입니다. 자세한 내용은 다음을 참조하세요.다음에 대한 사용자 의견 제출 및 보기