Использование Центров событий и .NET для отправки и получения сообщений в разделах Atlas Kafka
Статья
Примечание
Каталог данных Microsoft Purview меняется на Единый каталог Microsoft Purview. Все функции останутся неизменными. Вы увидите изменение имени, когда новый интерфейс управления данными Microsoft Purview станет общедоступным в вашем регионе.
Проверьте имя в регионе.
В этом кратком руководстве описано, как отправлять и получать события тем Atlas Kafka . Мы будем использовать Центры событий Azure и библиотеку Azure.Messaging.EventHubs .NET.
Чтобы выполнить инструкции из этого краткого руководства, необходимо выполнить определенные предварительные требования.
Подписка Microsoft Azure. Чтобы использовать службы Azure, включая Центры событий, требуется подписка Azure. Если у вас нет учетной записи Azure, вы можете зарегистрироваться для получения бесплатной пробной версии или использовать преимущества подписчика MSDN при создании учетной записи.
Microsoft Visual Studio 2022. В клиентской библиотеке Центров событий используются новые функции, появившиеся в C# 8.0. Вы по-прежнему можете использовать библиотеку с предыдущими версиями C#, но новый синтаксис будет недоступен. Чтобы использовать полный синтаксис, рекомендуется компилировать с пакетом SDK для .NET Core 3.0 или более поздней версии, а версия языка — latest. Если вы используете версию Visual Studio до Visual Studio 2019, в ней нет средств, необходимых для создания проектов C# 8.0. Visual Studio 2022, включая бесплатный выпуск Community, можно скачать здесь.
Возможно, ваша учетная запись уже настроена. Вы можете проверка учетную запись Microsoft Purview в портал Azure в разделе Параметры, конфигурация Kafka. Если он еще не настроен, следуйте инструкциям в этом руководстве.
Публикация сообщений в Microsoft Purview
Давайте создадим консольное приложение .NET Core, которое отправляет события в Microsoft Purview через раздел Kafka Центров событий, ATLAS_HOOK.
Далее создайте консольное приложение .NET C# в Visual Studio:
Запустите Visual Studio.
В окне Пуск выберите Создать консольное приложение проекта>(платформа .NET Framework). Требуется .NET версии 4.5.2 или более поздней.
В поле Имя проекта введите PurviewKafkaProducer.
Выберите Создать , чтобы создать проект.
Создание консольного приложения
Запустите Visual Studio 2022.
Выберите Создать новый проект.
В диалоговом окне Создание проекта выполните следующие действия: Если это диалоговое окно не отображается, выберите Файл в меню, выберите Создать, а затем — Проект.
Выберите C# для языка программирования.
Выберите Консоль для типа приложения.
Выберите Консольное приложение (.NET Core) в списке результатов.
Затем выберите Далее.
Добавление пакета NuGet Центров событий
В меню выберите Инструменты>Диспетчер пакетов>NuGet.
Выполните следующую команду, чтобы установить пакет NuGet Azure.Messaging.EventHubs и пакет NuGet Azure.Messaging.EventHubs.Producer :
Замените Main метод следующим async Main методом async ProduceMessage и добавьте для отправки сообщений в Microsoft Purview. Дополнительные сведения см. в комментариях к коду.
static async Task Main()
{
// Read from the default consumer group: $Default
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
/ Create an event producer client to add events in the event hub
EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
await ProduceMessage(producer);
}
static async Task ProduceMessage(EventHubProducerClient producer)
{
// Create a batch of events
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
// Add events to the batch. An event is a represented by a collection of bytes and metadata.
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
// Use the producer client to send the batch of events to the event hub
await producerClient.SendAsync(eventBatch);
Console.WriteLine("A batch of 3 events has been published.");
}
Выполните построение проекта. Убедитесь, что ошибок нет.
Запустите программу и дождитесь сообщения о подтверждении.
Примечание
Полный исходный код с дополнительными информационными комментариями см. в этом файле в GitHub.
Пример кода, который создает таблицу SQL с двумя столбцами с помощью сообщения JSON Create Entity
Далее узнайте, как создать консольное приложение .NET Core, которое получает сообщения из центров событий с помощью обработчика событий. Обработчик событий управляет постоянными контрольными точками и параллельными приемами из концентраторов событий. Это упрощает процесс получения событий. Для получения сообщений из Microsoft Purview необходимо использовать концентратор событий ATLAS_ENTITIES.
Пакет SDK центров событий использует последнюю доступную версию API хранилища. Эта версия может быть недоступна на платформе Stack Hub. При выполнении этого кода в Azure Stack Hub возникают ошибки среды выполнения, если только вы не используете конкретную версию. Если вы используете Хранилище BLOB-объектов Azure в качестве хранилища контрольных точек, просмотрите поддерживаемую версию API службы хранилища Azure для сборки Azure Stack Hub и в коде нацелитесь на эту версию.
Самая высокая доступная версия службы хранилища — 2019-02-02. По умолчанию клиентская библиотека SDK центров событий использует самую высокую доступную версию в Azure (2019-07-07 на момент выпуска пакета SDK). Если вы используете Azure Stack Hub версии 2005, помимо действий, описанных в этом разделе, вам также потребуется добавить код, предназначенный для API службы хранилища версии 2019-02-02.02. Сведения о том, как использовать определенную версию API хранилища, см. в этом примере в GitHub.
Создание хранилища Azure и контейнера BLOB-объектов
Мы будем использовать службу хранилища Azure в качестве хранилища контрольных точек. Чтобы создать учетную запись хранения Azure, выполните следующие действия.
Запишите строка подключения и имя контейнера. Вы будете использовать их в коде получения.
Создание проекта Visual Studio для получателя
В окне Обозреватель решений выберите и удерживайте (или щелкните правой кнопкой мыши) решение EventHubQuickStart, наведите указатель мыши на пункт Добавить и выберите Создать проект.
Выберите Консольное приложение (.NET Core) и нажмите кнопку Далее.
Введите PurviewKafkaConsumer в поле Имя проекта и нажмите кнопку Создать.
Добавление пакета NuGet Центров событий
В меню выберите Инструменты>Диспетчер пакетов>NuGet.
Выполните следующую команду, чтобы установить пакет NuGet Azure.Messaging.EventHubs :
Install-Package Azure.Messaging.EventHubs
Выполните следующую команду, чтобы установить пакет NuGet Azure.Messaging.EventHubs.Processor :
Добавьте следующие using инструкции в начало файла Program.cs .
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
Добавьте константы в Program класс для строка подключения Центров событий и имя концентратора событий. Замените заполнители в квадратных скобках реальными значениями, которые вы получили при создании концентратора событий и учетной записи хранения (ключи доступа — первичные строка подключения). Убедитесь, что {Event Hubs namespace connection string} является строка подключения уровня пространства имен, а не строкой концентратора событий.
Используйте ATLAS_ENTITIES в качестве имени концентратора событий при отправке сообщений в Microsoft Purview.
Замените Main метод следующим async Main методом. Дополнительные сведения см. в комментариях к коду.
static async Task Main()
{
// Read from the default consumer group: $Default
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
// Create a blob container client that the event processor will use
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
// Create an event processor client to process events in the event hub
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await processor.StartProcessingAsync();
// Wait for 10 seconds for the events to be processed
await Task.Delay(TimeSpan.FromSeconds(10));
// Stop the processing
await processor.StopProcessingAsync();
}
Теперь добавьте в класс следующие методы обработчика событий и ошибок.
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
// Write the body of the event to the console window
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
// Update checkpoint in the blob storage so that the app receives only new events the next time it's run
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
// Write details about the error to the console window
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
Выполните построение проекта. Убедитесь, что ошибок нет.
Узнайте, как использовать Центры событий Azure для надежной обработки потоков данных большого объема, чтобы вы могли кодировать приложения для отправки и получения сообщений через центр.
Создавайте эффективные запросы, создавайте политики индексирования, управляйте и подготавливайте ресурсы в API SQL и пакете SDK с помощью Microsoft Azure Cosmos DB.