Condividi tramite


Esercitazione: Indirizzare i dati usando i criteri di aggiornamento delle tabelle

Quando i dati di origine comportano trasformazioni semplici e rapide, è consigliabile eseguirle upstream nella pipeline usando un flusso di eventi. Tuttavia, questo approccio potrebbe non funzionare correttamente per altre trasformazioni complesse o che richiedono funzionalità specializzate per funzionare.

In questa esercitazione apprenderai a:

L'esempio di questa esercitazione illustra come usare i criteri di aggiornamento per il routing dei dati per eseguire trasformazioni complesse per arricchire, pulire e trasformare i dati in fase di inserimento. Per un elenco di altri casi d'uso comuni, vedere Casi d'uso comuni per i criteri di aggiornamento delle tabelle.

Prerequisiti

1 - Creare tabelle e aggiornare i criteri

La procedura seguente illustra la creazione di una tabella di origine, funzioni di trasformazione, tabelle di destinazione e criteri di aggiornamento. L'esercitazione illustra come usare i criteri di aggiornamento delle tabelle per eseguire trasformazioni complesse e salvare i risultati in una o più tabelle di destinazione. Nell'esempio viene utilizzata una singola tabella di origine denominata Raw_Table e tre tabelle di destinazione denominate Device_Telemetry, Device_Alarms e Error_Log.

  1. Eseguire il comando seguente per creare una tabella denominata Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    La tabella di origine è la posizione in cui vengono salvati i dati inseriti. La tabella include una singola colonna denominata RawData di tipo dynamic. Il tipo dinamico viene usato per archiviare i dati non elaborati così come sono, senza alcuno schema. Per altre informazioni, vedere comando .create table.

  2. Eseguire il comando seguente per creare una funzione denominata Get_Telemetry, Get_Alarms e Log_Error funzioni.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Quando si creano criteri di aggiornamento, è possibile specificare uno script inline per l'esecuzione. È tuttavia consigliabile incapsulare la logica di trasformazione in una funzione. L'uso di una funzione migliora la manutenzione del codice. Quando arrivano nuovi dati, la funzione viene eseguita per trasformare i dati. La funzione può essere riutilizzata in più criteri di aggiornamento. Per altre informazioni, vedere comando della funzione .create.

  3. Eseguire il comando seguente per creare le tabelle di destinazione.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    La tabella di destinazione deve avere lo stesso schema dell'output della funzione di trasformazione. È possibile creare tabelle di destinazione nei modi seguenti:

    • Usando il .create table comando e specificando manualmente lo schema come illustrato con la creazione della tabella Device_Telemetry . Tuttavia, questo approccio può essere soggetto a errori e dispendioso in termini di tempo.
    • Se è già stata creata una funzione per trasformare i dati, usare il .set-or-append comando . Questo metodo crea una nuova tabella con lo stesso schema dell'output della funzione, usando take 0 per assicurarsi che la funzione restituisca solo lo schema. Per altre informazioni, vedere comando .set-or-append.
  4. Eseguire il comando seguente per creare i criteri di aggiornamento per le tabelle di destinazione

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    Il .alter table policy update comando viene usato per collegare la tabella di origine, la funzione di trasformazione e la tabella di destinazione. I criteri di aggiornamento vengono creati nella tabella di destinazione e specificano la tabella di origine e la funzione di trasformazione. Per altre informazioni, vedere comando .alter table policy update.

2 - Inserire dati di esempio

Per testare i criteri di aggiornamento, è possibile inserire dati di esempio nella tabella di origine usando il .set-or-append comando . Per altre informazioni, vedere Inserire dati da una query.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 - Verificare i risultati

Per convalidare i risultati, è possibile eseguire una query per verificare che i dati siano stati trasformati e indirizzati alle tabelle di destinazione. Nell'esempio seguente l'operatore union viene usato per combinare l'origine e i risultati delle tabelle di destinazione in un singolo set di risultati.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Output

Verrà visualizzato l'output seguente in cui il Raw_Table ha tre righe e le tabelle di destinazione hanno una riga ciascuna.

TableName Righe
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

Pulire le risorse

Eseguire il comando seguente nel database per pulire le tabelle e le funzioni create in questa esercitazione.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error