Kurz: Směrování dat pomocí zásad aktualizace tabulek

Pokud zdrojová data zahrnují jednoduché a rychlé transformace, je nejlepší je provést upstream v kanálu pomocí streamu událostí. Tento přístup ale nemusí dobře fungovat u jiných transformací, které jsou složité nebo ke své činnosti vyžadují specializované funkce.

V tomto kurzu se naučíte:

Příklad v tomto kurzu ukazuje použití zásad aktualizace pro směrování dat k provádění složitých transformací za účelem obohacení, vyčištění a transformace dat v době příjmu dat. Seznam dalších běžných případů použití najdete v tématu Běžné případy použití pro zásady aktualizace tabulek.

Požadavky

1. Vytvoření tabulek a aktualizace zásad

Následující kroky vás provedou vytvořením zdrojové tabulky, transformačními funkcemi, cílovými tabulkami a zásadami aktualizace. Tento kurz ukazuje, jak používat zásady aktualizace tabulek k provádění složitých transformací a ukládání výsledků do jedné nebo více cílových tabulek. Příklad používá jednu zdrojovou tabulku s názvem Raw_Table a tři cílové tabulky s názvy Device_Telemetry, Device_Alarms a Error_Log.

  1. Spuštěním následujícího příkazu vytvořte tabulku s názvem Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    Ve zdrojové tabulce se ukládají ingestovaná data. Tabulka obsahuje jeden sloupec s názvem RawData typu dynamic. Dynamický typ se používá k ukládání nezpracovaných dat tak, jak jsou, bez jakéhokoli schématu. Další informace najdete v tématu příkaz .create table.

  2. Spuštěním následujícího příkazu vytvořte funkci s názvem Get_Telemetry, Get_Alarms a Log_Error funkce.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Při vytváření zásad aktualizace můžete zadat vložený skript pro spuštění. Doporučujeme ale logiku transformace zapouzdřovat do funkce. Použití funkce zlepšuje údržbu kódu. Při příchodu nových dat se provede funkce, která data transformuje. Funkci je možné opakovaně používat v rámci více zásad aktualizace. Další informace najdete v tématu .create function command.

  3. Spuštěním následujícího příkazu vytvořte cílové tabulky.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    Cílová tabulka musí mít stejné schéma jako výstup transformační funkce. Cílové tabulky můžete vytvořit následujícími způsoby:

    • .create table Pomocí příkazu a ručním zadáním schématu, jak ukazuje vytvoření tabulky Device_Telemetry. Tento přístup ale může být náchylný k chybám a časově náročný.
    • .set-or-append Pokud jste už vytvořili funkci pro transformaci dat, použijte příkaz . Tato metoda vytvoří novou tabulku se stejným schématem jako výstup funkce a pomocí take 0 příkazu zajistí, aby funkce vrátila pouze schéma. Další informace najdete v tématu příkaz .set-or-append.
  4. Spuštěním následujícího příkazu vytvořte zásady aktualizace pro cílové tabulky.

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    Příkaz .alter table policy update slouží k propojení zdrojové tabulky, transformační funkce a cílové tabulky. Zásada aktualizace se vytvoří v cílové tabulce a určuje pro zdrojovou tabulku a transformační funkci. Další informace najdete v tématu .alter table policy update command.

2. Ingestování ukázkových dat

Pokud chcete zásady aktualizace otestovat, můžete pomocí příkazu ingestovat ukázková data do zdrojové tabulky .set-or-append . Další informace najdete v tématu Příjem dat z dotazu.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 – Ověření výsledků

Pokud chcete ověřit výsledky, můžete spuštěním dotazu ověřit, že se data transformovala a směrovala do cílových tabulek. V následujícím příkladu union se operátor používá ke sloučení zdroje a výsledků z cílových tabulek do jedné sady výsledků dotazu.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Výstup

Měli byste vidět následující výstup, kde Raw_Table obsahuje tři řádky a cílové tabulky mají každý jeden řádek.

TableName Řádky
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

Vyčištění prostředků

Spuštěním následujícího příkazu v databázi vyčistěte tabulky a funkce vytvořené v tomto kurzu.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error