Come inserire dati usando Analisi di flusso di Azure in Azure Cosmos DB for PostgreSQL

SI APPLICA A: Azure Cosmos DB for PostgreSQL (basato su estensione database Citus per PostgreSQL)

Analisi di flusso di Azure è un motore di analisi ed elaborazione di eventi in tempo reale progettato per elaborare volumi elevati di dati di streaming veloce da dispositivi, sensori e siti Web. È disponibile anche nel runtime di Azure IoT Edge, abilitando l'elaborazione dei dati nei dispositivi IoT.

Diagram that shows Stream Analytics architecture with Azure Cosmos DB for PostgreSQL.

Azure Cosmos DB for PostgreSQL eccelle con carichi di lavoro in tempo reale, ad esempio IoT. Per questi carichi di lavoro, Analisi di flusso può fungere da alternativa senza codice, efficiente e scalabile per pre-elaborare e trasmettere dati da Hub eventi di Azure, hub IoT di Azure e Archiviazione BLOB di Azure in Azure Cosmos DB for PostgreSQL.

Passaggi per configurare Analisi di flusso

Nota

Questo articolo usa Hub IoT di Azure come origine dati di esempio, ma la tecnica è applicabile a qualsiasi altra origine supportata da Analisi di flusso. Inoltre, i dati dimostrativi seguenti provengono dal Simulatore di telemetria del dispositivo IoT di Azure. Questo articolo non descrive la configurazione del simulatore.

  1. Nel portale di Azure espandere il menu del portale in alto a sinistra e selezionare Crea una risorsa.

  2. Selezionare Analisi>Processo di Analisi di flusso nell'elenco risultati.

  3. Compilare la pagina Nuovo processo di Analisi di flusso con le informazioni seguenti:

    • Sottoscrizione: selezionare la sottoscrizione di Azure che si vuole usare per questo processo.
    • Gruppo di risorse: selezionare lo stesso gruppo di risorse dell'hub IoT.
    • Nome: immettere un nome per identificare il processo di Analisi di flusso.
    • Area: selezionare l'area di Azure per ospitare il processo di Analisi di flusso. Usare la posizione geografica più vicina agli utenti per ottenere prestazioni migliori e ridurre i costi di trasferimento dati.
    • Ambiente di hosting: selezionare Cloud per distribuire nel cloud di Azure o Edge per distribuire in un dispositivo IoT Edge.
    • Unità di streaming: selezionare il numero di unità di streaming per le risorse di calcolo necessarie per eseguire il processo.
  4. Seleziona Rivedi e crea e quindi seleziona Crea. Verrà visualizzata una notifica Distribuzione in corso in alto a destra.

    Screenshot that shows the create Stream Analytics job form.

  5. Configurare l'input del processo.

    Screenshot that shows configuring job input in Stream Analytics.

    1. Al termine della distribuzione delle risorse, passare al processo di Analisi di flusso. Selezionare Input>Aggiungi input del flusso>Hub IoT.

    2. Compilare la pagina Hub IoT con i valori seguenti:

      • Alias di input: immettere un nome per identificare l'input del processo.
      • Sottoscrizione: selezionare la sottoscrizione di Azure con l'account dell'hub IoT.
      • Hub IoT: selezionare il nome dell'hub IoT.
    3. Seleziona Salva.

    4. Dopo aver aggiunto il flusso di input, è anche possibile verificare o scaricare il set di dati del flusso. Il codice seguente mostra i dati per un evento di esempio:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. Configurare l'output del processo.

    1. Nella pagina processo di Analisi di flusso selezionare Output>Aggiungi>database PostgreSQL (anteprima).

      Screenshot that shows selecting PostgreSQL database output.

    2. Compilare la pagina PostgreSQL di Azure con i valori seguenti:

      • Alias di output: immettere un nome per identificare l'output del processo.
      • Selezionare Specificare manualmente le impostazioni del database PostgreSQL e immettere Nome di dominio completo del server, Database, Tabella, Nome utente e Password. Dal set di dati di esempio usare la tabella device_data.
    3. Seleziona Salva.

    Configure job output in Azure Stream Analytics.

  7. Definire la query di trasformazione.

    Transformation query in Azure Stream Analytics.

    1. Nella pagina processo di Analisi di flusso selezionare Query dal menu a sinistra.

    2. Per questa esercitazione si inseriscono solo gli eventi alternativi dall'hub IoT in Azure Cosmos DB for PostgreSQL, per ridurre le dimensioni complessive dei dati. Copiare e incollare la query seguente nel relativo riquadro:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. Selezionare Salva query.

      Nota

      Si usa la query per non solo campionare i dati, ma anche per estrarre gli attributi desiderati dal flusso dei dati. L'opzione di query personalizzata con Analisi di flusso è utile per la pre-elaborazione/trasformazione dei dati prima che vengano inseriti nel database.

  8. Avviare il processo di Analisi di flusso e verificare l'output.

    1. Tornare alla pagina della panoramica del processo e selezionare Avvia.

    2. Nella pagina Avvia processo, selezionare Ora per l’ora di inizio dell'output del processo, quindi selezionare Avvia.

    3. L'avvio del processo richiede tempo, ma una volta attivato continua a essere eseguito man mano che arrivano i dati. Dopo alcuni minuti, è possibile eseguire una query sul cluster per verificare che i dati caricati.

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

Nota

La funzionalità Connessione test non è attualmente supportata per Azure Cosmos DB for PostgreSQL e potrebbe generare un errore, anche quando la connessione funziona correttamente.

Passaggi successivi

Informazioni su come creare un dashboard in tempo reale con Azure Cosmos DB for PostgreSQL.