клиентская библиотека обработчика событий Центры событий Azure для .NET версии 5.8.1

Центры событий Azure — это высокомасштабируемая служба публикации и подписки, которая может принимать миллионы событий в секунду и передавать их нескольким потребителям. Это позволяет обрабатывать и анализировать большие объемы данных, создаваемых подключенными устройствами и приложениями. После сбора данных Центры событий можно получать, преобразовывать и хранить их с помощью любого поставщика аналитики в режиме реального времени или адаптеров пакетной обработки и хранения. Если вы хотите узнать больше о Центры событий Azure, ознакомьтесь с разделом Что такое Центры событий.

Клиентская библиотека обработчика событий является компаньоном клиентской библиотеки Центры событий Azure, предоставляя автономный клиент для использования событий надежным, устойчивым и масштабируемым способом, который подходит для большинства рабочих сценариев. Для реализации с учетом мнений, созданной с использованием больших двоичных объектов службы хранилища Azure, обработчик событий рекомендуется для:

  • Чтение и обработка событий во всех разделах Концентратора событий в большом масштабе с устойчивостью к временным сбоям и периодическим сетевым проблемам.

  • Совместная обработка событий, когда несколько процессоров динамически распределяют и разделяют ответственность в контексте группы потребителей, корректно управляя нагрузкой по мере добавления и удаления процессоров из группы.

  • Управление контрольными точками и состоянием для устойчивой обработки с помощью больших двоичных объектов службы хранилища Azure в качестве базового хранилища данных.

Исходный код | Пакет (NuGet) | Справочная документация по | API Документация по продукту | Руководство по устранению неполадок

Начало работы

Предварительные требования

  • Подписка Azure: Чтобы использовать службы Azure, включая Центры событий Azure, вам потребуется подписка. Если у вас нет учетной записи Azure, вы можете зарегистрироваться для получения бесплатной пробной версии или использовать преимущества подписки Visual Studio при создании учетной записи.

  • Пространство имен Центров событий с концентратором событий: Для взаимодействия с Центры событий Azure также необходимо иметь пространство имен и концентратор событий. Если вы не знакомы с созданием ресурсов Azure, вы можете воспользоваться пошаговым руководством по созданию концентратора событий с помощью портал Azure. Здесь также можно найти подробные инструкции по использованию Azure CLI, Azure PowerShell или шаблонов Azure Resource Manager (ARM) для создания концентратора событий.

  • Учетная запись хранения Azure с хранилищем BLOB-объектов: Чтобы сохранить контрольные точки и управлять владением в службе хранилища Azure, вам потребуется учетная запись хранения Azure с доступными BLOB-объектами. Если вы не знакомы с учетными записями хранения Azure, вам может потребоваться выполнить пошаговые инструкции по созданию учетной записи хранения с помощью портал Azure. Здесь также можно найти подробные инструкции по использованию Azure CLI, Azure PowerShell или шаблонов Azure Resource Manager (ARM) для создания учетных записей хранения.

  • Контейнер больших двоичных объектов службы хранилища Azure: Данные контрольных точек и владения в службе хранилища Azure будут записываться в большие двоичные объекты в определенном контейнере. Для EventProcessorClient требуется существующий контейнер и он неявно создается для защиты от случайной неправильной настройки. Если вы не знакомы с контейнерами службы хранилища Azure, обратитесь к документации по управлению контейнерами. Здесь вы найдете подробные инструкции по использованию .NET, Azure CLI или Azure PowerShell для создания контейнера.

  • C# 8.0: Клиентская библиотека Центры событий Azure использует новые функции, появившиеся в C# 8.0. Чтобы воспользоваться преимуществами синтаксиса C# 8.0, рекомендуется компилировать с помощью пакета SDK для .NET Core версии 3.0 или более поздней версии с языковой версиейlatest.

    Пользователям Visual Studio, желающим использовать все преимущества синтаксиса C# 8.0, потребуется использовать Visual Studio 2019 или более поздней версии. Скачать Visual Studio 2019, в том числе бесплатный выпуск Community, можно здесь. Пользователи Visual Studio 2017 могут воспользоваться преимуществами синтаксиса C# 8, используя пакет NuGet Microsoft.Net.Compilers и задав версию языка, хотя процесс редактирования может быть не идеальным.

    Вы по-прежнему можете использовать библиотеку с предыдущими версиями языка C#, но вам придется управлять асинхронными перечисляемыми и асинхронными одноразовыми элементами вручную, а не использовать преимущества нового синтаксиса. Вы по-прежнему можете использовать любую версию платформы, поддерживаемую пакетом SDK для .NET Core, включая более ранние версии .NET Core или .NET Framework. Дополнительные сведения см. в статье Определение целевых платформ.

    Важное примечание. Для сборки или выполнения примеров и примеров без изменений необходимо использовать C# 11.0. Вы по-прежнему можете запускать примеры, если решите настроить их для других языковых версий.

