Поделиться через


Обработка потоков данных Интернета вещей в реальном времени с использованием Azure Stream Analytics

В этой статье описывается, как создать логическую схему обработки потоков для сбора данных с устройств Интернета вещей. Вы используете реальный вариант использования Интернета вещей, чтобы продемонстрировать, как быстро и экономически создавать решение.

Необходимые компоненты

  • Создайте бесплатную подписку Azure.
  • Скачайте пример запроса и файлы данных с сайта GitHub.

Сценарий

Contoso, компания в промышленном пространстве автоматизации, автоматизирована его производственный процесс. Оборудование этой компании оснащено датчиками, которые передают потоки данных в режиме реального времени. В этом сценарии руководителю производственного участка необходимо в реальном времени получать аналитические данные с датчиков, чтобы определять шаблоны и предпринимать соответствующие действия. Для поиска интересных закономерностей во входящем потоке данных можно использовать язык запросов Stream Analytics (SAQL) для передаваемых датчиками данных.

В этом примере данные создаются с помощью устройства Texas Instruments SensorTag. Полезные данные представлены в формате JSON, как показано в следующем фрагменте кода:

{
    "time": "2016-01-26T20:47:53.0000000",  
    "dspl": "sensorE",  
    "temp": 123,  
    "hmdt": 34  
}  

В реальном сценарии у вас могут быть сотни таких датчиков, создающих события в виде потока. В идеале необходимо устройство шлюза, выполняющее код для отправки этих событий в Центры событий Azure или центры Интернета вещей Azure. Задание Stream Analytics будет прием этих событий из Центров событий или Центр Интернета вещей и выполнение запросов аналитики в режиме реального времени для потоков. Затем результаты можно отправить в поддерживаемое место назначения выходных данных.

Для удобства в этом руководстве по началу работы приведен пример файла данных, полученных с реальных устройств SensorTag. На основе этого примера можно выполнять различные запросы и просматривать их результаты. В последующих руководствах вы узнаете, как подключить задание к входным и выходным данным и развернуть их в службе Azure.

Создание задания Stream Analytics

  1. Перейдите на портал Azure.

  2. В меню навигации слева выберите "Все службы", выберите "Аналитика", наведите указатель мыши на задания Stream Analytics и нажмите кнопку "Создать".

    Снимок экрана: выбор кнопки

  3. На странице Новое задание New Stream Analytics сделайте следующее:

    1. Для подписки выберите подписку Azure.

    2. Для группы ресурсов выберите существующую группу ресурсов или создайте группу ресурсов.

    3. В поле "Имя" введите уникальное имя задания Stream Analytics.

    4. Выберите регион, в котором нужно развернуть задание Stream Analytics. Используйте то же расположение для группы ресурсов и всех ресурсов, чтобы увеличить скорость обработки и сократить затраты.

    5. Выберите Review + create (Просмотреть и создать).

      Снимок экрана: страница задания New Stream Analytics.

  4. На странице Проверить и создать проверьте параметры и нажмите кнопку Создать.

  5. После успешного развертывания выберите "Перейти к ресурсу ", чтобы перейти на страницу задания Stream Analytics для задания Stream Analytics.

Создание запроса Azure Stream Analytics

После создания задания напишите запрос. Вы можете проверить запросы на демонстрационных данных, не подключая к заданию входные или выходные данные.

  1. Скачайте файл HelloWorldASA-InputStream.json с сайта GitHub.

  2. На странице задания Azure Stream Analytics в портал Azure в меню слева выберите "Запрос" в разделе "Топология заданий".

  3. В окне запроса введите следующий запрос.

    SELECT
        *
    INTO
        youroutputalias
    FROM
        yourinputalias
    
  4. В нижней области выберите " Отправить пример входных данных", выберите скачанный HelloWorldASA-InputStream.json файл и нажмите кнопку "ОК".

    Снимок экрана: страница **Запрос** с выбранным параметром **Отправить образец входных данных** .

  5. Обратите внимание на предварительный просмотр данных, автоматически заполняющих таблицу Предпросмотр ввода.

    Снимок экрана: образец входных данных на вкладке предварительного просмотра входных данных.

