Создание решения IoT с помощью Stream Analytics

Введение

В этой статье содержатся инструкции по анализу данных в режиме реального времени с использованием Azure Stream Analytics. Разработчики могут легко объединять потоки данных, такие как сведения о посещении, данные журналов и созданные устройствами события, с историческими записями или справочными данными для получения бизнес-информации. Являясь полностью управляемой службой потоковой обработки в режиме реального времени, которая размещена на платформе Microsoft Azure, Azure Stream Analytics обеспечивает высокую надежность, малую задержку и масштабируемую обработку, что помогает приступить к работе за считаные минуты.

После завершения этого решения вы сможете:

  • Изучить портал Azure Stream Analytics.
  • Настроить и развернуть задание потоковой передачи.
  • Сформулировать реальные проблемы и устранить их с помощью языка запросов Stream Analytics.
  • Уверенно разрабатывать решения потоковой передачи для клиентов с помощью Stream Analytics.
  • Использовать процедуры мониторинга и ведения журнала для устранения неполадок.

Предварительные требования

Для создания описанного здесь решения вам потребуется:

Общие сведения о сценарии "Hello, Toll!"

Станции сбора дорожной платы представляют собой распространенное явление. Они встречаются на многих скоростных дорогах, мостах и туннелях по всему миру. Каждая станция имеет несколько пунктов сбора платы. В пунктах, работающих в ручном режиме, водитель останавливается и передает деньги служащему. В пунктах, работающих в автоматическом режиме, размещенный на крыше пункта датчик сканирует RFID-карту, прикрепленную на ветровом стекле автомобиля, во время его проезда через пункт. Легко визуализировать прохождение транспортных средств через эти платные станции в качестве потока событий, по которому могут выполняться интересные операции.

Изображение автомобилей в пунктах сбора платы

Входящие данные

В этом решении используется два потока данных. Датчики, установленные на въездах и выездах станций сбора платы, создают первый поток. Второй поток — это статический набор данных, содержащий данные регистрации транспортного средства.

Входной поток данных

Входной поток данных содержит сведения об автомобилях, въезжающих в пункты сбора платы. События выходных данных передаются в реальном времени в концентратор событий из веб-приложения, включенного в пример приложения.

| TollID | EntryTime | LicensePlate | State | Make | Model | VehicleType | VehicleWeight | Toll | Tag |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 |2014-09-10 12:01:00.000 |JNB 7001 |NY |Honda |CRV |1 |0 |7 | |
| 1 |2014-09-10 12:02:00.000 |YXZ 1001 |NY |Toyota |Camry |1 |0 |4 |123456789 |
| 3 |2014-09-10 12:02:00.000 |ABC 1004 |CT |Ford |Taurus |1 |0 |5 |456789123 |
| 2 |2014-09-10 12:03:00.000 |XYZ 1003 |CT |Toyota |Corolla |1 |0 |4 | |
| 1 |2014-09-10 12:03:00.000 |BNJ 1007 |NY |Honda |CRV |1 |0 |5 |789123456 |
| 2 |2014-09-10 12:05:00.000 |CDE 1007 |NJ |Toyota |4x4 |1 |0 |6 |321987654 |

Ниже приведено краткое описание столбцов.

Столбец Описание
ИД пункта сбора Уникальный идентификатор пункта сбора платы
Время въезда Дата и время въезда транспортного средства в пункт сбора платы в формате UTC
LicensePlate Номерной знак транспортного средства
Состояние Штат в США
Производитель Изготовитель транспортного средства
Моделирование Номер модели транспортного средства
Тип транспортного средства 1 — для пассажирского транспорта, 2 — для коммерческого транспорта
Тип веса Вес транспортного средства в тоннах. 0 — для пассажирского транспорта.
Плата Значение платы в долларах США
Тег Электронная метка на автомобиле для автоматической оплаты. Пустая метка, если оплата осуществляется вручную

Выходной поток данных

