Dela via


Självstudie: Dirigera data med hjälp av tabelluppdateringsprinciper

När dina källdata omfattar enkla och snabba transformeringar är det bäst att utföra dem uppströms i pipelinen med hjälp av en händelseström. Den här metoden kanske dock inte fungerar bra för andra transformeringar som är komplexa eller kräver särskilda funktioner för att fungera.

I den här guiden får du lära dig att:

Exemplet i den här självstudien visar hur du använder uppdateringsprinciper för dataroutning för att utföra komplexa transformeringar för att berika, rensa och transformera data vid inmatning. En lista över andra vanliga användningsfall finns i Vanliga användningsfall för tabelluppdateringsprinciper.

Förutsättningar

1 – Skapa tabeller och uppdatera principer

Följande steg vägleder dig genom att skapa en källtabell, transformeringsfunktioner, måltabeller och uppdateringsprinciper. Självstudien visar hur du använder tabelluppdateringsprinciper för att utföra komplexa transformeringar och spara resultatet i en eller flera måltabeller. I exemplet används en enda källtabell med namnet Raw_Table och tre måltabeller med namnet Device_Telemetry, Device_Alarms och Error_Log.

  1. Kör följande kommando för att skapa en tabell med namnet Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    Källtabellen är den plats där inmatade data sparas. Tabellen har en enda kolumn med namnet RawData av typen dynamic. Den dynamiska typen används för att lagra rådata som de är, utan något schema. Mer information finns i kommandot .create table.

  2. Kör följande kommando för att skapa en funktion med namnet Get_Telemetry, Get_Alarms och Log_Error funktioner.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    När du skapar en uppdateringsprincip kan du ange ett infogat skript för körning. Vi rekommenderar dock att du kapslar in transformeringslogik i en funktion. Användning av en funktion förbättrar kodunderhållet. När nya data tas emot körs funktionen för att transformera data. Funktionen kan återanvändas i flera uppdateringsprinciper. Mer information finns i .create-funktionskommandot.

  3. Kör följande kommando för att skapa måltabellerna.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    Måltabellen måste ha samma schema som utdata för transformeringsfunktionen. Du kan skapa måltabeller på följande sätt:

    • .create table Använd kommandot och ange schemat manuellt, vilket visas när du skapar Device_Telemetry-tabellen. Den här metoden kan dock vara felbenägen och tidskrävande.
    • .set-or-append Använd kommandot om du redan har skapat en funktion för att transformera data. Den här metoden skapar en ny tabell med samma schema som utdata för funktionen, med hjälp take 0 av för att se till att funktionen bara returnerar schemat. Mer information finns i kommandot .set-or-append.
  4. Kör följande kommando för att skapa uppdateringsprinciperna för måltabellerna

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    Kommandot .alter table policy update används för att länka källtabellen, transformeringsfunktionen och måltabellen. Uppdateringsprincipen skapas i måltabellen och anger för källtabellen och transformeringsfunktionen. Mer information finns i uppdateringskommandot för .alter table policy.

2 – Mata in exempeldata

Om du vill testa uppdateringsprinciperna kan du mata in exempeldata i källtabellen .set-or-append med kommandot . Mer information finns i Mata in data från en fråga.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 – Verifiera resultatet

För att verifiera resultatet kan du köra en fråga för att kontrollera att data har transformerats och dirigerats till måltabellerna. I följande exempel används operatorn union för att kombinera källan och resultaten från måltabellerna till en enda resultatuppsättning.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Resultat

Du bör se följande utdata där Raw_Table har tre rader och måltabellerna har en rad vardera.

TableName Rader
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

Rensa resurser

Kör följande kommando i databasen för att rensa tabellerna och funktionerna som skapades i den här självstudien.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error