Поделиться через


Использование ссылочных данных для поиска в Stream Analytics

Эталонные данные — это конечный набор данных, который статический или медленно меняется в природе. Он используется для поиска или расширения потоков данных. Справочные данные также называются подстановочной таблицей.

Рассмотрим сценарий Интернета вещей в качестве примера. Метаданные о датчиках, которые часто не изменяются, можно хранить в справочных данных. Затем вы можете присоединить его к потокам данных Интернета вещей в режиме реального времени.

Azure Stream Analytics загружает справочные данные в памяти для обеспечения низкой задержки потоковой обработки. Чтобы использовать ссылочные данные в задании Stream Analytics, в вашем запросе обычно используется присоединение ссылочных данных .

Пример

Вы можете получить поток событий в реальном времени, создаваемый, когда автомобили проходят пункт оплаты. Пункт оплаты может фиксировать номерные знаки в режиме реального времени. Эти данные могут присоединяться к стаическому набору данных, который содержит сведения о регистрации, чтобы определить номерные знаки с истекшим сроком действия.

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

Stream Analytics поддерживает Azure Blob Storage, Azure Data Lake Storage Gen2 и Azure SQL Database в качестве уровня хранилища для справочных данных. Если у вас есть эталонные данные в других хранилищах данных, попробуйте использовать фабрику данных Azure для извлечения, преобразования и загрузки данных в одно из поддерживаемых хранилищ данных. Дополнительные сведения см. в статье Действия копирования в фабрике данных Azure.

Хранилище BLOB-объектов Azure или Azure Data Lake Storage 2-го поколения

Эталонные данные моделируются в виде последовательности объектов BLOB в порядке возрастания даты и времени, указанных в имени BLOB. BLOB можно добавлять только в конец последовательности, если его дата и время больше, чем дата и время, указанные для последнего BLOB в последовательности. БЛОБ-объекты определяются в конфигурации ввода.

Дополнительные сведения см. в статье Использование эталонных данных из облачного хранилища Блоб для задачи Stream Analytics.

Настройка ссылочных данных блобов

Чтобы настроить эталонные данные, сначала необходимо создать вход типа ссылочные данные. В следующей таблице объясняется каждое свойство, которое нужно указать при создании входных данных для ссылочных данных, а также его описание.

Имя свойства Описание
Алиас входных данных Понятное имя, используемое в запросе задания для ссылки на эти входные данные.
Учетная запись хранения Имя учетной записи хранения, в которой находятся блобы. Если она находится в той же подписке, что и задача Stream Analytics, выберите её из списка в раскрывающемся меню.
Ключ учетной записи хранения Секретный ключ, связанный с учетной записью хранения. Этот ключ автоматически заполняется, если учетная запись хранения находится в той же подписке, что и задача Stream Analytics.
Контейнер хранилища Контейнеры предоставляют логическую группировку BLOB-объектов, хранящихся в хранилище BLOB. При загрузке объекта в хранилище блобов необходимо указать для него контейнер.
Шаблон пути Это обязательное свойство используется для поиска BLOB в указанном контейнере. В пути можно указать один или несколько экземпляров переменных {date} и {time}.
Пример 1: products/{date}/{time}/product-list.csv
Пример 2: products/{date}/product-list.csv
Пример 3. product-list.csv

Если большой двоичный объект не существует в указанном пути, задание Stream Analytics ожидает неограниченное время, пока большой двоичный объект станет доступным.
Формат даты [необязательно] Если вы использовали {date} в указанном шаблоне пути, выберите формат даты, в котором ваши блобы упорядочены из раскрывающегося списка поддерживаемых форматов.
Пример: ГГГГ/ММ/ДД или ММ/ДД/ГГГГ
Формат времени [необязательно] Если вы использовали {time} в указанном шаблоне пути, выберите формат времени, в котором BLOB-объекты упорядочены из раскрывающегося списка поддерживаемых форматов.
Пример: HH, HH/mm или HH-mm
Формат сериализации событий Чтобы убедиться, что запросы работают так, как вы ожидаете, Stream Analytics должен знать, какой формат сериализации используется для входящих потоков данных. Для ссылочных данных поддерживаются форматы CSV и JSON.
Кодировка В настоящее время единственным поддерживаемым форматом кодировки является UTF-8.

Статические эталонные данные

Ваши справочные данные могут не изменяться. Чтобы включить поддержку статических ссылочных данных, укажите статический путь в входной конфигурации.

Stream Analytics извлекает блоб из указанного пути. Маркеры подстановки {date} и {time} не требуются. Так как ссылочные данные в Stream Analytics являются неизменяемыми, не рекомендуется перезаписывать статический файл ссылочных данных.

Генерировать эталонные данные по расписанию

Эталонные данные могут быть медленно изменяющимся набором данных. Чтобы обновить эталонные данные, укажите шаблон пути в входной конфигурации с помощью маркеров подстановки {date} и {time}. Stream Analytics учитывает обновленные определения ссылочных данных на основе этого шаблона пути.