Выходной поток данных содержит сведения об автомобилях, выезжающих из станций сбора платы. События выходных данных передаются в реальном времени в концентратор событий из веб-приложения, включенного в пример приложения.

ИД пункта сбора Время выезда LicensePlate
1 10.09.2014T12:03:00.0000000Z JNB 7001
1 10.09.2014T12:03:00.0000000Z YXZ 1001
3 10.09.2014T12:04:00.0000000Z ABC 1004
2 10.09.2014T12:07:00.0000000Z XYZ 1003
1 10.09.2014T12:08:00.0000000Z BNJ 1007
2 10.09.2014T12:07:00.0000000Z CDE 1007

Ниже приведено краткое описание столбцов.

Столбец Описание
ИД пункта сбора Уникальный идентификатор пункта сбора платы
Время выезда Дата и время выезда транспортного средства с пункта сбора платы в формате UTC
LicensePlate Номерной знак транспортного средства

Данные регистрации коммерческого транспортного средства

В решении используется статический снимок базы данных регистрации коммерческого транспортного средства. Эти данные сохраняются в виде JSON-файла в хранилище BLOB-объектов Azure, включенном в пример.

LicensePlate ИД регистрации Срок действия истек
SVT 6023 285429838 1
XLZ 3463 362715656 0
BAC 1005 876133137 1
RIV 8632 992711956 0
SNY 7188 592133890 0
ELH 9896 678427724 1

Ниже приведено краткое описание столбцов.

Столбец Описание
LicensePlate Номерной знак транспортного средства
ИД регистрации Идентификатор регистрации транспортного средства
Срок действия истек Состояние регистрации транспортного средства: 0 — если регистрация транспортного средства действительна. 1 — если срок регистрации истек

Настройка среды для Azure Stream Analytics

Для создания описанного здесь решения требуется подписка Microsoft Azure. Если у вас нет учетной записи Azure, вы можете запросить бесплатную пробную версию.

Обязательно выполните шаги в разделе "Очистка учетной записи Azure" в конце этой статьи, чтобы максимально эффективно использовать деньги на счете в Azure.

Развертывание примера

Существуют некоторые ресурсы, которые можно легко развернуть в группе ресурсов с помощью нескольких действий. Определение решения см. в этом репозитории GitHub: https://github.com/Azure/azure-stream-analytics/tree/master/Samples/TollApp.

Развертывание шаблона TollApp на портале Azure

  1. Чтобы развернуть среду TollApp в Azure, перейдите по этой ссылке.

  2. Если будет предложено, войдите на портал Azure.

  3. Выберите подписку, в которой взимается плата за различные ресурсы.

  4. Укажите новую группу ресурсов с уникальным именем, например MyTollBooth.

  5. Выберите расположение Azure.

  6. Укажите для значения интервала количество секунд. Это значение используется в примере веб-приложения для частоты отправки данных в концентратор событий.

  7. Установите флажок, чтобы принять условия.

  8. Выберите Закрепить на панели мониторинга, чтобы позже можно было легко найти ресурсы.

  9. Выберите Приобрести, чтобы развернуть этот шаблон.

  10. Через некоторое время появится уведомление Развертывание прошло успешно.

Просмотр ресурсов TollApp в Azure Stream Analytics

  1. Войдите на портал Azure.

  2. Найдите группу ресурсов, которую вы называли в предыдущем разделе.

  3. Убедитесь, что в группе ресурсов находятся следующие ресурсы:

    • Одна учетная запись Azure Cosmos DB
    • одно задание Azure Stream Analytics;
    • одна учетная запись хранения Azure;
    • Один концентратор событий Azure
    • два веб-приложения.

