Использование эталонных данных для поиска в Stream Analytics

Эталонные данные — это конечный набор данных, которые являются статическими или медленно изменяющимися по своей природе. Эти данные используются для поиска или расширения потоков данных. Эталонные данные также называются таблицей подстановки.

Возьмем в качестве примера сценарий с использованием Интернета вещей. Вы можете хранить метаданные о датчиках, которые не меняются часто, в эталонных данных. Затем вы можете объединить эти данные с потоками данных Интернета вещей в реальном времени.

Azure Stream Analytics загружает эталонные данные в память, чтобы сократить задержки при обработке потока. Для использования эталонных данных в задании Stream Analytics обычно используется соединение эталонных данных в запросе.

Пример

Вы можете создавать поток событий в реальном времени, когда автомобили проезжают пункт взимания платы. Пункт может записывать номерные знаки в реальном времени. Эти данные можно объединить со статическим набором данных, содержащим регистрационные данные, для определения номерных знаков с истекшим сроком действия.

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

Stream Analytics поддерживает Хранилище BLOB-объектов Azure, Azure Data Lake Storage 2-го поколения и База данных SQL Azure в качестве уровня хранилища для эталонных данных. Если у вас есть эталонные данные в других хранилищах данных, попробуйте использовать Фабрика данных Azure для извлечения, преобразования и загрузки данных в одно из поддерживаемых хранилищ данных. Дополнительные сведения см. в разделе "Действие копирования" в Фабрика данных Azure обзоре.

Хранилище BLOB-объектов Azure или Azure Data Lake служба хранилища 2-го поколения

Эталонные данные моделируются как последовательность BLOB-объектов в порядке возрастания даты и времени, указанных в имени BLOB-объекта. BLOB-объекты можно добавлять только в конец последовательности, используя значения даты и времени, большее, чем значение даты, определенное последним BLOB-объектом в последовательности. BLOB-объекты определяются во входной конфигурации.

Дополнительные сведения см. в статье Использование эталонных данных из Хранилища BLOB-объектов для задания Stream Analytics.

Настройка эталонных данных большого двоичного объекта

Чтобы настроить эталонные данные, необходимо сначала создать входные данные соответствующего типа. В следующей таблице описано каждое свойство, которое необходимо предоставить при создании входных эталонных данных.

Имя свойства Description
Псевдоним входных данных Понятное имя, с помощью которого запрос задания ссылается на эти входные данные.
Storage account Имя учетной записи хранения, в которой находятся большие двоичные объекты. Если учетная запись расположена в одной подписке с заданием Stream Analytics, ее имя можно выбрать в раскрывающемся списке.
Ключ учетной записи хранения Секретный ключ, связанный с учетной записью хранения. Ключ указывается автоматически, если учетная запись хранения расположена в одной подписке с заданием Stream Analytics.
Контейнер хранилища Контейнеры обеспечивают логическое группирование BLOB-объектов, хранящихся в Хранилище BLOB-объектов. При передаче BLOB-объекта в Хранилище BLOB-объектов для него необходимо указать контейнер.
Шаблон пути Это обязательное свойство, которое используется для поиска BLOB-объектов в указанном контейнере. В пути можно указать один или несколько экземпляров переменных {date} и {time}.
Пример 1: products/{date}/{time}/product-list.csv
Пример 2: products/{date}/product-list.csv
Пример 3: product-list.csv

Если BLOB-объект не существует по указанному пути, задание Stream Analytics будет бесконечно ожидать, пока он не станет доступным.
Формат даты [необязательное свойство] Если в указанном шаблоне пути использовалась переменная {date}, в раскрывающемся списке поддерживаемых форматов можно выбрать формат даты для упорядочивания BLOB-объектов.
Например: ГГГГ/ММ/ДД или ММ/ДД/ГГГГ.
Формат времени [необязательное свойство] Если в указанном шаблоне пути использовалась переменная {time}, в раскрывающемся списке поддерживаемых форматов можно выбрать формат даты для упорядочивания BLOB-объектов.
Примеры: ЧЧ, ЧЧ/мм или ЧЧ-мм.
Формат сериализации событий Чтобы запросы работали как следует, в задании Stream Analytics нужно указать, какой формат сериализации используется для потоков входящих данных. Поддерживаемые форматы для эталонных данных — это CSV и JSON.
Кодировка В настоящее время единственным поддерживаемым форматом кодировки является UTF-8.

