Tutorial: Enrutamiento de datos mediante directivas de actualización de tablas

Cuando los datos de origen implican transformaciones sencillas y rápidas, es mejor realizarlas ascendentes en la canalización mediante un flujo de eventos. Sin embargo, este enfoque podría no funcionar bien para otras transformaciones que son complejas o requieren una funcionalidad especializada para funcionar.

En este tutorial, obtendrá información sobre cómo:

En el ejemplo de este tutorial se muestra cómo usar directivas de actualización para el enrutamiento de datos para realizar transformaciones complejas para enriquecer, limpiar y transformar datos en tiempo de ingesta. Para obtener una lista de otros casos de uso comunes, consulte Casos de uso comunes para las directivas de actualización de tablas.

Requisitos previos

1- Creación de tablas y directivas de actualización

Los pasos siguientes le guían a través de la creación de una tabla de origen, funciones de transformación, tablas de destino y directivas de actualización. En el tutorial se muestra cómo usar directivas de actualización de tablas para realizar transformaciones complejas y guardar los resultados en una o varias tablas de destino. En el ejemplo se usa una única tabla de origen denominada Raw_Table y tres tablas de destino denominadas Device_Telemetry, Device_Alarms y Error_Log.

  1. Ejecute el siguiente comando para crear una tabla denominada Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    La tabla de origen es donde se guardan los datos ingeridos. La tabla tiene una sola columna denominada RawData de tipo dinámico. El tipo dinámico se usa para almacenar los datos sin procesar tal y como están, sin ningún esquema. Para obtener más información, vea El comando .create table.

  2. Ejecute el siguiente comando para crear una función denominada Get_Telemetry, Get_Alarms y funciones de Log_Error .

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Al crear una directiva de actualización, puede especificar un script en línea para su ejecución. Sin embargo, se recomienda encapsular la lógica de transformación en una función. El uso de una función mejora el mantenimiento del código. Cuando llegan nuevos datos, la función se ejecuta para transformar los datos. La función se puede reutilizar en varias directivas de actualización. Para obtener más información, vea El comando de función .create.

  3. Ejecute el siguiente comando para crear las tablas de destino.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    La tabla de destino debe tener el mismo esquema que la salida de la función de transformación. Puede crear tablas de destino de las siguientes maneras:

    • Con el .create table comando y especificando manualmente el esquema como se muestra con la creación de la tabla Device_Telemetry . Sin embargo, este enfoque puede ser propenso a errores y llevar mucho tiempo.
    • Si ya ha creado una función para transformar los datos, use el .set-or-append comando . Este método crea una nueva tabla con el mismo esquema que la salida de la función, utilizando take 0 para asegurarse de que la función solo devuelve el esquema. Para obtener más información, vea El comando .set-or-append.
  4. Ejecute el siguiente comando para crear las directivas de actualización para las tablas de destino.

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    El .alter table policy update comando se usa para vincular la tabla de origen, la función de transformación y la tabla de destino. La directiva de actualización se crea en la tabla de destino y especifica la tabla de origen y la función de transformación. Para obtener más información, consulte el comando .alter table policy update.

2- Ingesta de datos de ejemplo

Para probar las directivas de actualización, puede ingerir datos de ejemplo en la tabla de origen mediante el .set-or-append comando . Para más información, consulte Ingesta de datos de una consulta.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 - Comprobación de los resultados

Para validar los resultados, puede ejecutar una consulta para comprobar que los datos se transformaron y enrutaron a las tablas de destino. En el ejemplo siguiente, el union operador se usa para combinar el origen y los resultados de las tablas de destino en un único conjunto de resultados.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Salida

Debería ver la siguiente salida en la que el Raw_Table tiene tres filas y las tablas de destino tienen una fila cada una.

TableName Filas
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

Limpieza de recursos

Ejecute el siguiente comando en la base de datos para limpiar las tablas y funciones creadas en este tutorial.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error