Например, шаблон sample/{date}/{time}/products.csv с форматом даты YYYY-MM-DD и форматом времени HH-mm указывает Stream Analytics получить обновленный объект sample/2015-04-16/17-30/products.csv 16 апреля 2015 г. в 5:30 UTC.

Stream Analytics автоматически проверяет наличие обновленных ссылочных blob-объектов с интервалом в одну минуту. Блоб с меткой времени 10:30:00 может быть загружен с небольшой задержкой, например, 10:30:30. Вы заметите небольшую задержку в задании Stream Analytics, ссылающемся на этот блоб.

Чтобы избежать таких сценариев, отправьте двоичный объект раньше целевого времени, которое в этом примере составляет 10:30:00. Теперь задание Stream Analytics имеет достаточно времени для обнаружения объекта blob, его загрузки в память и выполнения операций.

Примечание.

В настоящее время задания Stream Analytics ищут обновление blob только тогда, когда время на компьютере достигает времени, закодированного в имени blob. Например, задание ищет sample/2015-04-16/17-30/products.csv как можно скорее, но не раньше 16 апреля 2015 года, в 17:30 по UTC. Он никогда не будет искать блоб с закодированным временем, более ранним, чем у последнего обнаруженного.

Например, после того как задание находит бинарный объект данных sample/2015-04-16/17-30/products.csv, оно игнорирует все файлы с датой до 16 апреля 2015 года в 17:30. Если sample/2015-04-16/17-25/products.csv объект, прибывающий с опозданием, создается в том же контейнере, задание не будет использовать его.

В другом примере sample/2015-04-16/17-30/products.csv производится только 16 апреля 2015 г. в 10:03 вечера, но в контейнере отсутствует объект с более ранней датой. Затем задание начинает использовать этот файл с 16 апреля 2015 г. в 10:03 вечера и использует предыдущие эталонные данные до этого момента.

Исключением из этого поведения является ситуация, когда задание должно повторно обрабатывать данные назад во времени или когда задание запускается впервые.

В момент запуска задание ищет самый последний облако данных, созданный до заданного времени начала задания. Это поведение гарантирует наличие непустого ссылочного набора данных при запуске задания. Если не удается найти нужный элемент, задание отображает следующую диагностику: Initializing input without a valid reference data blob for UTC time <start time>.

При обновлении эталонного набора данных создается журнал диагностики: Loaded new reference data from <blob path>. По ряду причин задача может потребовать перезагрузки предыдущего эталонного набора данных. Чаще всего причина заключается в повторной обработке прошлых данных. В то время создается тот же журнал диагностики. Это действие не означает, что текущие потоковые данные используют прошлые эталонные данные.

Azure Data Factory можно использовать для оркестрации задачи обновления объектов BLOB, которые требуются Stream Analytics для обновления определений ссылочных данных.

Фабрика данных — это облачная служба интеграции данных, которая управляет и автоматизирует перемещение и преобразование данных. Фабрика данных поддерживает подключения к большому количеству облачных и локальных хранилищ данных. Он может легко перемещать данные по заданному расписанию.

Дополнительные сведения о том, как настроить конвейер фабрики данных для генерации эталонных данных для Stream Analytics, обновляемых по предопределенному расписанию, см. в этом примере на GitHub.

Советы по обновлению ссылочных данных BLOB

  • Не перезаписывайте большие двоичные объекты ссылочных данных, так как они неизменяемы.
  • Рекомендуемый способ обновления ссылочных данных — выполнить следующие действия.
    • Используйте {date}/{time} в шаблоне пути.
    • Добавьте новый BLOB, используя тот же контейнер и шаблон пути, которые определены во входных данных задания.
    • Используйте дату/время позже, чем указано последним двоичным блоком в последовательности.
  • Ссылочные blob-объекты не упорядочены времени последнего изменения большого двоичного объекта. Они упорядочены только по дате и времени, указанным в имени BLOB с использованием замен {date} и {time}.
  • Чтобы избежать необходимости перечислять большое количество больших двоичных объектов, удалите старые большие двоичные объекты, для которых обработка больше не будет выполнена. Stream Analytics может потребоваться повторно обработать небольшое количество в некоторых сценариях, например перезапуск.

База данных SQL Azure

Задание для Stream Analytics извлекает справочные данные из базы данных SQL и сохраняет их в виде моментального снимка в памяти для обработки. Снимок ваших ссылочных данных также сохраняется в контейнере учетной записи хранения. Вы указываете учетную запись хранения в параметрах конфигурации.

Контейнер создается автоматически при запуске задания. Если задание останавливается или переходит в состояние сбоя, автоматически созданные контейнеры удаляются при перезапуске задания.

Если ваши эталонные данные представляют собой набор данных с медленным изменением, необходимо периодически обновлять снимок состояния, используемый в вашей задаче.

С помощью Stream Analytics можно задать частоту обновления при настройке входного подключения базы данных SQL. Среда выполнения Stream Analytics запрашивает экземпляр базы данных SQL через интервал, указанный скоростью обновления. Самая быстрая частота обновления, поддерживаемая один раз в минуту. При каждом обновлении Stream Analytics сохраняет новый снимок в указанном аккаунте хранения.

