Руководство. Маршрутизация данных с помощью политик обновления таблиц
Если исходные данные включают простые и быстрые преобразования, лучше выполнять их вышестоящий в конвейере с помощью потока событий. Однако этот подход может не подходить для других преобразований, которые являются сложными или требуют специальных функциональных возможностей.
В этом руководстве описано следующее:
В примере в этом руководстве показано, как использовать политики обновления для маршрутизации данных для выполнения сложных преобразований для обогащения, очистки и преобразования данных во время приема. Список других распространенных вариантов использования см. в статье Распространенные варианты использования политик обновления таблиц.
Предварительные требования
- Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
- Кластер и база данных Azure Data Explorer. Создайте кластер и базу данныхилибазу данных KQL в Real-Time Analytics в Microsoft Fabric.
1. Создание таблиц и обновление политик
Ниже приведены инструкции по созданию исходной таблицы, функций преобразования, целевых таблиц и политик обновления. В этом руководстве показано, как использовать политики обновления таблиц для выполнения сложных преобразований и сохранения результатов в одной или нескольких целевых таблицах. В примере используется одна исходная таблица с именем Raw_Table и три целевые таблицы с именами Device_Telemetry, Device_Alarms и Error_Log.
Выполните следующую команду, чтобы создать таблицу с именем Raw_Table.
.create table Raw_Table (RawData: dynamic)
В исходной таблице хранятся данные, которые были в приеме. Таблица содержит один столбец с именем RawData типа dynamic. Динамический тип используется для хранения необработанных данных без какой-либо схемы. Дополнительные сведения см. в разделе Команда .create table.
Выполните следующую команду, чтобы создать функцию с именами Get_Telemetry, Get_Alarms и Log_Error функций.
.execute database script <| .create-or-alter function Get_Telemetry() { Raw_Table | where todynamic(RawData).MessageType == 'Telemetry' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceType), SensorName = tostring(RawData.SensorName), SensorValue = toreal(RawData.SensorValue), SensorUnit = tostring(RawData.SensorUnit) | project-away RawData } .create-or-alter function Get_Alarms() { Raw_Table | where RawData.MessageType == 'Alarms' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceTpe) , AlarmType = tostring(RawData.AlarmType) | project-away RawData } .create-or-alter function Log_Error() { Raw_Table | where RawData.MessageType !in ('Telemetry', 'Alarms') | extend TimeStamp = datetime(now), ErrorType = 'Unknown MessageType' | project TimeStamp, RawData, ErrorType }
При создании политики обновления можно указать встроенный скрипт для выполнения. Однако мы рекомендуем инкапсулировать логику преобразования в функцию. Использование функции улучшает обслуживание кода. При поступлении новых данных выполняется функция для преобразования данных. Функцию можно повторно использовать в нескольких политиках обновления. Дополнительные сведения см. в разделе Команда функции .create.
Выполните следующую команду, чтобы создать целевые таблицы.
.execute database script <| .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string) .set-or-append Device_Alarms <| Get_Alarms | take 0 .set-or-append Error_Log <| Log_Error | take 0
Целевая таблица должна иметь ту же схему, что и выходные данные функции преобразования. Целевые таблицы можно создать следующими способами:
.create table
Используйте команду и вручную укажите схему, как показано при создании таблицы Device_Telemetry. Однако этот подход может быть подвержен ошибкам и отнимает много времени.- Используйте команду ,
.set-or-append
если вы уже создали функцию для преобразования данных. Этот метод создает новую таблицу с той же схемой, что и выходные данные функции, используяtake 0
, чтобы убедиться, что функция возвращает только схему. Дополнительные сведения см. в разделе Команда .set-or-append.
Выполните следующую команду, чтобы создать политики обновления для целевых таблиц.
.execute database script <| .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
Команда
.alter table policy update
используется для связывания исходной таблицы, функции преобразования и целевой таблицы. Политика обновления создается в целевой таблице и указывает для исходной таблицы и функции преобразования. Дополнительные сведения см. в разделе Команда .alter table policy update.
2. Прием примеров данных
Чтобы протестировать политики обновления, можно принять примеры данных в исходную таблицу .set-or-append
с помощью команды . Дополнительные сведения см. в разделе Прием данных из запроса.
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
3. Проверка результатов
Чтобы проверить результаты, можно выполнить запрос, чтобы убедиться, что данные были преобразованы и перенаправлены в целевые таблицы. В следующем примере union
оператор используется для объединения источника и результатов из целевых таблиц в один результирующий набор.
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
Выходные данные
Вы увидите следующие выходные данные, в которых Raw_Table содержит три строки, а целевые таблицы содержат по одной строке.
TableName | Строки |
---|---|
Raw_Table | 3 |
Error_Log | 1 |
Device_Alarms | 1 |
Device_Telemetry | 1 |
Очистка ресурсов
Выполните следующую команду в базе данных, чтобы очистить таблицы и функции, созданные в этом руководстве.
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error
См. также
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по