Utilizar hubs de eventos e .NET para enviar e receber mensagens de tópicos do Atlas Kafka
Observação
O Catálogo de Dados do Microsoft Purview está a alterar o nome para Catálogo Unificado do Microsoft Purview. Todas as funcionalidades permanecerão iguais. Verá o nome mudar quando a nova experiência de Governação de Dados do Microsoft Purview estiver geralmente disponível na sua região. Verifique o nome na sua região.
Este início rápido ensina-o a enviar e receber eventos de tópicos do Atlas Kafka . Vamos utilizar o Hubs de Eventos do Azure e a biblioteca .NET Azure.Messaging.EventHubs.
Se não estiver familiarizado com os Hubs de Eventos, veja Descrição geral dos Hubs de Eventos antes de concluir este início rápido.
Para seguir este início rápido, precisa de determinados pré-requisitos:
- Uma subscrição do Microsoft Azure. Para utilizar os serviços do Azure, incluindo os Hubs de Eventos, precisa de uma subscrição do Azure. Se não tiver uma conta do Azure, pode inscrever-se numa avaliação gratuita ou utilizar os benefícios de subscritor do MSDN quando criar uma conta.
-
Microsoft Visual Studio 2022. A biblioteca de cliente dos Hubs de Eventos utiliza as novas funcionalidades que foram introduzidas no C# 8.0. Ainda pode utilizar a biblioteca com versões C# anteriores, mas a nova sintaxe não estará disponível. Para utilizar a sintaxe completa, recomenda-se que compile com o SDK .NET Core 3.0 ou superior e a versão de idioma definida como
latest
. Se estiver a utilizar uma versão do Visual Studio anterior ao Visual Studio 2019, não tem as ferramentas necessárias para criar projetos C# 8.0. O Visual Studio 2022, incluindo a edição Community gratuita, pode ser transferido aqui. - Uma conta ativa do Microsoft Purview.
-
Um Hub de Eventos configurado com a sua conta do Microsoft Purview para enviar e receber mensagens:
- A sua conta pode já estar configurada. Pode marcar sua conta do Microsoft Purview no portal do Azure em Definições, configuração do Kafka. Se ainda não estiver configurado, siga este guia.
Vamos criar uma aplicação de consola .NET Core que envia eventos para o Microsoft Purview através do tópico Kafka dos Hubs de Eventos , ATLAS_HOOK.
Para publicar mensagens no Microsoft Purview, precisará de um Hub de Eventos gerido ou de, pelo menos, um Hub de Eventos com uma configuração de hook..
Em seguida, crie uma aplicação de consola .NET C# no Visual Studio:
- Inicie o Visual Studio.
- Na janela Iniciar, selecione Criar um novo projeto>Aplicação de Consola (.NET Framework). É necessária a versão 4.5.2 ou superior do .NET.
- Em Nome do projeto, introduza PurviewKafkaProducer.
- Selecione Criar para criar o projeto.
- Inicie o Visual Studio 2022.
- Selecione Criar um novo projeto.
- Na caixa de diálogo Criar um novo projeto , siga os seguintes passos: se não vir esta caixa de diálogo, selecione Ficheiro no menu, selecione Novo e, em seguida, selecione Projeto.
- Selecione C# para a linguagem de programação.
- Selecione Consola para o tipo de aplicação.
- Selecione Aplicação de Consola (.NET Core) na lista de resultados.
- Em seguida, selecione Avançar.
Selecione Ferramentas>Consola do Gestor de Pacotes nuGet Gestor> dePacotesno menu.
Execute o seguinte comando para instalar o pacote NuGet Azure.Messaging.EventHubs e o pacote NuGet Azure.Messaging.EventHubs.Producer :
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Adicione as seguintes
using
instruções à parte superior do ficheiro Program.cs :using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Adicione constantes à
Program
classe para os Hubs de Eventos cadeia de conexão e o nome dos Hubs de Eventos.private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Substitua o
Main
método pelo seguinteasync Main
método e adicione umasync ProduceMessage
para enviar mensagens para o Microsoft Purview. Veja os comentários no código para obter detalhes.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
Compile o projeto. Assegure-se de que não existem erros.
Execute o programa e aguarde pela mensagem de confirmação.
Observação
Para obter o código fonte completo com comentários mais informativos, veja este ficheiro no GitHub
Código de exemplo que cria uma tabela sql com duas colunas através de uma mensagem Criar Entidade JSON
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
Em seguida, saiba como escrever uma aplicação de consola .NET Core que recebe mensagens de hubs de eventos com um processador de eventos. O processador de eventos gere pontos de verificação persistentes e recepções paralelas a partir de hubs de eventos. Isto simplifica o processo de receção de eventos. Tem de utilizar o hub de eventos ATLAS_ENTITIES para receber mensagens do Microsoft Purview.
Para receber mensagens do Microsoft Purview, precisará de um Hub de Eventos gerido ou de uma configuração de notificação dos Hubs de Eventos.
Aviso
O SDK dos Hubs de Eventos utiliza a versão mais recente da API de Armazenamento disponível. Essa versão pode não estar necessariamente disponível na sua plataforma do Stack Hub. Se executar este código no Azure Stack Hub, ocorrerá erros de runtime, a menos que tenha como destino a versão específica que está a utilizar. Se estiver a utilizar Armazenamento de Blobs do Azure como um arquivo de pontos de verificação, reveja a versão suportada da API de Armazenamento do Azure para a compilação do Azure Stack Hub e, no código, direcione essa versão.
A versão mais elevada disponível do serviço de Armazenamento é a versão 2019-02-02. Por predefinição, a biblioteca de cliente do SDK dos Hubs de Eventos utiliza a versão mais elevada disponível no Azure (2019-07-07 no momento do lançamento do SDK). Se estiver a utilizar a versão 2005 do Azure Stack Hub, além de seguir os passos nesta secção, também terá de adicionar código que se destina à versão da API do Serviço de armazenamento 2019-02-02. Para saber como direcionar uma versão específica da API de Armazenamento, veja este exemplo no GitHub.
Vamos utilizar o Armazenamento do Azure como o arquivo de pontos de verificação. Utilize os seguintes passos para criar uma conta de Armazenamento do Azure.
Obter o cadeia de conexão da conta de armazenamento
Anote o cadeia de conexão e o nome do contentor. Irá utilizá-los no código de receção.
- Na janela Gerenciador de Soluções, selecione sem soltar (ou clique com o botão direito do rato) a solução EventHubQuickStart, aponte para Adicionar e selecione Novo Projeto.
- Selecione Aplicação de Consola (.NET Core) e selecione Seguinte.
- Introduza PurviewKafkaConsumer para o Nome do projeto e selecione Criar.
Selecione Ferramentas>Consola do Gestor de Pacotes nuGet Gestor> dePacotesno menu.
Execute o seguinte comando para instalar o pacote NuGet Azure.Messaging.EventHubs :
Install-Package Azure.Messaging.EventHubs
Execute o seguinte comando para instalar o pacote NuGet Azure.Messaging.EventHubs.Processor :
Install-Package Azure.Messaging.EventHubs.Processor
Adicione as seguintes
using
instruções na parte superior do ficheiro Program.cs .using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Adicione constantes à
Program
classe para os Hubs de Eventos cadeia de conexão e o nome do hub de eventos. Substitua os marcadores de posição entre parênteses retos pelos valores reais que obteve quando criou o hub de eventos e a conta de armazenamento (chaves de acesso – cadeia de conexão primário). Certifique-se de que o{Event Hubs namespace connection string}
é o cadeia de conexão ao nível do espaço de nomes e não a cadeia do hub de eventos.private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Utilize ATLAS_ENTITIES como o nome do hub de eventos ao enviar mensagens para o Microsoft Purview.
Substitua o
Main
método pelo seguinteasync Main
método. Veja os comentários no código para obter detalhes.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
Agora, adicione os seguintes métodos de processador de eventos e erros à classe .
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Compile o projeto. Assegure-se de que não existem erros.
Observação
Para obter o código fonte completo com comentários mais informativos, veja este ficheiro no GitHub.
Execute a aplicação recetora.
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
Veja mais exemplos no GitHub.