Статические ссылочные данные

Возможно, ваши эталонные данные не изменятся. Чтобы включить поддержку статических эталонных данных, укажите статический путь во входной конфигурации.

Stream Analytics будет обрабатывать BLOB-объекты по указанному пути. Токены подстановки {date} и {time} можно не использовать. Так как эталонные данные невозможно изменить в Stream Analytics, перезапись BLOB-объекта в статических эталонных данных не рекомендуется.

Создание эталонных данных по расписанию

Эталонные данные могут быть медленно изменяющимся набором данных. Чтобы обновить эталонные данные, укажите шаблон пути во входной конфигурации с помощью токенов подстановки {date} и {time}. На основе этого свойства path pattern служба Stream Analytics будет выбирать обновленные определения ссылочных данных.

Например, шаблон sample/{date}/{time}/products.csv с форматом даты ГГГГ-ММ-ДД и форматом времени ЧЧ-мм указывает Stream Analytics выбрать обновленный BLOB-объект sample/2015-04-16/17-30/products.csv 16 апреля 2015 г. в 17:30 (время в формате UTC).

Stream Analytics каждую минуту автоматически проверяет BLOB-объекты с эталонными данными на наличие изменений. BLOB-объект с меткой времени 10:30:00 может быть отправлен с небольшой задержкой, например, в 10:30:30. Вы заметите небольшую задержку в задании Stream Analytics, ссылающегося на этот большой двоичный объект.

Чтобы избежать таких ситуаций, передайте BLOB-объект не раньше целевого времени, которое в этом примере равно 10:30:00. У задания Stream Analytics теперь будет достаточно времени для обнаружения и загрузки BLOB-объекта в памяти и выполнения операций.

Примечание.

Сейчас задания Stream Analytics ищут обновление BLOB-объекта, только если значение времени компьютера равно значению времени, закодированного в имени BLOB-объекта. Например, задание ищет sample/2015-04-16/17-30/products.csv как можно скорее, но не ранее 16 апреля 2015 г. в 5:30 (время в формате UTC). Оно никогда не будет искать BLOB-объект, время в закодированном имени которого опережает время в имени последнего обнаруженного BLOB-объекта.

Например, когда задание находит BLOB-объект sample/2015-04-16/17-30/products.csv, оно игнорирует все файлы с закодированной датой, предшествующей 17:30 16 апреля 2015 г. Если в том же контейнере создается BLOB-объект sample/2015-04-16/17-25/products.csv с более поздним временем, задание не будет его использовать.

В другом примере sample/2015-04-16/17-30/products.csv создается только 16 апреля 2015 г. в 22:03, но в контейнере нет BLOB-объекта с более ранней датой. Затем задание использует этот файл начиная с 22:03 16 апреля 2015 г., а до этого момента использует предыдущие эталонные данные.

Исключением из этого поведения является ситуация, когда заданию необходимо повторно обработать данные в прошлом или при первом запуске задания.

Во время запуска задание ищет самый последний BLOB-объект, созданный до указанного времени запуска задания. Такое поведение гарантирует, что при запуске задания у вас будет непустой эталонный набор данных. В противном случае задание будет показывать следующие данные диагностики: Initializing input without a valid reference data blob for UTC time <start time>.

При обновлении эталонного набора данных создается журнал диагностики: Loaded new reference data from <blob path>. По многим причинам в задании может потребоваться перезагрузка предыдущего эталонного набора данных. Чаще всего причина заключается в повторной обработке предыдущих данных. В это время создается один и тот же журнал диагностики. Это действие не означает, что текущие потоковые данные используют прошлые эталонные данные.

