Freigeben über


Tutorial: Weiterleiten von Daten mithilfe von Richtlinien zur Tabellenaktualisierung

Wenn Ihre Quelldaten einfache und schnelle Transformationen beinhalten, ist es am besten, sie mithilfe eines Ereignisdatenstroms in der Pipeline upstream auszuführen. Dieser Ansatz funktioniert jedoch möglicherweise nicht gut für andere Transformationen, die komplex sind oder spezielle Funktionen erfordern.

In diesem Tutorial lernen Sie Folgendes:

Im Beispiel in diesem Lernprogramm wird veranschaulicht, wie Sie Aktualisierungsrichtlinien für das Datenrouting verwenden, um komplexe Transformationen auszuführen, um Daten zur Aufnahmezeit zu bereichern, zu bereinigen und zu transformieren. Eine Liste mit anderen häufig verwendeten Anwendungsfällen finden Sie unter Allgemeine Anwendungsfälle für Tabellenaktualisierungsrichtlinien.

Voraussetzungen

1 – Erstellen von Tabellen und Aktualisierungsrichtlinien

Die folgenden Schritte führen Sie durch das Erstellen einer Quelltabelle, Transformationsfunktionen, Zieltabellen und Updaterichtlinien. Das Lernprogramm veranschaulicht, wie Sie Tabellenaktualisierungsrichtlinien verwenden, um komplexe Transformationen durchzuführen und die Ergebnisse in einer oder mehreren Zieltabellen zu speichern. Im Beispiel wird eine einzelne Quelltabelle namens Raw_Table und drei Zieltabellen namens Device_Telemetry, Device_Alarms und Error_Log verwendet.

  1. Führen Sie den folgenden Befehl aus, um eine Tabelle mit dem Namen Raw_Table zu erstellen.

    .create table Raw_Table (RawData: dynamic)
    

    In der Quelltabelle werden die erfassten Daten gespeichert. Die Tabelle weist eine einzelne Spalte mit dem Namen RawData vom Typ "dynamic" auf. Der dynamische Typ wird verwendet, um die Rohdaten ohne Schema zu speichern. Weitere Informationen finden Sie unter .create table command.

  2. Führen Sie den folgenden Befehl aus, um eine Funktion namens Get_Telemetry, Get_Alarms und Log_Error Funktionen zu erstellen.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Beim Erstellen einer Updaterichtlinie können Sie ein Inlineskript für die Ausführung angeben. Es wird jedoch empfohlen, die Transformationslogik in eine Funktion zu kapseln. Die Verwendung einer Funktion verbessert die Codewartung. Wenn neue Daten eintreffen, wird die Funktion ausgeführt, um die Daten zu transformieren. Die Funktion kann in mehreren Updaterichtlinien wiederverwendet werden. Weitere Informationen finden Sie unter .create-Funktionsbefehl.

  3. Führen Sie den folgenden Befehl aus, um die Zieltabellen zu erstellen.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    Die Zieltabelle muss dasselbe Schema wie diese Ausgabe der Transformationsfunktion aufweisen. Sie können Zieltabellen auf folgende Weise erstellen:

    • Verwenden Sie den .create table Befehl und geben Sie das Schema manuell an, wie bei der Erstellung der Device_Telemetry Tabelle gezeigt. Dieser Ansatz kann jedoch fehleranfällig und zeitaufwändig sein.
    • Verwenden Sie den .set-or-append Befehl, wenn Sie bereits eine Funktion zum Transformieren der Daten erstellt haben. Mit dieser Methode wird eine neue Tabelle mit demselben Schema wie die Ausgabe der Funktion erstellt, wobei take 0 sichergestellt wird, dass die Funktion nur das Schema zurückgibt. Weitere Informationen finden Sie unter ".set-or-append"-Befehl.
  4. Führen Sie den folgenden Befehl aus, um die Updaterichtlinien für die Zieltabellen zu erstellen.

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    Der .alter table policy update Befehl wird verwendet, um die Quelltabelle, die Transformationsfunktion und die Zieltabelle zu verknüpfen. Die Updaterichtlinie wird in der Zieltabelle erstellt und gibt die Quelltabelle und transformationsfunktion an. Weitere Informationen finden Sie unter ".alter table policy update command".

2 – Beispieldaten aufnehmen

Zum Testen der Updaterichtlinien können Sie Beispieldaten mithilfe des .set-or-append Befehls in die Quelltabelle aufnehmen. Weitere Informationen finden Sie unter "Erfassen von Daten aus einer Abfrage".

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 – Überprüfen der Ergebnisse

Um die Ergebnisse zu überprüfen, können Sie eine Abfrage ausführen, um zu überprüfen, ob die Daten transformiert und an die Zieltabellen weitergeleitet wurden. Im folgenden Beispiel wird der union Operator verwendet, um die Quelle und die Ergebnisse aus den Zieltabellen in einem einzigen Resultset zu kombinieren.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Output

Die folgende Ausgabe sollte angezeigt werden, in der die Raw_Table drei Zeilen enthält und die Zieltabellen jeweils eine Zeile haben.

TableName Zeilen
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

Bereinigen von Ressourcen

Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um die in diesem Lernprogramm erstellten Tabellen und Funktionen zu bereinigen.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error