Запрос: архивация необработанных данных

Запрос к серверу — это самая простая форма запроса, который архивирует все входные данные в место назначения выходных данных. Это запрос по умолчанию, заполнивший новое задание Azure Stream Analytics.

  1. Выберите тестовый запрос на панели инструментов.

  2. Просмотрите результаты на вкладке "Результаты теста" на нижней панели.

    Снимок экрана: пример запроса и его результаты.

Запрос: фильтрация данных по условию

Давайте обновим запрос, чтобы отфильтровать результаты на основе условия. Например, в следующем запросе показаны события, поступающие из sensorA."

  1. Обновите запрос со следующим примером:

    SELECT 
        time,
        dspl AS SensorName,
        temp AS Temperature,
        hmdt AS Humidity
    INTO
       youroutputalias
    FROM
        yourinputalias
    WHERE dspl='sensorA'
    
  2. Выберите тестовый запрос , чтобы просмотреть результаты запроса.

    Снимок экрана: результаты запроса с фильтром.

Запрос: оповещение для активации рабочего бизнес-процесса

Теперь давайте детализируем наш запрос. Если требуется отслеживать среднее значение температуры за 30-секундный промежуток времени и отображать результаты только в том случае, если это значение превышает 100 градусов,

  1. Обновите запрос, чтобы:

    SELECT 
        System.Timestamp AS OutputTime,
        dspl AS SensorName,
        Avg(temp) AS AvgTemperature
    INTO
       youroutputalias
    FROM
        yourinputalias TIMESTAMP BY time
    GROUP BY TumblingWindow(second,30),dspl
    HAVING Avg(temp)>100
    
  2. Выберите тестовый запрос , чтобы просмотреть результаты запроса.

    Снимок экрана: запрос с переворачивающимся окном.

    Теперь результаты должны содержать всего 245 строк. В них указаны датчики, для которых среднее значение температуры превышает 100. В этом запросе поток событий сгруппирован по свойству dspl, представляющему собой имя датчика, в "переворачивающемся" окне длительностью 30 секунд. Во время создания таких временных запросов важно задать способ учета отметок времени. С помощью предложения TIMESTAMP BY вы настроили использование столбца OUTPUTTIME, чтобы связать время со всеми временными вычислениями. Дополнительные сведения см. в статьях об управлении временем и функциях для работы с окнами.

Запрос: обнаружение отсутствия событий

Как написать запрос, чтобы обнаружить отсутствие входящих событий? Давайте найдем последний раз, когда датчик отправил данные, а затем не отправлял события в течение следующих 5 секунд.

  1. Обновите запрос, чтобы:

    SELECT 
        t1.time,
        t1.dspl AS SensorName
    INTO
       youroutputalias
    FROM
        yourinputalias t1 TIMESTAMP BY time
    LEFT OUTER JOIN yourinputalias t2 TIMESTAMP BY time
    ON
        t1.dspl=t2.dspl AND
        DATEDIFF(second,t1,t2) BETWEEN 1 and 5
    WHERE t2.dspl IS NULL
    
  2. Выберите тестовый запрос , чтобы просмотреть результаты запроса.

    Снимок экрана: запрос, который обнаруживает отсутствие событий.

    Здесь используется LEFT OUTER JOIN (левое внешнее соединение) для одного и того же потока данных (самосоединение). При внутреннем соединении результат возвращается, только если обнаружено совпадение. Но если событие с левой стороны соединения не сопоставлено при использовании левого внешнего соединения, для всех столбцов справа возвращается строка со значением NULL. Этот метод полезен для поиска отсутствия событий. Дополнительные сведения см. в описании операции JOIN.

Заключение

Цель этой статьи — показать, как писать различные запросы на языке запросов Stream Analytics и просматривать результаты в браузере. Тем не менее, эта статья заключается только в том, чтобы начать работу. Stream Analytics поддерживает различные входные и выходные данные и даже может использовать функции в Машинное обучение Azure, чтобы сделать его надежным инструментом для анализа потоков данных. Дополнительные сведения о написании запросов см. в статье Примеры запросов для распространенных шаблонов использования Stream Analytics.