Изучение примера задания TollApp

  1. Начиная с группы ресурсов в предыдущем разделе выберите задание потоковой передачи Stream Analytics, начиная с имени tollapp (имя содержит случайные символы для уникальности).

  2. На странице обзора задания в поле Запрос просмотрите синтаксис запроса.

    SELECT TollId, System.Timestamp AS WindowEnd, COUNT(*) AS Count
    INTO CosmosDB
    FROM EntryStream TIMESTAMP BY EntryTime
    GROUP BY TUMBLINGWINDOW(minute, 3), TollId
    

    Чтобы перефразировать цель запроса, допустим, что вам нужно подсчитать количество автомобилей, въезжающих в пункт сбора платы. Так как в пункт сбора платы поступает непрерывный поток транспортных средств, входные события аналогичны потоку, который никогда не останавливается. Чтобы количественно оценить поток, необходимо определить период времени для измерения. Давайте уточним вопрос: "Сколько автомобилей въезжает в пункт сбора платы каждые три минуты?" Этот период часто называют переворачивающимся.

    Как видите, Azure Stream Analytics использует язык запросов, такой же как SQL, и добавляет несколько расширений для указания аспектов запроса, связанных со временем. См. дополнительные сведения о конструкциях управления временем и оконных расширениях, используемых в этом запросе.

  3. Проверьте входные данные примера задания TollApp. В текущем запросе используются только входные данные EntryStream.

    • Входные данные EntryStream — это подключение концентратора событий, которое помещает в очередь данные, представляющие каждый раз, когда автомобиль въедет в платную машину на шоссе. Веб-приложение, которое является частью примера, создает события, и эти данные помещается в очередь в этом концентраторе событий. Обратите внимание, что эти входные данные запрашиваются в предложении FROM запроса потоковой передачи.
    • Входные данные ExitStream — это подключение концентратора событий, которое помещает в очередь данные, представляющие каждый раз, когда автомобиль выходит из платной платы на шоссе. Эти потоковые входные данные используются в вариантах синтаксиса запроса, которые мы рассмотрим позже.
    • Входные данные регистрации — это подключение хранилища BLOB-объектов Azure, указывающее на статистический файл registration.json, при необходимости используемый для поиска. Входные эталонные данные используются в вариантах синтаксиса запроса, которые мы рассмотрим позже.
  4. Просмотрите выходные данные примера задания TollApp.

    • Выходные данные Azure Cosmos DB — это контейнер базы данных Azure Cosmos DB, который получает события приемника выходных данных. Обратите внимание, что эти выходные данные используются в предложении INTO запроса потоковой передачи.

Запуск задания потовой передачи TollApp

Чтобы запустить задание потоковой передачи, выполните следующие действия:

  1. На странице обзора задания выберите Запустить.

  2. На панели Запуск задания выберите Сейчас.

  3. Через несколько минут, когда задание уже будет выполняться, на странице обзора задания потоковой передачи просмотрите график Мониторинг. На графике будет отображаться несколько тысяч входных событий и десятки выходных событий.

Просмотр выходных данных Azure Cosmos DB

  1. Найдите группу ресурсов, в которой содержатся ресурсы TollApp.

  2. Выберите учетную запись Azure Cosmos DB с таким шаблоном имени: tollapp<random>-cosmos.

  3. Выберите заголовок Обозреватель данных, чтобы открыть страницу обозревателя данных.

  4. Разверните tollAppDatabase>tollAppCollection>Документы.

  5. В списке идентификаторов отображается несколько документов, как только выходные данные будут доступны.

  6. Выберите каждый идентификатор, чтобы просмотреть документ JSON. Обратите внимание на каждую tollid, windowend timeи из count of cars этого окна.

  7. Через еще три минуты будет доступен еще один набор из четырех документов, по одному документу на tollid.

Получение общего времени, затрачиваемого каждым автомобилем

Среднее время, затрачиваемое на проезд через пункт, помогает оценить эффективность процесса и условий работы для клиентов.