Stream Analytics предоставляет два варианта запроса к экземпляру базы данных SQL. Запрос моментального снимка является обязательным и должен быть включен в каждое задание. Аналитика потоков периодически выполняет запрос моментального снимка в соответствии с заданным интервалом обновления. Он использует результат запроса (моментального снимка) в качестве эталонного набора данных.

Запрос моментального снимка должен соответствовать большинству сценариев. При возникновении проблем с производительностью больших наборов данных и скорости быстрого обновления используйте параметр разностного запроса. Запросы, которые занимают более 60 секунд для возврата эталонного набора данных, приводят к истечении времени ожидания.

При использовании параметра дельта-запроса Stream Analytics изначально запускает запрос моментального снимка, чтобы получить базовый эталонный набор данных. После этого Stream Analytics периодически выполняет дельта-запрос на основе интервала обновления, чтобы получить изменения по сравнению с предыдущей версией. Эти добавочные изменения постоянно применяются к ссылочному набору данных, чтобы сохранить его обновленным. Использование параметра разностного запроса может помочь сократить затраты на хранение и операции ввода-вывода сети.

Настройка эталонных данных базы данных SQL

Чтобы настроить эталонные данные базы данных SQL, сначала необходимо создать входные данные ссылочных данных. В следующей таблице объясняется каждое из свойств, которые необходимо указать при создании входных данных для ссылочных данных вместе с его описанием. Дополнительные сведения см. в разделе Использование эталонных данных из базы данных SQL для задания Stream Analytics.

Управляемый экземпляр SQL Azure можно использовать в качестве входных данных для ссылки. Необходимо настроить общедоступную конечную точку в управляемом экземпляре SQL. Затем вы вручную настроите следующие параметры в Stream Analytics. Виртуальная машина Azure под управлением SQL Server с присоединенной базой данных также поддерживается путем настройки этих параметров вручную.

Имя свойства Описание
Алиас ввода Понятное имя, используемое в запросе задания для ссылки на эти входные данные.
Подписка Ваша подписка
База данных Экземпляр базы данных SQL, содержащий эталонные данные. Для управляемого экземпляра SQL необходимо указать порт 3342. Примером является sampleserver.public.database.windows.net,3342.
Имя пользователя Имя пользователя, связанное с экземпляром базы данных SQL.
Пароль Пароль, связанный с экземпляром базы данных SQL.
Периодическое обновление Этот параметр позволяет выбрать частоту обновления. Выберите В, чтобы указать частоту обновления в DD:HH:MM.
Запрос моментального снимка Этот параметр запроса по умолчанию извлекает эталонные данные из экземпляра базы данных SQL.
Запрос Delta Для расширенных сценариев с большими наборами данных и короткой скоростью обновления добавьте разностный запрос.

Ограничение размера

Используйте эталонные наборы данных, которые меньше 300 МБ для оптимальной производительности. Ссылочные наборы данных размером 5 ГБ или ниже поддерживаются в заданиях с шестью единицами потоковой передачи или более. Использование большого ссылочного набора данных может повлиять на задержку выполнения задания.

Сложность запроса может увеличиться, чтобы включить обработку с отслеживанием состояния, такие как оконные агрегаты, темпоральные соединения и темпоральные аналитические функции. При увеличении сложности максимальный поддерживаемый размер ссылочных данных уменьшается.

Если Stream Analytics не может загружать эталонные данные и выполнять сложные операции, задание исчерпывает память и завершается сбоем. В таких случаях метрика процентного использования блоков потоковой передачи достигает 100%.

Количество единиц потоковой передачи Рекомендуемый размер
1 50 МБ или ниже
3 150 МБ или ниже
6 и более 5 ГБ или ниже

Поддержка сжатия недоступна для эталонных данных. Для ссылочных наборов данных размером более 300 МБ используйте базу данных SQL в качестве источника с параметром разностного запроса для оптимальной производительности. Если в таких сценариях не используется параметр дельта-запроса, при каждом обновлении эталонного набора данных вы будете наблюдать резкие колебания в метрике задержки порогового значения.

Объединить несколько ссылочных наборов данных в задаче

Присоединять можно только входные данные справочного типа к входным данным потоковой передачи. Таким образом, чтобы объединить несколько ссылочных наборов данных, разбийте запрос на несколько шагов. Ниже приведен пример:

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

Задания IoT Edge

Для граничных заданий Stream Analytics поддерживаются только локальные справочные данные. При развертывании задания на устройстве IoT Edge он загружает справочные данные из пользовательского пути к файлу. Подготовите файл ссылочных данных на устройстве.

Для контейнера Windows поместите файл ссылочных данных на локальный диск и поделитесь локальным диском с контейнером Docker. Для контейнера Linux создайте том Docker и загрузите файл данных в том.

Референтные данные об обновлении IoT Edge инициализируются развертыванием. После активации модуль Stream Analytics выбирает обновленные данные без остановки выполнения задания.

Эталонные данные можно обновить двумя способами:

  • Обновите путь ссылочных данных в задании Stream Analytics на портале Azure.
  • Обновите развертывание IoT Edge.

Дальнейшие действия