Gegevens opnemen met behulp van Azure Stream Analytics in Azure Cosmos DB for PostgreSQL

VAN TOEPASSING OP: Azure Cosmos DB for PostgreSQL (mogelijk gemaakt door de Citus-database-extensie voor PostgreSQL)

Azure Stream Analytics is een engine voor realtime analyse en gebeurtenisverwerking die is ontworpen voor het verwerken van grote hoeveelheden snelle streaminggegevens van apparaten, sensoren en websites. Het is ook beschikbaar op de Azure IoT Edge-runtime, waardoor gegevensverwerking op IoT-apparaten mogelijk is.

Diagram met de Stream Analytics-architectuur met Azure Cosmos DB for PostgreSQL.

Azure Cosmos DB for PostgreSQL is een goed voorbeeld van realtime workloads, zoals IoT. Voor deze workloads kan Stream Analytics fungeren als een goed presterend en schaalbaar alternatief zonder code voor het vooraf verwerken en streamen van gegevens uit Azure Event Hubs, Azure IoT Hub en Azure Blob Storage in Azure Cosmos DB for PostgreSQL.

Stappen voor het instellen van Stream Analytics

Notitie

In dit artikel wordt Azure IoT Hub als voorbeeldgegevensbron gebruikt, maar de techniek is van toepassing op elke andere bron die wordt ondersteund door Stream Analytics. De volgende demonstratiegegevens zijn ook afkomstig van de Telemetriesimulator voor Azure IoT-apparaten. In dit artikel wordt het instellen van de simulator niet behandeld.

  1. Vouw in de Azure Portal het portalmenu linksboven uit en selecteer Een resource maken.

  2. Selecteer Analytics>Stream Analytics job in de lijst met resultaten.

  3. Vul de pagina Nieuwe Stream Analytics-taak in met de volgende gegevens:

    • Abonnement : selecteer het Azure-abonnement dat u voor deze taak wilt gebruiken.
    • Resourcegroep : selecteer dezelfde resourcegroep als uw IoT-hub.
    • Naam : voer een naam in om uw Stream Analytics-taak te identificeren.
    • Regio : selecteer de Azure-regio om uw Stream Analytics-taak te hosten. Gebruik de geografische locatie die zich het dichtst bij uw gebruikers bevindt voor betere prestaties en om de kosten voor gegevensoverdracht te verlagen.
    • Hostingomgeving: selecteer Cloud om te implementeren in de Azure-cloud of Edge om te implementeren op een IoT Edge apparaat.
    • Streaming-eenheden : selecteer het aantal streaming-eenheden voor de rekenresources die u nodig hebt om de taak uit te voeren.
  4. Selecteer Controleren en maken en selecteer vervolgens Maken. Rechtsboven ziet u de melding Implementatie wordt uitgevoerd .

    Schermopname van het formulier Stream Analytics-taak maken.

  5. Taakinvoer configureren.

    Schermopname van het configureren van taakinvoer in Stream Analytics.

    1. Zodra de resource-implementatie is voltooid, gaat u naar uw Stream Analytics-taak. Selecteer Invoer Stroominvoer>>toevoegenIoT Hub.

    2. Vul de volgende waarden in op de pagina IoT Hub:

      • Invoeralias : voer een naam in om de taakinvoer te identificeren.
      • Abonnement: selecteer het Azure-abonnement met uw IoT Hub-account.
      • IoT Hub: selecteer de naam van uw IoT-hub.
    3. Selecteer Opslaan.

    4. Zodra de invoerstroom is toegevoegd, kunt u ook de gegevensset controleren of downloaden die binnenstroomt. De volgende code toont de gegevens voor een voorbeeld van een gebeurtenis:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. Taakuitvoer configureren.

    1. Selecteer op de pagina Stream Analytics-taak de optie Uitvoer>PostgreSQL-databasetoevoegen> (preview).

      Schermopname van het selecteren van PostgreSQL-database-uitvoer.

    2. Vul op de pagina Azure PostgreSQL de volgende waarden in:

      • Uitvoeralias : voer een naam in om de uitvoer van de taak te identificeren.
      • Selecteer PostgreSQL-database-instellingen handmatig opgeven en voer de volledig gekwalificeerde domeinnaam van de server, database, tabel, gebruikersnaam en wachtwoord in. Gebruik vanuit de voorbeeldgegevensset de tabel device_data.
    3. Selecteer Opslaan.

    Taakuitvoer configureren in Azure Stream Analytics.

  7. Definieer de transformatiequery.

    Transformatiequery in Azure Stream Analytics.

    1. Selecteer query in het linkermenu op de pagina Stream Analytics-taak.

    2. Voor deze zelfstudie neemt u alleen de alternatieve gebeurtenissen van IoT Hub op in Azure Cosmos DB for PostgreSQL om de totale gegevensgrootte te verminderen. Kopieer en plak de volgende query in het queryvenster:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. Selecteer Query opslaan.

      Notitie

      U gebruikt de query niet alleen om een steekproef van de gegevens te nemen, maar ook om de gewenste kenmerken uit de gegevensstroom te extraheren. De optie voor aangepaste query's met Stream Analytics is handig bij het vooraf verwerken/transformeren van de gegevens voordat deze worden opgenomen in de database.

  8. Start de Stream Analytics-taak en controleer de uitvoer.

    1. Ga terug naar de pagina met het taakoverzicht en selecteer Starten.

    2. Selecteer op de pagina Taak startende optie Nu voor de begintijd van taakuitvoer en selecteer vervolgens Starten.

    3. Het duurt even voordat de taak de eerste keer wordt gestart, maar nadat deze is geactiveerd, wordt deze nog steeds uitgevoerd wanneer de gegevens binnenkomen. Na een paar minuten kunt u een query uitvoeren op het cluster om te controleren of de gegevens zijn geladen.

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

Notitie

De functie Verbinding testen wordt momenteel niet ondersteund voor Azure Cosmos DB for PostgreSQL en kan een fout veroorzaken, zelfs wanneer de verbinding goed werkt.

Volgende stappen

Meer informatie over het maken van een realtime dashboard met Azure Cosmos DB for PostgreSQL.