Чтобы определить общее время, объедините поток EntryTime и ExitTime. Объедините два входных потока с одинаковыми столбцами TollId и LicencePlate. Для использования оператора JOIN требуется указать запас, описывающий допустимую разницу во времени между объединенными событиями. С помощью функции DATEDIFF укажите, что события должны происходить с интервалом не более 15 минут. Кроме того, примените функцию DATEDIFF к значениям времени въезда и выезда для вычисления фактического времени, проводимого автомобилем на станции сбора платы. Обратите внимание на различия использования DATEDIFF при применении в инструкции SELECT по сравнению с условием JOIN.

SELECT EntryStream.TollId, EntryStream.EntryTime, ExitStream.ExitTime, EntryStream.LicensePlate, DATEDIFF (minute, EntryStream.EntryTime, ExitStream.ExitTime) AS DurationInMinutes
INTO CosmosDB
FROM EntryStream TIMESTAMP BY EntryTime
JOIN ExitStream TIMESTAMP BY ExitTime
ON (EntryStream.TollId= ExitStream.TollId AND EntryStream.LicensePlate = ExitStream.LicensePlate)
AND DATEDIFF (minute, EntryStream, ExitStream ) BETWEEN 0 AND 15

Чтобы обновить синтаксис запроса задания потовой передачи TollApp, сделайте следующее:

  1. На странице обзора задания выберите Остановить.

  2. Через некоторое время появится уведомление о том, что задание остановлено.

  3. В разделе заголовка "Топология задания" выберите <> Запрос.

  4. Вставьте настроенный SQL-запрос потоковой передачи.

  5. Выберите Сохранить, чтобы сохранить запрос. Щелкните Да, чтобы сохранить изменения.

  6. На странице обзора задания выберите Запустить.

  7. На панели Запуск задания выберите Сейчас.

Просмотр общего времени в выходных данных

Повторите действия, описанные в предыдущем разделе, чтобы просмотреть выходные данные Azure Cosmos DB из задания потоковой передачи. Просмотрите последние JSON-документы.

Например, в этом документе показан пример автомобиля с определенным номерным знаком, entrytime а exit timeтакже вычисляемое durationinminutes поле DATEDIFF, показывающее длительность стенда оплаты в виде двух минут:

{
    "tollid": 4,
    "entrytime": "2018-04-05T06:51:39.0491173Z",
    "exittime": "2018-04-05T06:53:09.0491173Z",
    "licenseplate": "JVR 9425",
    "durationinminutes": 2,
    "id": "ff52eb25-d580-7566-2879-1f52bba6601e",
    "_rid": "+8E4AI1DZgBjAAAAAAAAAA==",
    "_self": "dbs/+8E4AA==/colls/+8E4AI1DZgA=/docs/+8E4AI1DZgBjAAAAAAAAAA==/",
    "_etag": "\"ad02f6b8-0000-0000-0000-5ac5c8330000\"",
    "_attachments": "attachments/",
    "_ts": 1522911283
}

Получение сведений о транспортных средствах с истекшим сроком действия регистрации

Azure Stream Analytics может использовать статические снимки эталонных данных для объединения с потоками временных данных. Чтобы продемонстрировать эту возможность, можно использовать следующий пример вопроса. Входные данные регистрации представляют собой статический JSON-файл большого двоичного объекта, в котором перечислены сведения об истечении срока действия лицензии. Объединившись по номерному знаку, эталонные данные сравниваются с каждым транспортным средством, проходящим через пункт оплаты.

Если коммерческое транспортное средство зарегистрировано в компании автодорожных сборов, оно может проезжать через пункт оплаты без остановки для проверки. Для выявления всех коммерческих транспортных средств с истекшим сроком действия регистрации используйте таблицу подстановки регистрации.

SELECT EntryStream.EntryTime, EntryStream.LicensePlate, EntryStream.TollId, Registration.RegistrationId
INTO CosmosDB
FROM EntryStream TIMESTAMP BY EntryTime
JOIN Registration
ON EntryStream.LicensePlate = Registration.LicensePlate
WHERE Registration.Expired = '1'
  1. Повторите шаги из предыдущего раздела, чтобы обновить синтаксис запроса задания потоковой передачи TollApp.

  2. Повторите действия, описанные в предыдущем разделе, чтобы просмотреть выходные данные Azure Cosmos DB из задания потоковой передачи.

