Samouczek: kierowanie danych przy użyciu zasad aktualizacji tabeli
Gdy dane źródłowe obejmują proste i szybkie przekształcenia, najlepiej jest wykonać je nadrzędnie w potoku przy użyciu strumienia zdarzeń. Jednak takie podejście może nie działać dobrze w przypadku innych przekształceń, które są złożone lub wymagają wyspecjalizowanych funkcji do działania.
Ten samouczek zawiera informacje na temat wykonywania następujących czynności:
W przykładzie w tym samouczku pokazano, jak używać zasad aktualizacji do routingu danych w celu wykonywania złożonych przekształceń w celu wzbogacania, czyszczenia i przekształcania danych w czasie pozyskiwania. Aby uzyskać listę innych typowych przypadków użycia, zobacz Typowe przypadki użycia zasad aktualizacji tabel.
Wymagania wstępne
- Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danychlubbazę danych KQL w usłudze Real-Time Analytics w usłudze Microsoft Fabric.
1 — Tworzenie tabel i aktualizowanie zasad
Poniższe kroki zawierają instrukcje tworzenia tabeli źródłowej, funkcji przekształcania, tabel docelowych i zasad aktualizacji. W tym samouczku pokazano, jak używać zasad aktualizacji tabel do wykonywania złożonych przekształceń i zapisywania wyników w co najmniej jednej tabeli docelowej. W przykładzie użyto pojedynczej tabeli źródłowej o nazwie Raw_Table i trzech tabel docelowych o nazwach Device_Telemetry, Device_Alarms i Error_Log.
Uruchom następujące polecenie, aby utworzyć tabelę o nazwie Raw_Table.
.create table Raw_Table (RawData: dynamic)
Tabela źródłowa to miejsce, w którym są zapisywane pozyskane dane. Tabela ma jedną kolumnę o nazwie RawData typu dynamic. Typ dynamiczny jest używany do przechowywania nieprzetworzonych danych bez żadnego schematu. Aby uzyskać więcej informacji, zobacz polecenie .create table.
Uruchom następujące polecenie, aby utworzyć funkcję o nazwie Get_Telemetry, Get_Alarms i Log_Error .
.execute database script <| .create-or-alter function Get_Telemetry() { Raw_Table | where todynamic(RawData).MessageType == 'Telemetry' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceType), SensorName = tostring(RawData.SensorName), SensorValue = toreal(RawData.SensorValue), SensorUnit = tostring(RawData.SensorUnit) | project-away RawData } .create-or-alter function Get_Alarms() { Raw_Table | where RawData.MessageType == 'Alarms' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceTpe) , AlarmType = tostring(RawData.AlarmType) | project-away RawData } .create-or-alter function Log_Error() { Raw_Table | where RawData.MessageType !in ('Telemetry', 'Alarms') | extend TimeStamp = datetime(now), ErrorType = 'Unknown MessageType' | project TimeStamp, RawData, ErrorType }
Podczas tworzenia zasad aktualizacji można określić skrypt wbudowany do wykonania. Zalecamy jednak hermetyzowanie logiki przekształcania w funkcję. Korzystanie z funkcji usprawnia konserwację kodu. Po nadejściu nowych danych funkcja jest wykonywana w celu przekształcenia danych. Funkcja może być ponownie użyta w wielu zasadach aktualizacji. Aby uzyskać więcej informacji, zobacz polecenie .create function.
Uruchom następujące polecenie, aby utworzyć tabele docelowe.
.execute database script <| .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string) .set-or-append Device_Alarms <| Get_Alarms | take 0 .set-or-append Error_Log <| Log_Error | take 0
Tabela docelowa musi mieć ten sam schemat co dane wyjściowe funkcji przekształcania. Tabele docelowe można tworzyć na następujące sposoby:
.create table
Przy użyciu polecenia i ręcznie określając schemat, jak pokazano podczas tworzenia tabeli Device_Telemetry. Jednak takie podejście może być podatne na błędy i czasochłonne..set-or-append
Użyj polecenia , jeśli funkcja została już utworzona w celu przekształcenia danych. Ta metoda tworzy nową tabelę z tym samym schematem co dane wyjściowe funkcji, używająctake 0
polecenia , aby upewnić się, że funkcja zwraca tylko schemat. Aby uzyskać więcej informacji, zobacz polecenie .set-or-append.
Uruchom następujące polecenie, aby utworzyć zasady aktualizacji dla tabel docelowych
.execute database script <| .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
Polecenie
.alter table policy update
służy do łączenia tabeli źródłowej, funkcji transformowania i tabeli docelowej. Zasady aktualizacji są tworzone w tabeli docelowej i określa tabelę źródłową i funkcję przekształcania. Aby uzyskać więcej informacji, zobacz polecenie alter table policy update.
2 — Pozyskiwanie przykładowych danych
Aby przetestować zasady aktualizacji, możesz pozyskać przykładowe dane do tabeli źródłowej .set-or-append
przy użyciu polecenia . Aby uzyskać więcej informacji, zobacz Pozyskiwanie danych z zapytania.
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
3 — Weryfikowanie wyników
Aby zweryfikować wyniki, możesz uruchomić zapytanie, aby sprawdzić, czy dane zostały przekształcone i kierowane do tabel docelowych. W poniższym przykładzie union
operator jest używany do łączenia źródła i wyników z tabel docelowych w jeden zestaw wyników.
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
Dane wyjściowe
Powinny zostać wyświetlone następujące dane wyjściowe, w których Raw_Table ma trzy wiersze, a tabele docelowe mają jeden wiersz.
TableName | Rows (Wiersze) |
---|---|
Raw_Table | 3 |
Error_Log | 1 |
Device_Alarms | 1 |
Device_Telemetry | 1 |
Czyszczenie zasobów
Uruchom następujące polecenie w bazie danych, aby wyczyścić tabele i funkcje utworzone w tym samouczku.
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error
Zawartość pokrewna
Opinia
https://aka.ms/ContentUserFeedback.
Dostępne już wkrótce: W 2024 r. będziemy stopniowo wycofywać zgłoszenia z serwisu GitHub jako mechanizm przesyłania opinii na temat zawartości i zastępować go nowym systemem opinii. Aby uzyskać więcej informacji, sprawdź:Prześlij i wyświetl opinię dla