Фабрика данных Azure может использоваться для управления заданием по созданию обновленных больших двоичных объектов, необходимых службе Stream Analytics для обновления определений ссылочных данных.

Фабрика данных представляет собой облачную службу интеграции информации, которая организует и автоматизирует перемещение и преобразование данных. Фабрика данных позволяет подключаться к большому количеству облачных и локальных хранилищ данных, позволяя легко перемещать данные по обычному расписанию, которое вы укажете.

Дополнительные сведения о том, как можно настроить конвейер Фабрики данных и создать эталонные данные для Stream Analytics, обновляемые по заданному расписанию, см. в этом примере в GitHub.

Советы по обновлению эталонных данных большого двоичного объекта

  • Не перезаписывайте BLOB-объекты эталонных данных, так как они являются неизменяемыми.
  • Рекомендуемый способ обновления ссылочных данных — выполнить следующие действия.
    • использовать {date} и {time} в шаблоне пути;
    • добавить новый BLOB-объект, используя тот же контейнер и шаблон пути, которые определены во входных данных задания;
    • использовать дату и время, большие, чем дата и время, указанные в последнем BLOB-объекте в последовательности.
  • Ссылочные blob-объекты не упорядочены по времени последнего изменения большого двоичного объекта. Они упорядочиваются только по дате и времени, указанным в имени BLOB-объекта, с использованием подстановок {date} и {time}.
  • Чтобы не отображалось слишком большое количество BLOB-объектов, попробуйте удалить очень старые BLOB-объекты, которые больше не будут обрабатываться. В некоторых случаях, например при перезапуске, Stream Analytics может потребоваться повторная обработка небольшого объема данных.

База данных SQL Azure

Ваше задание Stream Analytics извлекает эталонные данные Базы данных SQL и сохраняет их в памяти в виде моментального снимка для обработки. Моментальный снимок эталонных данных также хранится в контейнере в учетной записи хранения. Учетная запись хранения указывается в параметрах конфигурации.

Контейнер создается автоматически при запуске задания. Если задание остановлено или переходит в состояние сбоя, при его перезапуске автоматически созданные контейнеры удаляются.

Если эталонные данные являются медленно изменяющимся набором данных, необходимо периодически обновлять моментальный снимок, который используется в задании.

Stream Analytics позволяет задать частоту обновления при настройке входного подключения к Базе данных SQL. Среда выполнения Stream Analytics будет запрашивать Базу данных SQL с интервалом, определенным частотой обновления. Наиболее быстрая частота обновления, которая поддерживается — один раз в минуту. Для каждого обновления Stream Analytics сохраняет новый моментальный снимок в указанной учетной записи хранения.

Stream Analytics предоставляет два варианта выполнения запросов к Базе данных SQL. Запрос моментального снимка является обязательным, и его необходимо включить в каждое задание. После чего Stream Analytics выполняет запрос моментального снимка с учетом интервала обновления. В качестве эталонного набора данных используется результат запроса (моментальный снимок).

Запрос моментального снимка должен соответствовать большинству сценариев. При возникновении проблем с производительностью с большими наборами данных и быстрой частотой обновления вы можете использовать вариант разностного запроса. Запросы, выполнение которых занимает более 60 с до возврата эталонного набора данных, приведут к времени ожидания.

Используя вариант разностных запросов, Stream Analytics выполняет запрос моментального снимка изначально, чтобы получить базовый эталонный набор данных. После этого Stream Analytics выполняет разностный запрос с учетом интервала обновления, чтобы получить добавочные изменения. Эти добавочные изменения постоянно применяются к эталонному набору данных, чтобы обновлять их. Использование варианта разностного запроса помогает снизить затраты на хранение и количество сетевых операций ввода-вывода.

Настройка эталонных данных Базы данных SQL

