Прием данных с помощью Azure Stream Analytics в Azure Cosmos DB для PostgreSQL

ПРИМЕНИМО К: Azure Cosmos DB для PostgreSQL (на базе расширения базы данных Citus для PostgreSQL)

Azure Stream Analytics — это подсистема аналитики и обработки событий в режиме реального времени, предназначенная для обработки больших объемов быстрой потоковой передачи данных с устройств, датчиков и веб-сайтов. Эта служба также доступна в среде выполнения Azure IoT Edge, что позволяет обрабатывать данные на устройствах Интернета вещей.

Схема архитектуры Stream Analytics с Azure Cosmos DB для PostgreSQL.

Azure Cosmos DB для PostgreSQL демонстрирует рабочие нагрузки в режиме реального времени, такие как Интернет вещей. Для этих рабочих нагрузок Stream Analytics может выступать в качестве простой, производительной и масштабируемой альтернативы предварительной обработке и потоковой передаче данных из Центры событий Azure, Центр Интернета вещей Azure и Хранилище BLOB-объектов Azure в Azure Cosmos DB для PostgreSQL.

Действия по настройке Stream Analytics

Примечание

В этой статье в качестве примера источника данных используется Центр Интернета вещей Azure, но этот метод применим к любому другому источнику, поддерживаемому Stream Analytics. Кроме того, следующие демонстрационные данные поступают из симулятора телеметрии устройств Azure IoT. В этой статье не рассматривается настройка симулятора.

  1. В портал Azure разверните меню портала в левом верхнем углу и выберите Создать ресурс.

  2. В списке результатов выберите Аналитика>Задание Stream Analytics.

  3. Заполните страницу Нового задания Stream Analytics следующими сведениями:

    • Подписка — выберите подписку Azure, которую необходимо использовать для этого задания.
    • Группа ресурсов . Выберите ту же группу ресурсов, что и центр Интернета вещей.
    • Имя . Введите имя для идентификации задания Stream Analytics.
    • Регион — выберите регион Azure для размещения задания Stream Analytics. Используйте географическое расположение, ближайшее к пользователям, чтобы повысить производительность и снизить затраты на передачу данных.
    • Среда размещения. Выберите Облако для развертывания в облаке Azure или Edge для развертывания на IoT Edge устройстве.
    • Единицы потоковой передачи . Выберите количество единиц потоковой передачи для вычислительных ресурсов, необходимых для выполнения задания.
  4. Выберите Просмотр и создание и затем Создать. В правом верхнем углу должно появиться уведомление о выполнении развертывания .

    Снимок экрана: форма создания задания Stream Analytics.

  5. Настройте входные данные задания.

    Снимок экрана: настройка входных данных задания в Stream Analytics.

    1. После развертывания ресурсов перейдите к заданию Stream Analytics. Выберите Входные> данныеДобавить потоковые входные>данные Центр Интернета вещей.

    2. На странице Центр Интернета вещей укажите такие значения:

      • Псевдоним входных данных — введите имя для идентификации входных данных задания.
      • Подписка. Выберите подписку Azure, в которую имеется ваша Центр Интернета вещей учетная запись.
      • Центр Интернета вещей — выберите имя Центра Интернета вещей.
    3. Щелкните Сохранить.

    4. После добавления входного потока можно также проверить или скачать набор данных, передаваемый в. В следующем коде показаны данные для примера события:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. Настройка выходных данных задания.

    1. На странице задания Stream Analytics выберите Выходные> данныеДобавить>базу данных PostgreSQL (предварительная версия).

      Снимок экрана: выбор выходных данных базы данных PostgreSQL.

    2. На странице Azure PostgreSQL укажите следующие значения:

      • Выходной псевдоним — введите имя для идентификации выходных данных задания.
      • Выберите Указать параметры базы данных PostgreSQL вручную и введите полное доменное имя сервера, базу данных, таблицу, имя пользователя и пароль. В примере набора данных используйте таблицу device_data.
    3. Щелкните Сохранить.

    Настройте выходные данные задания в Azure Stream Analytics.

  7. Определите запрос преобразования.

    Запрос преобразования в Azure Stream Analytics.

    1. На странице задания Stream Analytics выберите Запрос в меню слева.

    2. В этом руководстве вы будете принимать только альтернативные события из Центр Интернета вещей в Azure Cosmos DB для PostgreSQL, чтобы уменьшить общий размер данных. Скопируйте и вставьте на панели запросов следующий запрос:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. Выберите Сохранить запрос.

      Примечание

      Запрос используется не только для выборки данных, но и для извлечения нужных атрибутов из потока данных. Параметр настраиваемого запроса в Stream Analytics полезен при предварительной обработке или преобразовании данных перед их приемом в базу данных.

  8. Запустите задание Stream Analytics и проверьте выходные данные.

    1. Вернитесь на страницу обзора задания и нажмите кнопку Запуск.

    2. На странице Запуск задания выберите Сейчас в поле Время начала выходных данных задания, а затем нажмите кнопку Запустить.

    3. Выполнение задания занимает некоторое время, чтобы запустить его в первый раз, но после активации оно продолжает выполняться по мере поступления данных. Через несколько минут можно запросить кластер, чтобы убедиться, что данные загружены.

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

Примечание

Функция проверки подключения в настоящее время не поддерживается для Azure Cosmos DB для PostgreSQL и может вызвать ошибку, даже если подключение работает нормально.

Дальнейшие действия

Узнайте, как создать панель мониторинга в режиме реального времени с помощью Azure Cosmos DB для PostgreSQL.