Чтобы быстро создать необходимые ресурсы в Azure и получить для них строки подключения, можно развернуть наш пример шаблона, щелкнув:

Развертывание в Azure

Установка пакета

Установите клиентную библиотеку обработчика событий Центры событий Azure для .NET с помощью NuGet:

dotnet add package Azure.Messaging.EventHubs.Processor

Аутентификация клиента

Получение строки подключения к Центрам событий

Чтобы клиентская библиотека Центров событий взаимодействовала с концентратором событий, ей необходимо понимать, как подключиться и авторизовать его. Самый простой способ сделать это — использовать строку подключения, которая создается автоматически при создании пространства имен Центров событий. Если вы не знакомы с использованием строк подключения к Центрам событий, вы можете воспользоваться пошаговым руководством, чтобы получить строку подключения к Центрам событий.

Получение строки подключения к службе хранилища Azure

Чтобы клиент обработчика событий использовал blob-объекты службы хранилища Azure для создания контрольных точек, ему необходимо понимать, как подключиться к учетной записи хранения и авторизоваться с помощью нее. Самый простой способ сделать это — использовать строку подключения, которая создается во время создания учетной записи хранения. Если вы не знакомы с авторизацией строки подключения учетной записи хранения в Azure, вы можете выполнить пошаговые инструкции по настройке строк подключения к службе хранилища Azure.

Основные понятия

  • Обработчик событий — это конструкция, предназначенная для управления обязанностями, связанными с подключением к данному концентратору событий и обработкой событий из каждой из его секций в контексте определенной группы потребителей. Процесс обработки событий, считываемых из секции, и обработка всех возникающих ошибок делегируется обработчиком событий предоставленному вами коду, что позволяет вашей логике сосредоточиться на предоставлении бизнес-ценности, в то время как процессор обрабатывает задачи, связанные с чтением событий, управление секциями и позволяет сохранять состояние в виде контрольных точек.

  • Контрольные точки — это процесс, с помощью которого читатели помечают и сохраняют свою позицию для событий, обработанных для секции. Контрольные точки являются ответственностью потребителя и выполняются для каждой секции, как правило, в контексте конкретной группы потребителей. EventProcessorClientДля это означает, что для сочетания групп потребителей и секций процессор должен отслеживать свое текущее положение в потоке событий. Дополнительные сведения см . в документации по продуктам Центров событий.

    Когда обработчик событий подключается, он начнет считывать события в контрольной точке, которая ранее была сохранена последним обработчиком этой секции в этой группе потребителей, если она существует. Так как обработчик событий считывает события в секции и действует с ним, он должен периодически создавать контрольные точки, чтобы пометить события как «завершенные» подчиненными приложениями и обеспечить устойчивость в случае сбоя обработчика событий или среды, в которой он находится. При необходимости можно повторно обработать события, которые ранее были помечены как "завершенные", указав более раннее смещение в этом процессе создания контрольных точек.

  • Секция — это упорядоченная последовательность событий, которая хранится в концентраторе событий. Секции   это средства организации данных, связанные с параллелизмом, требуемым потребителями событий. Служба "Центры событий Azure" обеспечивает потоковую передачу сообщений посредством шаблона секционированных потребителей, в котором каждый потребитель считывает только определенное подмножество (секцию) потока сообщений. По мере поступления новых событий они добавляются в конец этой последовательности. Количество секций указывается во время создания концентратора событий и не может быть изменено.

  • Группа потребителей — это представление всего концентратора событий. Группы потребителей предоставляют каждому из нескольких приложений возможность иметь отдельное представление потока событий, а также считывать поток независимо друг от друга, в собственном темпе и из собственного положения. В одной группе потребителей может быть не более 5 одновременных читателей. Однако рекомендуется только один активный потребитель для каждой пары из секции и группы потребителей. Каждый активный читатель получает все события из своего раздела. Если в одном разделе несколько читателей, они получат дублирующиеся события.

