Oktatóanyag: Adatok átirányítása táblafrissítési szabályzatokkal

Ha a forrásadatok egyszerű és gyors átalakításokat igényelnek, a legjobb, ha azokat a folyamat felső rétegében hajtja végre egy eseménystream használatával. Előfordulhat azonban, hogy ez a megközelítés nem működik jól más olyan átalakítások esetében, amelyek összetettek, vagy speciális funkciókat igényelnek a működéshez.

Eben az oktatóanyagban az alábbiakkal fog megismerkedni:

Az oktatóanyagban szereplő példa bemutatja, hogyan használhat frissítési szabályzatokat az adatátirányításhoz összetett átalakítások végrehajtásához az adatok betöltésekor történő bővítéséhez, tisztításához és átalakításához. Az egyéb gyakori használati esetek listáját lásd: A táblafrissítési szabályzatok gyakori használati esetei.

Előfeltételek

1 – Táblák létrehozása és szabályzatok frissítése

Az alábbi lépések végigvezetik a forrástábla, az átalakítási függvények, a céltáblák és a frissítési szabályzatok létrehozásán. Az oktatóanyag bemutatja, hogyan használhat táblafrissítési szabályzatokat összetett átalakítások végrehajtására, és hogyan mentheti az eredményeket egy vagy több céltáblára. A példa egy Raw_Table nevű forrástáblát és három Device_Telemetry,Device_Alarms és Error_Log nevű céltáblát használ.

  1. Futtassa a következő parancsot egy Raw_Table nevű tábla létrehozásához.

    .create table Raw_Table (RawData: dynamic)
    

    A rendszer a forrástáblában menti a betöltött adatokat. A tábla egyetlen , dinamikus típusú RawData nevű oszlopot tartalmaz. A dinamikus típus a nyers adatok séma nélküli tárolására szolgál. További információt a .create table parancsban talál.

  2. Futtassa a következő parancsot egy Get_Telemetry, Get_Alarms és Log_Error nevű függvény létrehozásához.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Frissítési szabályzat létrehozásakor megadhatja a végrehajtáshoz szükséges, soron belüli szkriptet. Javasoljuk azonban, hogy foglalja bele az átalakítási logikát egy függvénybe. A függvények használata javítja a kódkarbantartást. Amikor új adatok érkeznek, a rendszer végrehajtja a függvényt az adatok átalakításához. A függvény több frissítési szabályzatban is újra felhasználható. További információ: .create függvényparancs.

  3. A céltáblák létrehozásához futtassa az alábbi parancsot.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    A céltáblának ugyanazzal a sémával kell rendelkeznie, mint az átalakítási függvény kimenete. Céltáblákat a következő módokon hozhat létre:

    • Használja a .create table parancsot, és manuálisan adja meg a sémát a Device_Telemetry tábla létrehozásakor bemutatott módon. Ez a megközelítés azonban hibalehetőséget és időigényes lehet.
    • .set-or-append Ha már létrehozott egy függvényt az adatok átalakításához, használja a parancsot. Ez a metódus létrehoz egy új táblát, amelynek a kimenete megegyezik a függvény kimenetével, így meggyőződhet arról, take 0 hogy a függvény csak a sémát adja vissza. További információ: .set-or-append parancs.
  4. Futtassa az alábbi parancsot a céltáblák frissítési szabályzatainak létrehozásához

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    A .alter table policy update parancs a forrástábla, az átalakítási függvény és a céltábla összekapcsolására szolgál. A frissítési szabályzat a céltáblán jön létre, és megadja a forrástáblát és az átalakítási függvényt. További információ: .alter table policy update parancs.

2 – Mintaadatok betöltése

A frissítési szabályzatok teszteléséhez a parancs használatával mintaadatokat is betölthet a .set-or-append forrástáblába. További információ: Adatok betöltése lekérdezésből.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 – Az eredmények ellenőrzése

Az eredmények ellenőrzéséhez futtathat egy lekérdezést, amely ellenőrzi, hogy az adatok át lett-e alakítva, és a céltáblákra lett-e irányítva. Az alábbi példában az operátort union használja a forrás és a céltáblák eredményeinek egyetlen eredményhalmazba való összevonására.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Kimenet

A következő kimenetnek kell megjelennie, amelyben a Raw_Table három sorból áll, és a céltáblák mindegyike egy-egy sort tartalmaz.

TableName Rows (Sorok)
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

Az erőforrások eltávolítása

Futtassa az alábbi parancsot az adatbázisban az oktatóanyagban létrehozott táblák és függvények törléséhez.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error