Oktatóanyag: Adatok átirányítása táblafrissítési szabályzatokkal
Ha a forrásadatok egyszerű és gyors átalakításokat igényelnek, a legjobb, ha azokat a folyamat felső rétegében hajtja végre egy eseménystream használatával. Előfordulhat azonban, hogy ez a megközelítés nem működik jól más olyan átalakítások esetében, amelyek összetettek, vagy speciális funkciókat igényelnek a működéshez.
Eben az oktatóanyagban az alábbiakkal fog megismerkedni:
Az oktatóanyagban szereplő példa bemutatja, hogyan használhat frissítési szabályzatokat az adatátirányításhoz összetett átalakítások végrehajtásához az adatok betöltésekor történő bővítéséhez, tisztításához és átalakításához. Az egyéb gyakori használati esetek listáját lásd: A táblafrissítési szabályzatok gyakori használati esetei.
Előfeltételek
- Microsoft-fiók vagy Microsoft Entra felhasználói identitás. Nincs szükség Azure-előfizetésre.
- Egy Azure-Data Explorer-fürt és -adatbázis. Hozzon létre egy fürtöt és egy adatbázistvagy egy KQL-adatbázist Real-Time Analyticsben a Microsoft Fabricben.
1 – Táblák létrehozása és szabályzatok frissítése
Az alábbi lépések végigvezetik a forrástábla, az átalakítási függvények, a céltáblák és a frissítési szabályzatok létrehozásán. Az oktatóanyag bemutatja, hogyan használhat táblafrissítési szabályzatokat összetett átalakítások végrehajtására, és hogyan mentheti az eredményeket egy vagy több céltáblára. A példa egy Raw_Table nevű forrástáblát és három Device_Telemetry,Device_Alarms és Error_Log nevű céltáblát használ.
Futtassa a következő parancsot egy Raw_Table nevű tábla létrehozásához.
.create table Raw_Table (RawData: dynamic)
A rendszer a forrástáblában menti a betöltött adatokat. A tábla egyetlen , dinamikus típusú RawData nevű oszlopot tartalmaz. A dinamikus típus a nyers adatok séma nélküli tárolására szolgál. További információt a .create table parancsban talál.
Futtassa a következő parancsot egy Get_Telemetry, Get_Alarms és Log_Error nevű függvény létrehozásához.
.execute database script <| .create-or-alter function Get_Telemetry() { Raw_Table | where todynamic(RawData).MessageType == 'Telemetry' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceType), SensorName = tostring(RawData.SensorName), SensorValue = toreal(RawData.SensorValue), SensorUnit = tostring(RawData.SensorUnit) | project-away RawData } .create-or-alter function Get_Alarms() { Raw_Table | where RawData.MessageType == 'Alarms' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceTpe) , AlarmType = tostring(RawData.AlarmType) | project-away RawData } .create-or-alter function Log_Error() { Raw_Table | where RawData.MessageType !in ('Telemetry', 'Alarms') | extend TimeStamp = datetime(now), ErrorType = 'Unknown MessageType' | project TimeStamp, RawData, ErrorType }
Frissítési szabályzat létrehozásakor megadhatja a végrehajtáshoz szükséges, soron belüli szkriptet. Javasoljuk azonban, hogy foglalja bele az átalakítási logikát egy függvénybe. A függvények használata javítja a kódkarbantartást. Amikor új adatok érkeznek, a rendszer végrehajtja a függvényt az adatok átalakításához. A függvény több frissítési szabályzatban is újra felhasználható. További információ: .create függvényparancs.
A céltáblák létrehozásához futtassa az alábbi parancsot.
.execute database script <| .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string) .set-or-append Device_Alarms <| Get_Alarms | take 0 .set-or-append Error_Log <| Log_Error | take 0
A céltáblának ugyanazzal a sémával kell rendelkeznie, mint az átalakítási függvény kimenete. Céltáblákat a következő módokon hozhat létre:
- Használja a
.create table
parancsot, és manuálisan adja meg a sémát a Device_Telemetry tábla létrehozásakor bemutatott módon. Ez a megközelítés azonban hibalehetőséget és időigényes lehet. .set-or-append
Ha már létrehozott egy függvényt az adatok átalakításához, használja a parancsot. Ez a metódus létrehoz egy új táblát, amelynek a kimenete megegyezik a függvény kimenetével, így meggyőződhet arról,take 0
hogy a függvény csak a sémát adja vissza. További információ: .set-or-append parancs.
- Használja a
Futtassa az alábbi parancsot a céltáblák frissítési szabályzatainak létrehozásához
.execute database script <| .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
A
.alter table policy update
parancs a forrástábla, az átalakítási függvény és a céltábla összekapcsolására szolgál. A frissítési szabályzat a céltáblán jön létre, és megadja a forrástáblát és az átalakítási függvényt. További információ: .alter table policy update parancs.
2 – Mintaadatok betöltése
A frissítési szabályzatok teszteléséhez a parancs használatával mintaadatokat is betölthet a .set-or-append
forrástáblába. További információ: Adatok betöltése lekérdezésből.
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
3 – Az eredmények ellenőrzése
Az eredmények ellenőrzéséhez futtathat egy lekérdezést, amely ellenőrzi, hogy az adatok át lett-e alakítva, és a céltáblákra lett-e irányítva. Az alábbi példában az operátort union
használja a forrás és a céltáblák eredményeinek egyetlen eredményhalmazba való összevonására.
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
Kimenet
A következő kimenetnek kell megjelennie, amelyben a Raw_Table három sorból áll, és a céltáblák mindegyike egy-egy sort tartalmaz.
TableName | Rows (Sorok) |
---|---|
Raw_Table | 3 |
Error_Log | 1 |
Device_Alarms | 1 |
Device_Telemetry | 1 |
Az erőforrások eltávolítása
Futtassa az alábbi parancsot az adatbázisban az oktatóanyagban létrehozott táblák és függvények törléséhez.
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error
Kapcsolódó tartalom
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: