Uso de Event Hubs y .NET para enviar y recibir mensajes de temas de Atlas Kafka
Nota
El Catálogo de datos de Microsoft Purview está cambiando su nombre a Catálogo unificado de Microsoft Purview. Todas las características permanecerán iguales. Verá el cambio de nombre cuando la nueva experiencia de gobernanza de datos de Microsoft Purview esté disponible con carácter general en su región. Compruebe el nombre en su región.
En este inicio rápido se explica cómo enviar y recibir eventos de temas de Atlas Kafka . Usaremos Azure Event Hubs y la biblioteca .NET de Azure.Messaging.EventHubs.
Si no está familiarizado con Event Hubs, consulte Información general de Event Hubs antes de completar este inicio rápido.
Para seguir este inicio rápido, necesita ciertos requisitos previos:
- Una suscripción de Microsoft Azure. Para usar los servicios de Azure, incluidos Event Hubs, necesita una suscripción de Azure. Si no tiene una cuenta de Azure, puede registrarse para obtener una evaluación gratuita o usar las ventajas del suscriptor de MSDN al crear una cuenta.
-
Microsoft Visual Studio 2022. La biblioteca cliente de Event Hubs usa nuevas características que se introdujeron en C# 8.0. Todavía puede usar la biblioteca con versiones anteriores de C#, pero la nueva sintaxis no estará disponible. Para usar la sintaxis completa, se recomienda compilar con el SDK de .NET Core 3.0 o superior y la versión del lenguaje establecida en
latest
. Si usa una versión de Visual Studio anterior a Visual Studio 2019, no tiene las herramientas necesarias para compilar proyectos de C# 8.0. Visual Studio 2022, incluida la edición gratuita Community, se puede descargar aquí. - Una cuenta de Microsoft Purview activa.
-
Una instancia de Event Hubs configurada con la cuenta de Microsoft Purview para enviar y recibir mensajes:
- Es posible que la cuenta ya esté configurada. Puede comprobar la cuenta de Microsoft Purview en el Azure Portal en Configuración, Configuración de Kafka. Si aún no está configurado, siga esta guía.
Vamos a crear una aplicación de consola de .NET Core que envíe eventos a Microsoft Purview a través del tema de Kafka de Event Hubs, ATLAS_HOOK.
Para publicar mensajes en Microsoft Purview, necesitará una instancia administrada de Event Hubs o al menos una instancia de Event Hubs con una configuración de enlace.
A continuación, cree una aplicación de consola .NET de C# en Visual Studio:
- Inicie Visual Studio.
- En la ventana Inicio, seleccione Crear un nuevo proyecto>Aplicación de consola (.NET Framework). Se requiere la versión 4.5.2 o posterior de .NET.
- En Nombre del proyecto, escriba PurviewKafkaProducer.
- Seleccione Crear para crear el proyecto.
- Inicie Visual Studio 2022.
- Seleccione Crear un nuevo proyecto.
- En el cuadro de diálogo Crear un nuevo proyecto , siga estos pasos: Si no ve este cuadro de diálogo, seleccione Archivo en el menú, seleccione Nuevo y, a continuación, seleccione Proyecto.
- Seleccione C# para el lenguaje de programación.
- Seleccione Consola para el tipo de aplicación.
- Seleccione Aplicación de consola (.NET Core) en la lista de resultados.
- Después, seleccione Siguiente.
Seleccione Herramientas>NuGet Package ManagerConsole (Consola del Administrador > de paquetes de NuGet) en el menú.
Ejecute el siguiente comando para instalar el paquete NuGet Azure.Messaging.EventHubs y el paquete NuGet Azure.Messaging.EventHubs.Producer :
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Agregue las siguientes
using
instrucciones a la parte superior del archivo Program.cs :using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Agregue constantes a la
Program
clase para el cadena de conexión de Event Hubs y el nombre de Event Hubs.private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Reemplace el
Main
método por el método siguienteasync Main
y agregue unasync ProduceMessage
para insertar mensajes en Microsoft Purview. Vea los comentarios en el código para obtener más información.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
Cree el proyecto. Asegúrese de que no haya errores.
Ejecute el programa y espere el mensaje de confirmación.
Nota
Para ver el código fuente completo con comentarios más informativos, consulte este archivo en GitHub.
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
A continuación, aprenderá a escribir una aplicación de consola de .NET Core que recibe mensajes de event hubs mediante un procesador de eventos. El procesador de eventos administra puntos de control persistentes y recepciones paralelas de event hubs. Esto simplifica el proceso de recepción de eventos. Debe usar el centro de eventos ATLAS_ENTITIES para recibir mensajes de Microsoft Purview.
Para recibir mensajes de Microsoft Purview, necesitará una configuración de notificación de Event Hubs administrada o una instancia de Event Hubs.
Advertencia
El SDK de Event Hubs usa la versión más reciente de storage API disponible. Es posible que esa versión no esté necesariamente disponible en la plataforma de Stack Hub. Si ejecuta este código en Azure Stack Hub, experimentará errores en tiempo de ejecución a menos que tenga como destino la versión específica que usa. Si usa Azure Blob Storage como almacén de puntos de control, revise la versión admitida de la API de Azure Storage para la compilación de Azure Stack Hub y, en el código, seleccione esa versión.
La versión más alta disponible del servicio storage es la versión 2019-02-02. De forma predeterminada, la biblioteca cliente del SDK de Event Hubs usa la versión más alta disponible en Azure (2019-07-07 en el momento de la versión del SDK). Si usa la versión 2005 de Azure Stack Hub, además de seguir los pasos de esta sección, también tendrá que agregar código que tenga como destino la versión 2019-02-02 de la API del servicio storage. Para obtener información sobre cómo dirigirse a una versión específica de la API de Storage, consulte este ejemplo en GitHub.
Usaremos Azure Storage como almacén de puntos de control. Siga estos pasos para crear una cuenta de Azure Storage.
Obtener el cadena de conexión de la cuenta de almacenamiento
Anote el cadena de conexión y el nombre del contenedor. Los usará en el código de recepción.
- En la ventana Explorador de soluciones, seleccione y mantenga presionada (o haga clic con el botón derecho) la solución EventHubQuickStart, seleccione Agregar y seleccione Nuevo proyecto.
- Seleccione Aplicación de consola (.NET Core) y seleccione Siguiente.
- Escriba PurviewKafkaConsumer en Nombre del proyecto y seleccione Crear.
Seleccione Herramientas>NuGet Package ManagerConsole (Consola del Administrador > de paquetes de NuGet) en el menú.
Ejecute el siguiente comando para instalar el paquete NuGet Azure.Messaging.EventHubs :
Install-Package Azure.Messaging.EventHubs
Ejecute el siguiente comando para instalar el paquete NuGet Azure.Messaging.EventHubs.Processor :
Install-Package Azure.Messaging.EventHubs.Processor
Agregue las siguientes
using
instrucciones en la parte superior del archivo Program.cs .using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Agregue constantes a la
Program
clase para el cadena de conexión de Event Hubs y el nombre del centro de eventos. Reemplace los marcadores de posición entre corchetes por los valores reales que obtuvo al crear el centro de eventos y la cuenta de almacenamiento (claves de acceso: cadena de conexión principal). Asegúrese de que es el{Event Hubs namespace connection string}
cadena de conexión de nivel de espacio de nombres y no la cadena del centro de eventos.private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Use ATLAS_ENTITIES como nombre del centro de eventos al enviar mensajes a Microsoft Purview.
Reemplace el
Main
método por el método siguienteasync Main
. Vea los comentarios en el código para obtener más información.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
Ahora agregue los siguientes métodos de controlador de errores y eventos a la clase .
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Cree el proyecto. Asegúrese de que no haya errores.
Nota
Para ver el código fuente completo con comentarios más informativos, consulte este archivo en GitHub.
Ejecute la aplicación receptora.
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
Consulte más ejemplos en GitHub.