Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Stream Analytics интегрируется с потоками данных Azure в качестве входных данных из пяти типов ресурсов:
- Azure Event Hubs
- Azure IoT Hub
- Azure хранилище BLOB-объектов
- Azure Data Lake Storage Gen2
- Apache Kafka
Эти входные ресурсы могут существовать в той же Azure подписке, что и задание Stream Analytics или в другой подписке.
Сжатие
Stream Analytics поддерживает сжатие для всех источников входных данных. Поддерживаемые типы сжатия: None, Gzip и Deflate. Stream Analytics не поддерживает сжатие ссылочных данных. Если входные данные в сжатом формате Avro, Stream Analytics обрабатывает их прозрачно. Вам не нужно указывать тип сжатия с сериализацией Avro.
Создание, изменение или проверка входных данных
Вы можете использовать портал Azure, Visual Studio и Visual Studio Code для добавления и изменения существующих входных данных в задании потоковой передачи. Вы также можете протестировать входные подключения и тестовые запросы на основе примеров данных на портале Azure, Visual Studio и Visual Studio Code. При написании запроса входные данные указываются в предложении FROM. Список доступных входных данных можно получить на странице запроса на портале. Если вы хотите использовать несколько входных данных, JOIN их или написать несколько SELECT запросов.
Примечание.
Для лучшего локального опыта разработки используйте средства Stream Analytics для Visual Studio Code. Существуют известные пробелы функций в средствах Stream Analytics для Visual Studio 2019 (версия 2.6.3000.0), и она не будет улучшена.
Потоковая передача данных из Центров событий
Azure Event Hubs — это высокомасштабируемая система публикации и подписки для обработки событий. Концентратор событий может обрабатывать миллионы событий в секунду, позволяя вам обрабатывать и анализировать огромное количество данных, создаваемых подключенными устройствами и приложениями. Вместе центры событий и Stream Analytics предоставляют комплексное решение для аналитики в режиме реального времени. Центры событий передают события в Azure в режиме реального времени, а задания Stream Analytics обрабатывают эти события в режиме реального времени. Например, в Центры событий можно отправлять сведения о щелчках, показания датчиков или журналы сетевых событий. Затем можно создать задания Stream Analytics для использования центров событий для входных данных для фильтрации, агрегирования и корреляции в режиме реального времени.
EventEnqueuedUtcTime — метка времени прибытия события в Event Hubs и служит временной меткой по умолчанию для событий, поступающих из Event Hubs в Stream Analytics. Чтобы обрабатывать данные как поток, используя метку времени в данных события, необходимо использовать ключевое слово TIMESTAMP BY.
Группы потребителей Event Hubs
Настройте входные данные каждого концентратора событий с помощью собственной группы потребителей. Если задание содержит самосоединение или множество входных данных, несколько читателей на последующих этапах будут считывать некоторые из входных данных. Эта ситуация влияет на количество читателей в группе потребителей. Чтобы избежать превышения ограничения Event Hubs на пять получателей на группу получателей для каждого раздела, назначьте отдельную группу получателей для каждой задачи Stream Analytics. Существует также ограничение в 20 групп потребителей для концентратора событий уровня "Стандартный". Дополнительные сведения см. в статье Устранение неполадок входных данных в Azure Stream Analytics.
Создание входных данных из Центров событий
В следующей таблице описано каждое свойство на странице ввода New на портале Azure для потоковой передачи данных из концентратора событий:
| Свойство | Описание |
|---|---|
| Входной псевдоним | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
| Subscription | Выберите подписку Azure, в которой существует ресурс концентратора событий. |
| Пространство имен Концентратора событий | Пространство имен Центров событий — это контейнер для центров событий. При создании концентратора событий также создается пространство имен. |
| Имя концентратора событий | Имя концентратора событий для использования в качестве источника входных данных. |
| Группа потребителей Концентратора событий (рекомендуется) | Используйте отдельную группу потребителей для каждого задания Stream Analytics. Эта строка идентифицирует группу потребителей, используемую для поглощения данных из концентратора событий. Если вы не указываете группу потребителей, задание Stream Analytics использует группу потребителей $Default . |
| Режим проверки подлинности | Укажите тип проверки подлинности, который требуется использовать для подключения к концентратору событий. Используйте строку подключения или управляемое удостоверение для аутентификации в концентраторе событий. Для параметра управляемого удостоверения можно создать управляемое удостоверение, назначаемое системой, для задания Stream Analytics или управляемого удостоверения, назначаемого пользователем, для проверки подлинности в концентраторе событий. При использовании управляемого удостоверения оно должно быть членом роли Azure Event Hubs Data Receiver или Azure Event Hubs Data Owner. |
| Имя политики Концентратора событий | Политика общего доступа, которая предоставляет доступ к Центрам событий. Каждой политике общего доступа присваивается имя, а также для нее задаются разрешения и ключи доступа. Этот параметр заполняется автоматически, если не выбран вариант указания параметров Центров событий вручную. |
| Ключ раздела | Это необязательное поле доступно только в том случае, если задание настроено для использования уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, добавьте здесь имя этого свойства. Используйте это для повышения производительности запроса, если он содержит условие PARTITION BY или GROUP BY по этому свойству. Если это задание использует уровень совместимости 1.2 или выше, это поле по умолчанию используется PartitionId. |
| Формат сериализации событий | Формат сериализации (JSON, CSV, Avro) входящего потока данных. Убедитесь, что формат JSON соответствует спецификации и не содержит ведущих нулей перед десятичными числами. |
| Кодирование | Сейчас UTF-8 — единственный поддерживаемый формат кодировки. |
| Тип сжатия событий | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
| Реестр схем | Выберите реестр схем с схемами для данных событий, полученных из концентратора событий. |
Когда данные поступают из входного потока Центров событий, у вас есть доступ к следующим полям метаданных в запросе Stream Analytics:
| Свойство | Описание |
|---|---|
| EventProcessedUtcTime | Дата и время, когда Stream Analytics обрабатывает событие. |
| EventEnqueuedUtcTime | Дата и время получения событий Центрами событий. |
| PartitionId (Идентификатор раздела) | Идентификатор раздела для входного адаптера (нумерация идет от нуля). |
С помощью этих полей можно написать запрос, как показано в следующем примере:
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
Примечание.
При использовании Центров событий в качестве конечной точки для маршрутов IoT Hub доступ к метаданным IoT Hub можно получить с помощью функции GetMetadataPropertyValue.
Поток данных из IoT Hub
Azure IoT Hub — это высокомасштабируемая система обработки событий с архитектурой публикации и подписки, оптимизированная для сценариев Интернета вещей.
Метка времени по умолчанию для событий, поступающих из IoT Hub в Stream Analytics, — это время их прибытия в IoT Hub, которое EventEnqueuedUtcTime. Чтобы обрабатывать данные как поток с помощью метки времени в полезной нагрузке события, используйте ключевое слово TIMESTAMP BY.
группы потребителей IoT Hub
Настройте каждое входное соединение IoT Hub для Stream Analytics так, чтобы у него была своя группа потребителей. Если задание содержит самосоединение или имеет несколько входных данных, несколько читателей могут обрабатывать одни и те же входные данные. Эта ситуация влияет на количество читателей в одной потребительской группе. Чтобы избежать превышения ограничения Azure IoT Hub на пять читателей для каждой группы потребителей на раздел, укажите группу потребителей для каждой задачи Stream Analytics.
Настройте IoT Hub в качестве входного потока данных
В следующей таблице объясняется значение каждого свойства на странице New input на портале Azure при конфигурировании IoT Hub как входного потока.
| Свойство | Описание |
|---|---|
| Входной псевдоним | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
| Subscription | Выберите подписку, в которой существует ресурс IoT Hub. |
| IoT Hub | Имя IoT Hub, используемого в качестве входных данных. |
| Группа потребителей | Используйте другую группу потребителей для каждого задания Stream Analytics. Группа потребителей получает данные из IoT Hub. Stream Analytics использует группу получателей "$Default", если не указано иное. |
| Имя политики общего доступа | Политика общего доступа, предоставляющая доступ к IoT Hub. Каждой политике общего доступа присваивается имя, а также для нее задаются разрешения и ключи доступа. |
| Ключ политики общего доступа | Общий ключ доступа, используемый для авторизации доступа к IoT Hub. Этот параметр автоматически заполняется, если вы не выберете параметр для предоставления параметров IoT Hub вручную. |
| Конечная точка | Конечная точка для IoT Hub. |
| Ключ раздела | Это необязательное поле, доступное только при настройке задания для использования уровня совместимости 1.2 или более поздней версии. Если вы секционируйте входные данные по свойству, можно добавить имя этого свойства здесь. Он используется для повышения производительности запроса, если он включает оператор PARTITION BY или GROUP BY для этого атрибута. Если это задание использует уровень совместимости 1.2 или более поздней версии, это поле по умолчанию имеет значение PartitionId. |
| Формат сериализации событий | Формат сериализации (JSON, CSV, Avro) входящего потока данных. Убедитесь, что формат JSON соответствует спецификации и не содержит ведущих нулей перед десятичными числами. |
| Кодирование | Сейчас UTF-8 — единственный поддерживаемый формат кодировки. |
| Тип сжатия событий | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
При использовании потоковых данных из IoT Hub у вас есть доступ к следующим полям метаданных в запросе Stream Analytics:
| Свойство | Описание |
|---|---|
| EventProcessedUtcTime | Дата и время обработки события. |
| EventEnqueuedUtcTime | Дата и время, когда "IoT Hub" получает событие. |
| PartitionId (Идентификатор раздела) | Идентификатор раздела для входного адаптера (нумерация идет от нуля). |
| IoTHub.MessageId | Идентификатор, используемый для корреляции двустороннего взаимодействия в IoT Hub. |
| IoTHub.CorrelationId | Идентификатор, используемый в ответах на сообщениях и обратной связи в IoT Hub. |
| IoTHub.ConnectionDeviceId | Идентификатор проверки подлинности, используемый для отправки этого сообщения. IoT Hub проставляет это значение в сообщениях, предназначенных для службы. |
| IoTHub.ConnectionDeviceGenerationId | Идентификатор поколения проверенного устройства, используемого для отправки этого сообщения. IoT Hub метит это значение в сообщениях, направленных на службу. |
| IoTHub.EnqueuedTime | Время, когда IoT Hub получает сообщение. |
Потоковая обработка данных из хранилища BLOB или Data Lake Storage Gen2
В сценариях, связанных с хранением больших объемов неструктурированных данных в облаке, Azure Blob Storage или Azure Data Lake Storage Gen2 предоставляет экономичное и масштабируемое решение. Данные в хранилище объектов Blob или Azure Data Lake Storage Gen2 считаются неактивными данными. Однако Stream Analytics может обрабатывать эти данные как поток данных.
Часто используемый сценарий использования таких входных данных с Stream Analytics — обработка журналов. В этом сценарии вы записываете файлы данных телеметрии из системы и должны анализировать и обрабатывать их для извлечения значимых данных.
Метка времени по умолчанию для хранилища BLOB-объектов или события Azure Data Lake Storage Gen2 в Stream Analytics — это метка времени последнего изменения, которая BlobLastModifiedUtcTime. Если вы отправляете большой двоичный объект в учетную запись хранения в 13:00 и запускаете задание Azure Stream Analytics с помощью параметра Now в 13:01, задание не выбирает большой двоичный объект, так как его измененное время выходит за пределы периода выполнения задания.
Если вы загружаете блоб в контейнер учетной записи хранения в 13:00 и запускаете задание Azure Stream Analytics с помощью Custom Time в 13:00 или раньше, задание подхватывает блоб, так как время его изменения попадает в период выполнения задания.
Если вы запускаете задание Azure Stream Analytics с помощью Now в 13:00 и отправляете объект BLOB в контейнер учетной записи хранения в 13:01, Azure Stream Analytics забирает объект BLOB. Метка времени, назначенная каждому блобу, основана только на BlobLastModifiedTime. Папка, в которой находится blob, не имеет отношения к присвоенной метке времени. Например, если большой двоичный объект 2019/10-01/00/b1.txt имеет значение BlobLastModifiedTime из 2019-11-11, то метка времени, назначенная этому большому двоичному объекту, — 2019-11-11.
Чтобы обрабатывать данные в виде потока с помощью метки времени в нагрузке события, необходимо использовать ключевое слово TIMESTAMP BY. Задание Stream Analytics извлекает входные данные из хранилища BLOB-объектов Azure или Azure Data Lake Storage Gen2 каждую секунду, если файл BLOB-объекта доступен. Если BLOB-файл недоступен, задание использует экспоненциальное увеличение интервала между попытками с максимальной задержкой в 90 секунд.
Примечание.
Stream Analytics не поддерживает добавление данных в существующий объект Blob. Stream Analytics просматривает каждый файл только один раз, и он не обрабатывает какие-либо изменения, происходящие в файле после того, как задание считывает данные. Мы рекомендуем отправлять все данные для файла большого двоичного объекта за один раз, а затем добавлять более новые события в другой новый файл большого двоичного объекта.
В случаях, когда вы постоянно добавляете много BLOB-объектов, и Stream Analytics обрабатывает их по мере добавления, в редких случаях могут пропускаться некоторые BLOB-объекты из-за уровня детализации BlobLastModifiedTime. Эту проблему можно устранить, отправляя BLOB-объекты с интервалом не менее двух секунд. Если этот параметр не подходит, можно использовать центры событий для потоковой передачи больших объемов событий.
Настройка хранилища BLOB-объектов в качестве потокового входа
В следующей таблице объясняется каждое свойство на странице New input на портале Azure при конфигурации Blob-хранилища как потокового ввода.
| Свойство | Описание |
|---|---|
| Входной псевдоним | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
| Subscription | Выберите подписку, в которой есть ресурс хранилища. |
| Учетная запись хранения | Имя учетной записи хранения, где находятся blob-файлы. |
| Ключ учетной записи хранения | Секретный ключ, связанный с учетной записью хранения. Этот параметр автоматически заполняется, если вы не выберете параметр для предоставления параметров вручную. |
| Контейнер | Контейнеры предоставляют логическую группировку для блобов. Вы можете выбрать существующий контейнер или создать новый , чтобы создать новый контейнер. |
| Режим проверки подлинности | Укажите тип проверки подлинности, который требуется использовать для подключения к учетной записи хранения. Для аутентификации с учетной записью хранилища можно использовать строку подключения или управляемое удостоверение. Для параметра управляемого удостоверения вы можете либо создать управляемое удостоверение, назначаемое системой, для задания Stream Analytics, либо управляемое удостоверение, назначаемое пользователем, для аутентификации с учетной записью хранения. При использовании управляемого удостоверения управляемое удостоверение должно быть членом соответствующей роли в учетной записи хранения. |
| Шаблон пути (необязательно) | Путь к файлу, используемый для обнаружения объектов BLOB в указанном контейнере. Если вы хотите считывать объекты BLOB из корня контейнера, не устанавливайте шаблон пути. В пути можно указать один или несколько экземпляров следующих трех переменных: {date}или {time}{partition}Пример 1: cluster1/logs/{date}/{time}/{partition}Пример 2: cluster1/logs/{date}Символ * не является допустимым значением префикса пути. Разрешены только допустимые символы Azure блобов. Не включайте имена контейнеров или имена файлов. |
| Формат даты (необязательно) | При использовании переменной даты в пути это формат даты, по которому упорядочены файлы. Пример: YYYY/MM/DD Если входные данные большого двоичного объекта имеют {date} или {time} в пути, Stream Analytics просматривает папки в порядке возрастания времени. |
| Формат времени (необязательно) | Если вы используете временную переменную в пути, это формат времени, в котором организуются файлы. В настоящее время единственное поддерживаемое значение — HH в течение нескольких часов. |
| Ключ раздела | Это необязательное поле, доступное только при настройке задания для использования уровня совместимости 1.2 или более поздней версии. Если вы секционируйте входные данные по свойству, можно добавить имя этого свойства здесь. Он используется для повышения производительности запроса, если он включает оператор PARTITION BY или GROUP BY для этого атрибута. Если это задание использует уровень совместимости 1.2 или более поздней версии, это поле по умолчанию имеет значение PartitionId. |
| Количество входных секций | Это поле доступно только если в шаблоне пути присутствует раздел {partition}. Значение этого свойства — целое число >=1. Все появления {partition} в pathPattern будут заменяться числами от 0 до значения этого поля минус 1. |
| Формат сериализации событий | Формат сериализации (JSON, CSV, Avro) входящего потока данных. Убедитесь, что формат JSON соответствует спецификации и не содержит ведущих нулей перед десятичными числами. |
| Кодирование | В настоящее время единственным поддерживаемым форматом кодирования файлов CSV и JSON является UTF-8. |
| Compression | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
Когда ваши данные поступают из источника Blob-хранилища, вы можете получить доступ к следующим полям метаданных в запросе Stream Analytics:
| Свойство | Описание |
|---|---|
| BlobName | Имя входного BLOB, из которого поступило событие. |
| EventProcessedUtcTime | Дата и время, когда Stream Analytics обрабатывает событие. |
| BlobLastModifiedUtcTime | Дата и время последнего изменения объекта BLOB. |
| PartitionId (Идентификатор раздела) | Идентификатор раздела для входного адаптера (нумерация идет от нуля). |
С помощью этих полей можно написать запрос, как показано в следующем примере:
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
Потоковая передача (стриминг) данных из Apache Kafka
Azure Stream Analytics позволяет подключаться непосредственно к кластерам Apache Kafka для приема данных. Решение является низким кодом и полностью управляется командой Azure Stream Analytics корпорации Майкрософт, поэтому она соответствует стандартам соответствия бизнес-требованиям. Входные данные Kafka являются обратно совместимыми и поддерживают все версии, начиная с версии 0.10 с последней версией клиента. Вы можете подключиться к кластерам Kafka в виртуальной сети и кластерах Kafka с общедоступной конечной точкой в зависимости от конфигураций. Конфигурация зависит от существующих соглашений о конфигурации Kafka. Поддерживаемые типы сжатия: None, Gzip, Snappy, LZ4 и Zstd.
Дополнительные сведения см. в разделе Передача данных из Kafka в Azure Stream Analytics (предварительный просмотр).