Дополнительные понятия и более подробное обсуждение см. в разделе Функции Центров событий.

Время существования клиента

класс EventProcessorClient можно безопасно кэшировать и использовать в качестве отдельного экземпляра в течение всего времени существования приложения, что рекомендуется при регулярном чтении событий. Клиенты отвечают за эффективное управление сетью, ЦП и использованием памяти, работая над тем, чтобы обеспечить низкий уровень использования в периоды бездействия. Вызов StopProcessingAsync или StopProcessing на процессоре необходим, чтобы обеспечить правильную очистку сетевых ресурсов и других неуправляемых объектов.

Потокобезопасность

Мы гарантируем, что все методы экземпляра клиента являются потокобезопасны и независимы друг от друга (руководство). Это гарантирует, что рекомендации по повторному использованию экземпляров клиента всегда будут безопасными, даже в разных потоках.

Типы моделей данных, такие как EventData и EventDataBatch , не являются потокобезопасны. Они не должны совместно использоваться в потоках и использоваться одновременно с клиентскими методами.

Дополнительные понятия

Параметры | клиента Обработчики событий | Обработка сбоев | Диагностики | Макетирование (процессор) | Макетирование (типы клиентов)

Примеры

Создание клиента обработчика событий

Поскольку EventProcessorClient имеет зависимость от BLOB-объектов службы хранилища Azure для сохранения состояния, необходимо предоставить BlobContainerClient для процессора, который был настроен для учетной записи хранения, и контейнера, который следует использовать. Контейнер, используемый для настройки , EventProcessorClient должен существовать.

EventProcessorClient Так как не может узнать о намерении указать несуществующий контейнер, он не будет неявно создавать контейнер. Это служит защитой от неправильно настроенного контейнера, из-за чего обработчик-злоумышленник не может совместно использовать права владения и мешать другим обработчикам в группе потребителей.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

Настройка обработчиков событий и ошибок

Чтобы использовать EventProcessorClient, необходимо предоставить обработчики для обработки событий и ошибок. Эти обработчики считаются автономными, и разработчики несут ответственность за обеспечение учета исключений в коде обработчика.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Запуск и остановка обработки

Будет EventProcessorClient выполнять обработку в фоновом режиме после явного запуска и продолжать делать это до явной остановки. Хотя это позволяет коду приложения выполнять другие задачи, он также возлагает ответственность за то, чтобы процесс не завершалось во время обработки, если другие задачи не выполняются.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Использование субъекта Active Directory с клиентом обработчика событий

Библиотека удостоверений Azure предоставляет поддержку проверки подлинности Azure Active Directory, которую можно использовать для клиентских библиотек Azure, включая Центры событий и службу хранилища Azure.

Чтобы использовать субъект Active Directory, при создании клиента Центров событий указывается один из доступных учетных данных из Azure.Identity библиотеки. Кроме того, полное пространство имен Центров событий и имя нужного концентратора событий предоставляются вместо строки подключения Центров событий.

Чтобы использовать субъект Active Directory с контейнерами BLOB-объектов службы хранилища Azure, при создании клиента хранилища необходимо указать полный URL-адрес контейнера. Сведения о допустимых форматах URI для доступа к хранилищу BLOB-объектов можно найти в статье Именование контейнеров, больших двоичных объектов и метаданных.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

При использовании Azure Active Directory с Центрами событий субъекту должна быть назначена роль, которая позволяет выполнять чтение из Центров событий, например Azure Event Hubs Data Receiver роль. Дополнительные сведения об использовании авторизации Azure Active Directory с Центрами событий см. в соответствующей документации.

При использовании Azure Active Directory со службой хранилища Azure субъекту должна быть назначена роль, которая разрешает доступ на чтение, запись и удаление больших двоичных объектов, например Storage Blob Data Contributor роль . Дополнительные сведения об использовании авторизации Active Directory со службой хранилища Azure см. в соответствующей документации и в примере авторизации службы хранилища Azure.

Устранение неполадок

Подробные сведения об устранении неполадок см. в руководстве по устранению неполадок Центров событий.

Обработка исключений

Исключения клиента обработчика событий

Клиент обработчика событий делает все попытки быть устойчивыми перед лицом исключений и будет предпринимать необходимые действия для продолжения обработки, если это невозможно. Для этого не требуется никаких действий от разработчиков ; она изначально является частью поведения процессора.

Чтобы разработчики могли проверять исключения, возникающие в клиентских операциях обработчика событий, и реагировать на них, они отображаются через ProcessError событие . Аргументы для этого события содержат сведения об исключении и контексте, в котором оно наблюдалось. Разработчики могут выполнять обычные операции с клиентом обработчика событий из этого обработчика событий, такие как остановка и (или) перезапуск в ответ на ошибки, но в противном случае могут не влиять на поведение обработчика исключений.

Базовый пример реализации обработчика ошибок см. в примере обработчика событий.

Исключения в обработчиках событий

Так как клиент обработчика событий не имеет соответствующего контекста для понимания серьезности исключений в обработчиках событий, предоставляемых разработчиками, он не может предположить, какие действия будут разумным ответом на них. В результате разработчики считаются ответственными за исключения, возникающие в обработчиках событий, которые они предоставляют с помощью try/catch блоков и других стандартных языковых конструкций.

Клиент обработчика событий не будет пытаться обнаруживать исключения в коде разработчика и не отображать их явным образом. Результирующее поведение будет зависеть от среды размещения процессора и контекста, в котором был вызван обработчик событий. Так как это может отличаться в разных сценариях, настоятельно рекомендуется, чтобы разработчики кодили свои обработчики событий с защитой и учитывали потенциальные исключения.

Ведение журнала и диагностика

Клиентская библиотека обработчика событий полностью инструментирована для ведения журнала сведений на различных уровнях детализации с помощью .NET EventSource для выдачи информации. Ведение журнала выполняется для каждой операции и соответствует шаблону маркировки начальной точки операции, ее завершения и всех возникающих исключений. Дополнительные сведения, которые могут предоставлять аналитические сведения, также регистрируются в контексте связанной операции.

Журналы клиента обработчика событий доступны для любого EventListener пользователя, выбрав источник с именем Azure-Messaging-EventHubs-Processor-EventProcessorClient или выбрав все источники с признаком AzureEventSource. Чтобы упростить запись журналов из клиентских библиотек Azure, библиотека, используемая Центрами событий, Azure.Core предлагает AzureEventSourceListener. Дополнительные сведения см. в статье Сбор журналов Центров событий с помощью AzureEventSourceListener.

Библиотека обработчика событий также инструментируется для распределенной трассировки с помощью Application Insights или OpenTelemetry. Дополнительные сведения см. в примере диагностики Azure.Core.

Дальнейшие действия

Помимо рассмотренных сценариев, библиотека обработчика Центры событий Azure предоставляет поддержку дополнительных сценариев, помогая воспользоваться полным набором EventProcessorClientфункций . Чтобы помочь изучить некоторые из этих сценариев, клиентская библиотека обработчика Центров событий предлагает проект примеров, которые будут служить иллюстрацией для распространенных сценариев. Дополнительные сведения см. в примерах сведений .

Участие

На этом проекте приветствуются публикации и предложения. Для участия в большинстве процессов по разработке документации необходимо принять лицензионное соглашение участника (CLA), в котором указывается, что вы предоставляете нам права на использование ваших публикаций. Для получения подробных сведений посетите веб-страницу https://cla.microsoft.com.

При отправке запроса на включение внесенных изменений CLA-бот автоматически определит необходимость предоставления соглашения CLA и соответствующего оформления запроса на включение внесенных изменений (например, добавление метки, комментария). Просто следуйте инструкциям бота. Будет достаточно выполнить их один раз для всех репозиториев, поддерживающих соглашение CLA.

В рамках этого проекта действуют правила поведения в отношении продуктов с открытым исходным кодом Майкрософт. Дополнительные сведения см. в разделе часто задаваемых вопросов о правилах поведения или обратитесь к opencode@microsoft.com с любыми дополнительными вопросами или комментариями.

Дополнительные сведения см. в нашем руководстве по участию .

Просмотры