Потоковые данные в качестве входных данных Stream Analytics
Stream Analytics имеет первоклассную интеграцию с потоками данных Azure в качестве входных данных из четырех видов ресурсов:
- Центры событий Azure
- Центр Интернета вещей Azure
- Хранилище BLOB-объектов Azure
- Azure Data Lake Storage 2-го поколения
Эти ресурсы входных данных могут существовать в той же подписке Azure, что и задание Stream Analytics, или другой подписке.
Сжатие
Stream Analytics поддерживает сжатие для всех источников входных данных. Поддерживаемые типы сжатия: None, Gzip и Deflate. Поддержка сжатия недоступна для эталонных данных. Если входные данные сжимаются, Stream Analytics прозрачно обрабатывает данные Avro. Вам не нужно указывать тип сжатия с сериализацией Avro.
Создание, изменение или проверка входных данных
С помощью портала Azure, Visual Studio и Visual Studio Code можно добавлять входные данные и просматривать или изменять их в задании потоковой передачи. Вы также можете протестировать входные подключения и тестовые запросы из примеров данных из портал Azure, Visual Studio и Visual Studio Code. При написании запроса входные данные указываются в предложении FROM. Список доступных входных данных можно получить на странице Запрос на портале. Если вы хотите использовать несколько входных данных, JOIN
их или написать несколько SELECT
запросов.
Примечание.
Настоятельно рекомендуется использовать средства Stream Analytics для Visual Studio Code для лучшей локальной разработки. У средств Stream Analytics для Visual Studio 2019 (версия 2.6.3000.0) имеются известные функциональные пробелы, и они не будут улучшаться в дальнейшем.
Потоковая передача данных из Центров событий
Центры событий Azure — это высокомасштабируемое событие публикации и подписки на событие ingestor. Концентратор событий может обрабатывать миллионы событий в секунду, позволяя вам обрабатывать и анализировать огромное количество данных, создаваемых подключенными устройствами и приложениями. Вместе центры событий и Stream Analytics могут предоставлять комплексное решение для аналитики в режиме реального времени. Центры событий позволяют передавать события в Azure в режиме реального времени, где задания Stream Analytics также обрабатывают эти события в режиме реального времени. Например, в Центры событий можно отправлять сведения о щелчках, показания датчиков или журналы сетевых событий. Затем можно создать задания Stream Analytics для использования центров событий для входных данных для фильтрации, агрегирования и корреляции в режиме реального времени.
EventEnqueuedUtcTime
— это метка времени поступления события в концентратор, которая также является меткой времени по умолчанию для событий, поступающих из Центров событий в Stream Analytics. Для обработки данных как потока с помощью метки времени в полезных данных события необходимо использовать ключевое слово TIMESTAMP BY.
Группы потребителей Центров событий
Необходимо настроить входные данные каждого концентратора событий, чтобы иметь собственную группу потребителей. Если задание содержит самосоединение или несколько источников входных данных, некоторые входные данные могут последовательно считываться несколькими модулями чтения. Эта ситуация влияет на количество модулей чтения в группе получателей. Чтобы не превысить лимит на количество модулей чтения для Центров событий (5 на каждую группу получателей в разделе), рекомендуется назначить группу получателей для каждого задания Stream Analytics. Существует также ограничение в 20 групп потребителей для концентратора событий уровня "Стандартный". Дополнительные сведения см. в разделе об устранении неполадок входных данных Azure Stream Analytics.
Создание входных данных из Центров событий
В следующей таблице описываются все параметры на странице Новые входные данные на портале Azure для передачи потока входных данных из концентратора событий.
Свойство | Description |
---|---|
Псевдоним входных данных | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
Подписка | Выберите подписку Azure, в которой существует ресурс концентратора событий. |
Пространство имен концентратора событий | Пространство имен Центров событий — это контейнер для центров событий. При создании концентратора событий также создается пространство имен. |
Имя концентратора событий | Имя концентратора событий для использования в качестве источника входных данных. |
Группа получателей концентратора событий (рекомендуется) | Для каждого задания Stream Analytics рекомендуется использовать отдельную группу получателей. Эта строка указывает группу получателей, принимающих данные из концентратора событий. Если группа потребителей не указана, задание Stream Analytics использует группу потребителей $Default . |
Режим проверки подлинности | Укажите тип проверки подлинности, которую вы хотите использовать для подключения к концентратору событий. Для проверки подлинности в концентраторе событий можно использовать строка подключения или управляемое удостоверение. Для параметра управляемого удостоверения можно создать управляемое удостоверение, назначаемое системой, для задания Stream Analytics или управляемого удостоверения, назначаемого пользователем, для проверки подлинности в концентраторе событий. При использовании управляемого удостоверения управляемое удостоверение должно быть членом ролей Центры событий Azure приемника данных или Центры событий Azure роли владельца данных. |
Имя политики концентратора событий | Политика общего доступа, которая предоставляет доступ к Центрам событий. Каждой политике общего доступа присваивается имя, а также для нее задаются разрешения и ключи доступа. Этот параметр заполняется автоматически, если не выбран вариант указания параметров Центров событий вручную. |
Ключ секции | Это необязательное поле, доступное только в том случае, если задание настроено на использование уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, вы можете добавить сюда имя этого свойства. Он используется для повышения производительности запроса, если он включает PARTITION BY в себя или GROUP BY предложение для этого свойства. Если это задание использует уровень совместимости 1.2 или выше, это поле по умолчанию используется PartitionId. |
Формат сериализации событий | Формат сериализации (JSON, CSV, Avro, Parquet или Other (Protobuf, XML, собственная...)) входящего потока данных. Убедитесь, что формат JSON совпадает со спецификацией и не содержит ведущий "0" в десятичных числах. |
Кодирование | Сейчас UTF-8 — единственный поддерживаемый формат кодировки. |
Тип сжатия событий | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
Реестр схем (предварительная версия) | Вы можете выбрать реестр схем с схемами для данных событий, полученных из концентратора событий. |
Когда данные поступают из входного потока Центров событий, у вас есть доступ к следующим полям метаданных в запросе Stream Analytics:
Свойство | Description |
---|---|
EventProcessedUtcTime | Дата и время обработки события Stream Analytics. |
EventEnqueuedUtcTime | Дата и время получения событий Центрами событий. |
PartitionId | Идентификатор секции для входного адаптера (нумерация идет от нуля). |
Например, используя эти поля, можно писать запросы, как в следующем примере:
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
Примечание.
При использовании Центров событий в качестве конечной точки для маршрутов Центр Интернета вещей можно получить доступ к метаданным Центр Интернета вещей с помощью функции GetMetadataPropertyValue.
Потоковая передача данных из Центра Интернета вещей
Центр Интернета вещей — это высокомасштабируемая служба приема данных о событиях публикации и подписки, оптимизированная под сценарии "Интернет вещей".
По умолчанию метка времени событий, поступающих из Центра Интернета вещей в Stream Analytics, — это метка времени поступления события в концентратор Центра Интернета вещей, то есть EventEnqueuedUtcTime
. Для обработки данных как потока с помощью метки времени в полезных данных события необходимо использовать ключевое слово TIMESTAMP BY.
Группа потребителей Центра Интернета вещей
Каждый Центр Интернета вещей Stream Analytics нужно настроить таким образом, чтобы у него была собственная группа получателей. Если задание включает самосоединение или несколько источников входных данных, некоторые входные данные могут последовательно считываться несколькими модулями чтения. Эта ситуация влияет на количество модулей чтения в группе получателей. Чтобы не превысить лимит на количество модулей чтения для Центра Интернета вещей (5 на каждую группу получателей в разделе), рекомендуется назначить группу получателей для каждого задания Stream Analytics.
Настройка Центра Интернета вещей в качестве входного потока данных
В следующей таблице описываются все параметры на странице Новые входные данные на портале Azure при настройке Центра Интернета вещей в качестве потокового входа.
Свойство | Description |
---|---|
Псевдоним входных данных | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
Подписка | Выберите подписку, в которой существуют ресурсы Центра Интернета вещей. |
Центр IoT | Имя Центра Интернета вещей для использования в качестве источника входных данных. |
Группа потребителей | Для каждого задания Stream Analytics рекомендуется использовать отдельную группу получателей. Группа получателей используется для принимающих данных из Центра Интернета вещей. Stream Analytics использует группу получателей "$Default", если не указано иное. |
Имя политики общего доступа | Политика общего доступа, которая предоставляет доступ к Центру Интернета вещей. Каждой политике общего доступа присваивается имя, а также для нее задаются разрешения и ключи доступа. |
Ключ политики общего доступа | Ключ общего доступа, используемый для авторизации доступа к Центру Интернета вещей. Этот параметр автоматически заполняется, если только не будет указан вариант ручной настройки параметров Центра Интернета вещей. |
Конечная точка | Конечная точка для Центра Интернета вещей. |
Ключ секции | Это необязательное поле, доступное только в том случае, если задание настроено на использование уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, вы можете добавить сюда имя этого свойства. Он используется для повышения производительности запроса, если он включает предложение PARTITION BY или GROUP BY для этого свойства. Если это задание использует уровень совместимости 1.2 или более поздней версии, это поле по умолчанию имеет значение PartitionId. |
Формат сериализации событий | Формат сериализации (JSON, CSV, Avro, Parquet или Other (Protobuf, XML, собственная...)) входящего потока данных. Убедитесь, что формат JSON совпадает со спецификацией и не содержит ведущий "0" в десятичных числах. |
Кодирование | Сейчас UTF-8 — единственный поддерживаемый формат кодировки. |
Тип сжатия событий | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
При использовании потоковой передачи данных из Центра Интернета вещей запрос Stream Analytics может получить доступ к следующим полям метаданных:
Свойство | Description |
---|---|
EventProcessedUtcTime | Дата и время обработки события. |
EventEnqueuedUtcTime | Дата и время получения события Центр Интернета вещей. |
PartitionId | Идентификатор секции для входного адаптера (нумерация идет от нуля). |
IoTHub.MessageId | Идентификатор, используемый для корреляции двустороннего обмена данными в Центре Интернета вещей. |
IoTHub.CorrelationId | Идентификатор, используемый в ответах на сообщение и отзывах в Центре Интернета вещей. |
IoTHub.ConnectionDeviceId | Идентификатор проверки подлинности, используемый для отправки этого сообщения. Это значение помечено на сообщениях, привязанных к службе, Центр Интернета вещей. |
IoTHub.ConnectionDeviceGenerationId | Идентификатор создания устройства, прошедшего проверку подлинности, используемый для отправки этого сообщения. Это значение помещается в сообщения, связанные со службой, Центром Интернета вещей. |
IoTHub.EnqueuedTime | Время, когда Центр Интернета вещей получает сообщение. |
Потоковая передача данных из Хранилища BLOB-объектов или Data Lake Storage 2-го поколения
В сценариях с большим количеством неструктурированных данных для хранения в облаке хранилище BLOB-объектов Azure или Azure Data Lake Storage 2-го поколения предлагает экономичное и масштабируемое решение. Данные в хранилище BLOB-объектов или Azure Data Lake Storage 2-го поколения считаются неактивных данных. Однако эти данные можно обрабатывать в виде потока данных Stream Analytics.
Обработка журналов — типичный сценарий для обработки входных данных такого рода с помощью Stream Analytics. В этом сценарии файлы данных телеметрии записываются из системы и должны быть проанализированы и обработаны для извлечения значимых данных.
Метка времени по умолчанию хранилища BLOB-объектов или события Azure Data Lake Storage 2-го поколения в Stream Analytics — это метка времени последнего изменения, т. еBlobLastModifiedUtcTime
. Если большой двоичный объект передается в учетную запись хранения в 13:00, а задание Azure Stream Analytics запускается с параметром Сейчас в 13:01, то этот большой двоичный объект не будет выбран, так как время его изменения не приходится на период выполнения задания.
Если большой двоичный объект передается в контейнер учетной записи хранения в 13:00, а задание Azure Stream Analytics запускается с параметром Другое время в 13:00 или раньше, то большой двоичный объект будет выбран, поскольку время его изменения приходится на период выполнения задания.
Если задание Azure Stream Analytics запущено в 13:00, а большой двоичный объект отправляется в контейнер учетной записи хранения в 13:01, Azure Stream Analytics выбирает большой двоичный объект. Метка времени, назначенная каждому большому двоичному объекту, основана только на BlobLastModifiedTime
. Папка большого двоичного объекта не имеет отношения к назначенной метке времени. Например, если большой двоичный объект 2019/10-01/00/b1.txt
имеет значение , BlobLastModifiedTime
2019-11-11
то метка времени, назначенная этому BLOB-объекту 2019-11-11
.
Для обработки данных как потока с помощью метки времени в полезных данных события необходимо использовать ключевое слово TIMESTAMP BY. Задание Stream Analytics извлекает данные из хранилища BLOB-объектов Azure или Azure Data Lake Storage 2-го поколения входные данные каждые секунды, если файл BLOB-объекта доступен. Если файл БОЛЬШОго двоичного объекта недоступен, экспоненциальный отступ с максимальной задержкой в 90 секунд.
Примечание.
Stream Analytics не поддерживает добавление содержимого в существующий файл большого двоичного объекта. Stream Analytics просматривает каждый файл только один раз, и любые изменения, которые произойдут в нем после того, как задание прочитает данные, не обрабатываются. Мы рекомендуем отправлять все данные для файла большого двоичного объекта за один раз, а затем добавлять более новые события в другой новый файл большого двоичного объекта.
В сценариях, когда многие большие двоичные объекты постоянно добавляются, и Stream Analytics обрабатывает большие двоичные объекты по мере их добавления, некоторые большие двоичные объекты могут быть пропущены в редких случаях из-за детализации BlobLastModifiedTime
. Этот случай можно устранить, отправив большие двоичные объекты по крайней мере через две секунды. Если этот параметр не подходит, можно использовать центры событий для потоковой передачи больших объемов событий.
Настройка хранилища BLOB-объектов в качестве потокового входа
В следующей таблице описываются все параметры на странице Новые входные данные на портале Azure при настройке хранилища больших двоичных объектов в качестве потокового входа.
Свойство | Description |
---|---|
Псевдоним входных данных | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
Подписка | Выберите подписку, которая содержит ресурс хранилища. |
Учетная запись хранения | Имя учетной записи хранения, в которой находятся файлы больших двоичных объектов. |
Ключ учетной записи хранения | Секретный ключ, связанный с учетной записью хранения. Этот параметр заполняется автоматически, если вы не выберете режим ручной настройки параметров. |
Контейнер | Контейнеры упорядочивают большие двоичные объекты в логические группы. Чтобы создать новый контейнер, вы можете выбрать параметр Использование имеющихся (контейнеров) или Создать новый. |
Режим проверки подлинности | Укажите тип проверки подлинности, которую вы хотите использовать для подключения к учетной записи хранения. Для проверки подлинности с помощью учетной записи хранения можно использовать строка подключения или управляемое удостоверение. Для параметра управляемого удостоверения можно создать управляемое удостоверение, назначаемое системой, для задания Stream Analytics или управляемого удостоверения, назначаемого пользователем, для проверки подлинности с помощью учетной записи хранения. При использовании управляемого удостоверения управляемое удостоверение должно быть членом соответствующей роли в учетной записи хранения. |
Шаблон пути (необязательно) | Путь к файлу, используемый для поиска больших двоичных объектов в указанном контейнере. Если вы хотите считывать большие двоичные объекты из корневого каталога контейнера, не устанавливайте шаблон пути. В пути можно указать один или более экземпляров следующих трех переменных: {date} , {time} или {partition} .Пример 1: cluster1/logs/{date}/{time}/{partition} Пример 2: cluster1/logs/{date} Символ * не является допустимым значением префикса пути. Допустимыми являются только символы больших двоичных объектов Azure. Не включайте имена контейнеров или имена файлов. |
Формат даты (необязательное свойство) | При использовании переменной даты в пути это формат даты, по которому упорядочены файлы. Пример: YYYY/MM/DD Если входные данные большого двоичного объекта имеют {date} или {time} в пути, папки проверяются в порядке возрастания времени. |
Формат времени (необязательное свойство) | При использовании переменной времени в пути это формат времени, в котором размещаются файлы. В настоящее время единственным поддерживаемым значением в течение нескольких часов является HH . |
Ключ секции | Это необязательное поле, доступное только в том случае, если задание настроено на использование уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, вы можете добавить сюда имя этого свойства. Он используется для повышения производительности запроса, если он включает предложение PARTITION BY или GROUP BY для этого свойства. Если это задание использует уровень совместимости 1.2 или более поздней версии, это поле по умолчанию имеет значение PartitionId. |
Число входных разделов | Это поле доступно только в том случае, если в шаблоне пути есть сегмент {partition}. В качестве значения свойства допускаются целые числа >=1. Все включения {partition} в pathPattern будут заполняться числами в диапазоне от 0 до значения этого поля, уменьшенного на единицу. |
Формат сериализации событий | Формат сериализации (JSON, CSV, Avro, Parquet или Other (Protobuf, XML, собственная...)) входящего потока данных. Убедитесь, что формат JSON совпадает со спецификацией и не содержит ведущий "0" в десятичных числах. |
Кодирование | В настоящее время единственным поддерживаемым форматом кодирования файлов CSV и JSON является UTF-8. |
Сжатие | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
При поступлении данных из хранилища больших двоичных объектов запрос Stream Analytics может получить доступ к следующим полям метаданных:
Свойство | Description |
---|---|
BlobName | Имя входного большого двоичного объекта, от которого поступило событие. |
EventProcessedUtcTime | Дата и время обработки события Stream Analytics. |
BlobLastModifiedUtcTime | Дата и время последнего изменения большого двоичного объекта. |
PartitionId | Идентификатор секции для входного адаптера (нумерация идет от нуля). |
Например, используя эти поля, можно писать запросы, как в следующем примере:
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
Потоковая передача данных из Apache Kafka
Azure Stream Analytics позволяет подключаться непосредственно к кластерам Apache Kafka для приема данных. Решение является низким кодом и полностью управляется командой Azure Stream Analytics в Корпорации Майкрософт, что позволяет ему соответствовать стандартам соответствия бизнес-требованиям. Входные данные Kafka являются обратно совместимыми и поддерживают все версии с последним выпуском клиента, начиная с версии 0.10. Пользователи могут подключаться к кластерам Kafka в виртуальной сети и кластерах Kafka с общедоступной конечной точкой в зависимости от конфигураций. Конфигурация зависит от существующих соглашений о конфигурации Kafka. Поддерживаемые типы сжатия: None, Gzip, Snappy, LZ4 и Zstd.
Дополнительные сведения см. в статье "Потоковая передача данных из Kafka" в Azure Stream Analytics (предварительная версия).