Compartir a través de


Ingesta de datos mediante Azure Stream Analytics en Azure Cosmos DB for PostgreSQL

SE APLICA A: Azure Cosmos DB for PostgreSQL (con tecnología de la extensión de base de datos de Citus en PostgreSQL)

Azure Stream Analytics es un motor de procesamiento de eventos y análisis en tiempo real que está diseñado para procesar grandes volúmenes de datos de streaming rápido de dispositivos, sensores y sitios web. También está disponible en el entorno de ejecución de Azure IoT Edge, lo que permite el procesamiento de datos en dispositivos IoT.

Diagrama que muestra la arquitectura de Stream Analytics con Azure Cosmos DB for PostgreSQL.

Azure Cosmos DB for PostgreSQL brilla en cargas de trabajo en tiempo real, como IoT. Para estas cargas de trabajo, Stream Analytics puede actuar como una alternativa sin código, eficaz y escalable para previamente procesar y transmitir datos de Azure Event Hubs, Azure IoT Hub y Azure Blob Storage en Azure Cosmos DB for PostgreSQL.

Pasos para configurar Stream Analytics

Nota

En este artículo se usa Azure IoT Hub como ejemplo de origen de datos, pero la técnica es aplicable a cualquier otro origen que admita Stream Analytics. Además, los siguientes datos de demostración proceden del simulador de datos de telemetría de dispositivo de Azure IoT. En este artículo no se trata la configuración del simulador.

  1. En Azure Portal, expanda el menú del portal en la esquina superior izquierda y seleccione Crear un recurso.

  2. Seleccione Analytics>Trabajo de Stream Analytics en la lista de resultados.

  3. Rellene la página Nuevo trabajo de Stream Analytics con la siguiente información:

    • En Suscripción, seleccione la suscripción de Azure que quiere usar para este trabajo.
    • Grupo de recursos: seleccione el mismo grupo de recursos que el del centro de IoT.
    • Nombre: escriba un nombre para identificar el trabajo de Stream Analytics.
    • Región: seleccione la región de Azure para hospedar el trabajo de Stream Analytics. Para un mejor rendimiento y reducir el costo de la transferencia de datos, use la ubicación geográfica más cercana a los usuarios.
    • Entorno de hospedaje: seleccione Nube para implementar en la nube de Azure o Edge para implementar en un dispositivo IoT Edge.
    • Unidades de streaming: seleccione el número de unidades de streaming para los recursos informáticos que necesita para ejecutar el trabajo.
  4. Seleccione Revisar y crear y, luego, Crear. Debería ver una notificación Implementación en curso en la esquina superior derecha.

    Captura de pantalla que muestra el formulario Crear trabajo de Stream Analytics.

  5. Configuración de la entrada del trabajo.

    Captura de pantalla que muestra la configuración de la entrada de trabajo en Stream Analytics.

    1. Cuando se complete la implementación de recursos, vaya al trabajo de Stream Analytics. Seleccione Entradas>Agregar entrada de flujo>IoT Hub.

    2. Rellene la página IoT Hub con los siguientes valores:

      • Alias de entrada: escriba un nombre para identificar la entrada del trabajo.
      • Suscripción: seleccione la suscripción de Azure que tenga la cuenta de IoT Hub.
      • IoT Hub: seleccione el nombre del centro de IoT.
    3. Seleccione Guardar.

    4. Cuando agregue el flujo de entrada, también podrá comprobar o descargar el conjunto de datos que fluye. En el código siguiente se muestran los datos de un evento de ejemplo:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "aaaa0000-bb11-2222-33cc-444444dddddd",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. Configure la salida del trabajo.

    1. En la página Trabajo de Stream Analytics, seleccione Salidas>Agregar>Base de datos PostgreSQL (versión preliminar).

      Captura de pantalla que muestra la selección de la salida de la base de datos PostgreSQL.

    2. Rellene la página de Azure PostgreSQL con los valores siguientes:

      • Alias de la salida: escriba un nombre para identificar la salida del trabajo.
      • Seleccione Proporcionar configuración de la base de datos PostgreSQL manualmente y escriba la información correspondiente en Nombre de dominio completo del servidor, Base de datos, Tabla, Nombre de usuario y Contraseña. En el conjunto de datos de ejemplo, use la tabla device_data.
    3. Seleccione Guardar.

    Configuración de la salida del trabajo en Azure Stream Analytics.

  7. Defina la consulta de transformación.

    Consulta de transformación en Azure Stream Analytics.

    1. En la página Trabajo de Stream Analytics, seleccione Consulta en el menú de la izquierda.

    2. En este tutorial, solo se ingieren los eventos alternativos de IoT Hub en Azure Cosmos DB for PostgreSQL, para reducir el tamaño general de los datos. Copie y pegue la siguiente consulta en el panel de consulta:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. Seleccione Guardar consulta.

      Nota

      La consulta se usa para muestrear los datos, así como extraer los atributos que necesite del flujo de datos. La opción de consulta personalizada con Stream Analytics resulta útil para el procesamiento previo o transformación de los datos antes de ingerirlos en la base de datos.

  8. Intente empezar el trabajo de Stream Analytics y compruebe la salida.

    1. Vuelva a la página de información general del trabajo y seleccione Iniciar.

    2. En la página Iniciar trabajo, seleccione Ahora en Hora de inicio de la salida del trabajo y, después, elija Iniciar.

    3. El trabajo tarda un tiempo en iniciarse por primera vez, pero, cuando se desencadena, sigue ejecutándose a medida que llegan los datos. Después de unos minutos, puede consultar el clúster para comprobar que los datos se cargaron.

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

Nota

La característica Probar conexión no se admite actualmente para Azure Cosmos DB for PostgreSQL y podría producir un error, incluso cuando la conexión funciona correctamente.

Pasos siguientes

Obtenga información sobre cómo crear un panel en tiempo real con Azure Cosmos DB for PostgreSQL.