События
31 мар., 23 - 2 апр., 23
Самое большое событие обучения Fabric, Power BI и SQL. 31 марта – 2 апреля. Используйте код FABINSIDER, чтобы сэкономить $400.
Зарегистрироваться сегодняЭтот браузер больше не поддерживается.
Выполните обновление до Microsoft Edge, чтобы воспользоваться новейшими функциями, обновлениями для системы безопасности и технической поддержкой.
В этом руководстве вы создадите задание Azure Stream Analytics, которое считывает события из Центры событий Azure, выполняет запрос к данным события, а затем вызывает функцию Azure, которая записывается в экземпляр Кэш Azure для Redis.
Примечание
В этом руководстве описано следующее:
Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начинать работу.
Прежде чем начать работу, нужно сделать следующее:
Войдите на портал Azure.
Прежде чем Stream Analytics сможет проанализировать поток мошеннических вызовов, необходимо отправить некоторые примеры данных в концентратор событий. В этом руководстве вы отправляете данные в Azure с помощью Центры событий Azure.
Следуйте инструкциям ниже, чтобы создать концентратор событий и отправить в него данные вызовов:
Войдите на портал Azure.
Выберите все службы в меню слева, выберите Интернет вещей, наведите указатель мыши на центры событий и нажмите кнопку +(Добавить).
На странице "Создание пространства имен" выполните следующие действия.
Выберите подписку Azure, в которой нужно создать концентратор событий.
Для группы ресурсов выберите "Создать" и введите имя группы ресурсов. Пространство имен Центров событий создается в этой группе ресурсов.
Для имени пространства имен введите уникальное имя пространства имен Центров событий.
В поле "Расположение" выберите регион, в котором нужно создать пространство имен.
Для ценовой категории выберите "Стандартный".
В нижней части страницы выберите Review + create (Проверить и создать).
На странице "Проверка и создание" мастера создания пространства имен выберите "Создать" в нижней части страницы после просмотра всех параметров.
После успешного развертывания пространства имен выберите Перейти к ресурсу, чтобы перейти на страницу Пространство имен Центров событий.
На странице Пространство имен Центров событий выберите +Центр событий в строке команд.
На странице Создание центра событий введите имя концентратора событий. Установите для параметра Количество разделов значение 2. Используйте параметры по умолчанию в оставшихся параметрах и выберите "Просмотр и создание".
На странице Просмотр и создание выберите Создать в нижней части страницы. Дождитесь успешного завершения развертывания.
Чтобы приложение могло отправлять данные в Центры событий Azure, в концентраторе событий должна быть политика, обеспечивающая доступ. Политика доступа создает строку подключения, которая включает сведения об авторизации.
На странице пространства имен Центров событий выберите политики общего доступа в меню слева.
Выберите RootManageSharedAccessKey из списка политик.
Затем нажмите кнопку копирования рядом со строкой подключения — первичный ключ.
Вставьте строку подключения в текстовый редактор. Эта строка подключения вам понадобится в следующем разделе.
Строка подключения выглядит следующим образом:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
Обратите внимание, что строка подключения содержит несколько пар "ключ-значение", разделенных точкой с запятой: Endpoint, SharedAccessKeyName и SharedAccessKey.
Перед запуском приложения TelcoGenerator необходимо настроить его для отправки данных в созданный ранее концентратор событий Azure.
Извлеките содержимое файла TelcoGenerator.zip.
Откройте файл TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
в текстовом редакторе по своему выбору. Файлов с расширением .config
несколько, поэтому убедитесь, что открываете нужный файл.
Обновите элемент <appSettings>
в файле конфигурации, указав следующие сведения:
EntityPath
значение (;EntityPath=myeventhub
) в конце.
Обязательно удалите точку с запятой перед значением EntityPath.Сохраните файл.
Далее откройте командное окно и перейдите в папку, в которой распаковано приложение TelcoGenerator. Затем введите следующую команду:
.\telcodatagen.exe 1000 0.2 2
Эта команда принимает следующие параметры:
Через несколько секунд приложение запустит отображение записей вызовов на экране, так как будет отправлять их в концентратор событий. Данные телефонных звонков содержат следующие поля:
Запись | Определение |
---|---|
CallrecTime | Метка времени начала вызова. |
SwitchNum | Телефонный переключатель, используемый для совершения вызова. В этом примере переключатели выражены строками, представляющими страну или регион происхождения (США, Китай, Соединенное Королевство, Германия или Австралия). |
CallingNum | Номер телефона звонящего. |
CallingIMSI | Идентификатор абонента международной мобильной связи (IMSI). Это уникальный идентификатор звонящего. |
CalledNum | Номер телефона получателя. |
CalledIMSI | Идентификатор IMSI. Это уникальный идентификатор получателя звонка. |
Теперь, когда у вас есть поток событий звонков, можно создать задание Stream Analytics, которое считывает данные из концентратора событий.
Для подписки выберите подписку, содержащую пространство имен Центров событий.
Для группы ресурсов выберите созданную ранее группу ресурсов.
В разделе сведений об экземпляре введите уникальное имя задания Stream Analytics.
Для региона выберите регион, в котором нужно создать задание Stream Analytics. Рекомендуется разместить задание и концентратор событий в одном регионе, чтобы обеспечить лучшую производительность и не платить за передачу данных между регионами.
Для среды< размещения выберите Cloud , если она еще не выбрана. Задания Stream Analytics можно развернуть в облаке или на граничных устройствах. Значение Cloud позволяет выполнять развертывание в облаке Azure, а значение Edge — на устройстве IoT Edge.
Для единиц потоковой передачи выберите 1. Единица потоковой передачи предоставляет вычислительные ресурсы, которые необходимы для выполнения задания. По умолчанию установлено значение 1. Чтобы узнать о масштабировании единиц потоковой передачи, ознакомьтесь со статьей Обзор и настройка единиц потоковой передачи.
В нижней части страницы выберите Review + create (Проверить и создать).
Следующий шаг — определить источник входных данных, откуда задание будет считывать данные с помощью концентратора событий, созданного в предыдущем разделе.
На странице Задание Stream Analytics в разделе Топология заданий в меню слева выберите Входные данные.
На странице "Входные данные" выберите +Добавить входные данные и концентратор событий.
На странице концентратора событий выполните следующие действия.
Для псевдонима входных данных введите CallStream. Псевдоним ввода — это понятное имя для идентификации входных данных. Псевдоним ввода может содержать только буквенно-цифровые символы и дефисы, а также должен содержать 3–63 символов.
Для подписки выберите подписку Azure, в которой вы создали концентратор событий. Концентратор событий может находиться в той же подписке, что и задание Stream Analytics, или в другой.
Для пространства имен Центров событий выберите пространство имен Центров событий, созданное в предыдущем разделе. Все пространства имен, доступные в текущей подписке, перечислены в раскрывающемся списке.
Для имени концентратора событий выберите концентратор событий, созданный в предыдущем разделе. Все центры событий, доступные в выбранном пространстве имен, перечислены в раскрывающемся списке.
Для группы потребителей концентратора событий следует выбрать параметр "Создать", чтобы создать новую группу потребителей в концентраторе событий. Для каждого задания Stream Analytics рекомендуется использовать отдельную группу получателей. Если группа потребителей не указана, задание Stream Analytics использует группу потребителей $Default
. Если задание содержит самосоединение или несколько источников входных данных, некоторые входные данные могут позднее считываться несколькими читателями. Эта ситуация влияет на количество читателей в группе потребителей.
В качестве режима аутентификации выберите вариант Строка подключения. Проще протестировать учебник с помощью этого параметра.
Для имени политики концентратора событий выберите "Использовать существующую", а затем выберите созданную ранее политику.
В нижней части страницы нажмите кнопку Сохранить.
Создайте кэш в Кэш Azure для Redis, выполнив действия, описанные в разделе "Создание экземпляра Кэш Azure для Redis".
После создания кэша в разделе Параметры выберите Ключи доступа. Запишите основную строку подключения.
Сведения см. в разделе Создание приложения-функции документации по службе "Функции". Этот пример был создан с помощью:
Создайте приложение-функцию HttpTrigger по умолчанию в Visual Studio Code, следуя этому руководству. Используются следующие сведения: язык: среда выполнения .NET 6
: C#
(под функцией версии 4), шаблон: HTTP trigger
Установите клиентскую библиотеку Redis, выполнив следующую команду в терминале, расположенном в папке проекта:
dotnet add package StackExchange.Redis --version 2.2.88
Добавьте элементы RedisConnectionString
и RedisDatabaseIndex
в раздел Values
файла local.settings.json
, заполнив строку подключения целевого сервера:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"RedisConnectionString": "Your Redis Connection String",
"RedisDatabaseIndex":"0"
}
}
Индекс базы данных Redis — это число от 0 до 15, с помощью которого можно идентифицировать базу данных в экземпляре.
Замените функцию целиком (CS-файл в проекте) на следующий фрагмент кода. Замените пространство имен, имя класса и имя функции на собственные:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using StackExchange.Redis;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
{
// Connection refers to a property that returns a ConnectionMultiplexer
IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
// Simple get of data types from the cache
string value = db.StringGet(key);
log.LogInformation($"Database got: {key} => {value}");
}
}
return new OkResult(); // 200
}
}
}
Когда Stream Analytics получает исключение "Сущность запроса HTTP слишком большая" из функции, размер пакетов, отправляемых в службу "Функции", уменьшается. Следующий код гарантирует, что Stream Analytics не отправляет пакеты слишком большого размера. Убедитесь, что максимальное количество пакетов и размеры значений, используемые в функции, соответствуют значениям, введенным на портале Stream Analytics.
Теперь функцию можно опубликовать в Azure.
Откройте функцию в портал Azure и задайте параметры приложения для RedisConnectionString
иRedisDatabaseIndex
.
Откройте задание Stream Analytics на портале Azure.
Перейдите к функции и выберите Обзор>Выходные данные>Добавить. Чтобы добавить новые выходные данные, выберите Функция Azure в качестве приемника. Новый выходной адаптер службы "Функции" поддерживает следующие свойства:
Имя свойства | Description |
---|---|
Псевдоним выходных данных | Понятное имя, используемое в запросах задания для ссылки на эти выходные данные. |
Вариант импорта | Вы можете использовать эту функцию из текущей подписки или указать параметры вручную, если функция находится в другой подписке. |
Приложение-функция | Имя приложения службы "Функции". |
Function | Имя функции в приложении службы "Функции" (имя функции run.csx). |
Максимальный размер пакета | Задает максимальный размер каждого пакета выходных данных в байтах, который передается в функцию. По умолчанию установлено значение 262 144 байт (256 КБ). |
Максимальное количество пакетов | Указывает максимальное число событий в каждом пакете, который отправляется в функцию. Значение по умолчанию — 100. Это необязательное свойство. |
Ключ | Позволяет использовать функцию из другой подписки. Укажите значение ключа для доступа к функции. Это необязательное свойство. |
Введите имя для выходного псевдонима. В этом руководстве оно называется saop1, но вы можете использовать любое имя вашего выбора. Заполните другие сведения.
Откройте задание Stream Analytics и обновите запрос следующим образом.
Важно!
В следующем примере скрипта предполагается, что для входного имени и saop1 используется callStream для имени вывода. Если вы использовали разные имена, не забудьте обновить запрос.
SELECT
System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
INTO saop1
FROM CallStream CS1 TIMESTAMP BY CallRecTime
JOIN CallStream CS2 TIMESTAMP BY CallRecTime
ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
WHERE CS1.SwitchNum != CS2.SwitchNum
Запустите приложение telcodatagen.exe, выполнив следующую команду в командной строке. Команда использует следующий формат: telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
.
telcodatagen.exe 1000 0.2 2
Запустите задание Stream Analytics.
На странице "Монитор" для функции Azure вы увидите, что эта функция вызывается.
На странице Кэш Azure для Redis кэш выберите метрики в меню слева, добавьте метрику записи кэша и задайте длительность последнего часа. Вы увидите диаграмму, аналогичную следующему изображению.
Сначала получите ключ для записи, вставленной в Кэш Azure для Redis. В коде ключ вычисляется в функции Azure, как показано в следующем фрагменте кода:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Перейдите к портал Azure и найдите приложение Функции Azure.
Выберите Функции в меню слева.
Выберите HTTPTrigger1 из списка функций.
Выберите "Монитор" в меню слева.
Перейдите на вкладку "Журналы ".
Запишите ключ из информационного сообщения, как показано на следующем снимке экрана. Этот ключ используется для поиска значения в Кэш Azure для Redis.
Перейдите на портал Azure и найдите кэш Azure для Redis. Выберите Консоль.
Используйте команды кэша Azure для Redis, чтобы проверить наличие данных в кэше Azure для Redis. (Команда принимает формат Get {key}.) Используйте ключ, скопированный из журналов Монитора для функции Azure (в предыдущем разделе).
Получение "KEY-FROM-THE-PREVIOUS-SECTION"
Эта команда должна вывести значение для указанного ключа:
Если при отправке событий в Функции Azure происходит сбой, Stream Analytics повторяет большинство операций. Все исключения http извлекаются до успешного выполнения, за исключением ошибки HTTP 413 (слишком большая сущность). Ошибка "Сущность слишком большая" рассматривается как ошибка данных, к которой применяется политика повтора или удаления.
Примечание
Время ожидания для запросов HTTP от Stream Analytics к Функциям Azure устанавливается на 100 секунд. Если приложение Функции Azure занимает более 100 секунд, чтобы обработать пакет, Stream Analytics ошибится и повторится для пакета.
Повторная попытка времени ожидания может привести к повторяющимся событиям, записанным в приемник выходных данных. Когда Stream Analytics пытается повторно обработать пакет с ошибкой, он повторяет попытки для всех событий в этом пакете. Рассмотрим, например, пакет из 20 событий, которые отправляются в Функции Azure из Stream Analytics. Предположим, что приложению "Функции Azure" требуется 100 секунд для обработки первых 10 событий в этом пакете. Через 100 секунд Stream Analytics приостанавливает запрос, так как он не получил положительный ответ от Функции Azure, а другой запрос отправляется для того же пакета. Первые 10 событий в пакете обрабатываются приложением "Функции Azure" повторно, что приводит к дублированию.
На портале Azure при попытке сброса значения максимального размера пакета или максимального числа пакетов до нуля (значения по умолчанию) после сохранения значение возвращается к ранее введенному. В этом случае введите значения по умолчанию для этих полей вручную.
Служба Stream Analytics в настоящее время не поддерживает использование маршрутизации HTTP в Функциях Azure.
Поддержка подключения к Функции Azure, размещенной в виртуальной сети, не включена.
Ставшие ненужными группу ресурсов, задание потоковой передачи и все связанные ресурсы можно удалить. При удалении задания будет прекращена тарификация за единицы потоковой передачи, потребляемые заданием. Если вы планируете использовать это задание в будущем, вы можете остановить и перезапустить его позже. Если вы не собираетесь использовать это задание дальше, удалите все ресурсы, созданные в ходе работы с этим руководством, сделав следующее:
В этом руководстве вы создали простое задание Stream Analytics, которое запускает функцию Azure. Чтобы узнать больше о заданиях Stream Analytics, перейдите к следующему руководству:
События
31 мар., 23 - 2 апр., 23
Самое большое событие обучения Fabric, Power BI и SQL. 31 марта – 2 апреля. Используйте код FABINSIDER, чтобы сэкономить $400.
Зарегистрироваться сегодняОбучение
Модуль
Создание решения Java на основе событий в режиме реального времени в Azure - Training
Отправка данных телеметрики на основе событий в режиме реального времени в Azure Cosmos DB с помощью функций Azure и концентратора событий.
Сертификация
Создавайте комплексные решения в Microsoft Azure для создания Функции Azure, реализации веб-приложений и управления ими, разработке решений, использующих службу хранилища Azure, и т. д.
Документация
Вывод данных из Azure Stream Analytics - Azure Stream Analytics
В этой статье рассматриваются различные варианты вывода данных Azure Stream Analytics.
Вывод данных из Azure Stream Analytics в Функции Azure - Azure Stream Analytics
В этой статье описывается, как использовать Функции Azure для вывода данных Azure Stream Analytics.
Потоковые данные в качестве входных данных Azure Stream Analytics - Azure Stream Analytics
Узнайте больше о настройке подключения данных в Azure Stream Analytics. Входные данные включают поток данных из событий, а также ссылочные данные.