Выходные данные примера:

    {
        "entrytime": "2018-04-05T08:01:28.0252168Z",
        "licenseplate": "GMT 3221",
        "tollid": 1,
        "registrationid": "763220582",
        "id": "47db0535-9716-4eb2-db58-de7886966cbf",
        "_rid": "y+F8AJ9QWACSAQAAAAAAAA==",
        "_self": "dbs/y+F8AA==/colls/y+F8AJ9QWAA=/docs/y+F8AJ9QWACSAQAAAAAAAA==/",
        "_etag": "\"88007d8d-0000-0000-0000-5ac5d7e20000\"",
        "_attachments": "attachments/",
        "_ts": 1522915298
    }

Масштабирование задания

Azure Stream Analytics поддерживает масштабируемость, что позволяет обрабатывать большие объемы данных. Запрос Azure Stream Analytics может с помощью предложения PARTITION BY сообщить системе, что этот шаг горизонтально увеличивает масштаб. Система добавляет специальный столбец PartitionId для сопоставления с входными данными (из концентратора событий) по идентификатору секции.

Чтобы масштабировать запрос, разделив его на разделы, измените синтаксис запроса на следующий код:

SELECT TollId, System.Timestamp AS WindowEnd, COUNT(*)AS Count
INTO CosmosDB
FROM EntryStream
TIMESTAMP BY EntryTime
PARTITION BY PartitionId
GROUP BY TUMBLINGWINDOW(minute,3), TollId, PartitionId

Чтобы увеличить количество единиц потоковой передачи задания, сделайте следующее:

  1. Остановите текущее задание.

  2. Обновите синтаксис запроса на странице <> Запрос и сохраните изменения.

  3. Под заголовком CONFIGURE задания потоковой передачи выберите Масштабировать.

  4. Перетяните ползунок Единицы потоковой передачи с 1 до 6. Единицы потоковой передачи определяют объем вычислительной мощности, которую может получать задание. Нажмите кнопку Сохранить.

  5. Запустите задание потоковой передачи, в котором применяется дополнительное масштабирование. Azure Stream Analytics распределяет работу между большим объемом вычислительных ресурсов и обеспечивает лучшую пропускную способность, секционируя работу между ресурсами с помощью столбца, указанного в предложении PARTITION BY.

Отслеживание задания

Область Мониторинг содержит статистические данные по выполнению задания. Выполните первоначальную настройку, чтобы использовать учетную запись хранения в том же регионе (задайте имя toll, как в остальной части этой статьи).

Мониторинг заданий Azure Stream Analytics

Доступ к журналам действий можно также получить на панели мониторинга заданий в области Параметры.

Удаление ресурсов TollApp

  1. Остановите выполнение задания Stream Analytics на портале Azure.

  2. Найдите группу ресурсов, содержащую восемь ресурсов, связанных с шаблоном TollApp.

  3. Выберите Удалить группу ресурсов. Введите имя группы ресурсов, чтобы подтвердить удаление.

Заключение

В этой статье приведены общие сведения о службе Azure Stream Analytics. Вы узнали, как настроить входные и выходные данные для задания Stream Analytics. С помощью сценария с платными данными в решении описаны распространенные типы проблем, возникающих в пространстве перемещаемых данных, и способы их решения с помощью простых SQL-запросов в Azure Stream Analytics. Здесь описаны конструкции расширений SQL для работы с временными данными. Здесь также рассмотрено объединение потоков данных, их дополнение статическими справочными данными, а также масштабирование запроса для повышения пропускной способности.

Хотя это решение является хорошим введением, оно ни в коем случае не является полным. Дополнительные шаблоны запросов на языке SQL можно найти в статье Примеры запросов для распространенных шаблонов использования Stream Analytics.