Чтобы настроить эталонные данные Базы данных SQL, необходимо сначала создать входные эталонные данные. В следующей таблице описано каждое свойство, которое необходимо предоставить при создании входных эталонных данных. Дополнительные сведения см. в статье Использование эталонных данных из Базы данных SQL для задания Stream Analytics.

Можно использовать управляемый экземпляр Azure SQL в качестве входных эталонных данных. Вам нужно настроить общедоступную конечную точку в Управляемом экземпляре SQL. Затем вручную настройте следующие параметры в Stream Analytics. Виртуальная машина Azure, на которой работает SQL Server с подключенной базой данных, также дополнительно настраивается вручную с использованием этих параметров.

Имя свойства Description
Псевдоним входных данных Понятное имя, с помощью которого запрос задания ссылается на эти входные данные.
Отток подписок Ваша подписка
База данных База данных SQL с эталонными данными. Для Управляемого экземпляра SQL необходимо указать порт 3342. Например, sampleserver.public.database.windows.net,3342.
Username Имя пользователя, связанное с вашим экземпляром Базы данных SQL.
Пароль Пароль, связанный с вашим экземпляром Базы данных SQL.
Периодическое обновление Этот параметр позволяет выбрать частоту обновления. Выберите Вкл., чтобы указать частоту обновления в формате ДД:ЧЧ:MM.
Запрос моментального снимка Это параметр запроса по умолчанию, который извлекает эталонные данные из вашего экземпляра Базы данных SQL.
Разностный запрос Для более сложных сценариев с большими наборами данных и краткой частотой обновления добавьте разностной запрос.

Ограничение размера

Для оптимизации производительности используйте эталонные наборы данных размером менее 300 МБ. Эталонные наборы данных размером до 5 ГБ поддерживаются в заданиях с шестью и более единицами потоковой передачи. Использование большого набора эталонных данных может повлиять на общую задержку вашего задания.

Сложность запроса может возрасти, если включить обработку с отслеживанием состояния, включая агрегаты данных на основе периодов, временные соединения и временные аналитические функции. При увеличении сложности уменьшается максимальный поддерживаемый размер эталонных данных.

Если Stream Analytics не сможет загружать эталонные данные и выполнять сложные операции, заданию не хватит памяти и оно завершится сбоем. В таких случаях метрика использования единиц потоковой передачи достигает 100 %.

Количество единиц потоковой передачи Рекомендуемый размер
1 не более 50 МБ
3 не более 150 МБ
6 и более До 5 ГБ

Для эталонных данных сжатие не поддерживается. Для эталонных наборов данных размером больше 300 МБ используйте в качестве источника Базу данных SQL с вариантом разностного запроса, чтобы обеспечить оптимальную производительность. Если параметр разностного запроса не используется в таких сценариях, вы увидите пики в метрике задержки подложки при каждом обновлении эталонного набора данных.

Объединение нескольких эталонных наборов данных в задании

К входным данным потоковой передачи можно присоединить только ссылочные данные. Чтобы объединить несколько эталонных наборов данных, разбейте запрос на несколько шагов. Приведем пример:

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

Задание IoT Edge

Для пограничных заданий Stream Analytics поддерживаются только локальные эталонные данные. Когда задание развертывается на устройстве IoT Edge, оно загружает эталонные данные из определенного пользователем пути к файлу. Подготовьте на устройстве файл с эталонными данными.

Для контейнера Windows поместите файл с эталонными данными на локальный диск и предоставьте доступ к этому диску контейнеру Docker. Для контейнера Linux создайте том Docker и разместите на нем файл с данными.

Эталонные данные в обновлении IoT Edge активируются с помощью развертывания. После активации модуль Stream Analytics обрабатывает обновленные данные, не останавливая выполнение задания.

Обновить эталонные данные можно двумя способами:

  • обновить путь эталонных данных в задании Stream Analytics на портале Azure;
  • обновить развертывание IoT Edge.

Следующие шаги