Partager via


Utiliser Event Hubs et .NET pour envoyer et recevoir des messages de rubriques Atlas Kafka

Ce guide de démarrage rapide vous apprend à envoyer et à recevoir des événements de rubriques Atlas Kafka . Nous allons utiliser Azure Event Hubs et la bibliothèque .NET Azure.Messaging.EventHubs.

Configuration requise

Si vous débutez avec Event Hubs, consultez Vue d’ensemble d’Event Hubs avant de terminer ce guide de démarrage rapide.

Pour suivre ce guide de démarrage rapide, vous devez disposer de certaines conditions préalables :

  • Un abonnement Microsoft Azure. Pour utiliser les services Azure, y compris Event Hubs, vous avez besoin d’un abonnement Azure. Si vous n’avez pas de compte Azure, vous pouvez vous inscrire à un essai gratuit ou utiliser les avantages de votre abonné MSDN lorsque vous créez un compte.
  • Microsoft Visual Studio 2022. La bibliothèque de client Event Hubs utilise les nouvelles fonctionnalités introduites dans C# 8.0. Vous pouvez toujours utiliser la bibliothèque avec les versions C# précédentes, mais la nouvelle syntaxe ne sera pas disponible. Pour utiliser la syntaxe complète, il est recommandé de compiler avec le SDK .NET Core 3.0 ou version ultérieure et la version du langage définie sur latest. Si vous utilisez une version de Visual Studio antérieure à Visual Studio 2019, elle ne dispose pas des outils nécessaires pour générer des projets C# 8.0. Visual Studio 2022, y compris l’édition Community gratuite, peut être téléchargé ici.
  • Un compte Microsoft Purview actif.
  • Un Hubs d’événements configuré avec votre compte Microsoft Purview pour envoyer et recevoir des messages :
    • Votre compte est peut-être déjà configuré. Vous pouvez case activée votre compte Microsoft Purview dans le Portail Azure sous Paramètres, configuration Kafka. S’il n’est pas déjà configuré, suivez ce guide.

Publier des messages sur Microsoft Purview

Nous allons créer une application console .NET Core qui envoie des événements à Microsoft Purview via la rubrique Kafka Event Hubs , ATLAS_HOOK.

Pour publier des messages sur Microsoft Purview, vous aurez besoin d’un Event Hubs managé ou d’au moins un Event Hubs avec une configuration de hook.

Créer un projet Visual Studio

Ensuite, créez une application console C# .NET dans Visual Studio :

  1. Lancez Visual Studio.
  2. Dans la fenêtre Démarrer, sélectionnez Créer une application console de projet>(.NET Framework). .NET version 4.5.2 ou ultérieure est obligatoire.
  3. Dans Nom du projet, entrez PurviewKafkaProducer.
  4. Sélectionnez Créer pour créer le projet.

Créer une application console

  1. Démarrez Visual Studio 2022.
  2. Sélectionnez Créer un projet.
  3. Dans la boîte de dialogue Créer un projet , procédez comme suit : Si cette boîte de dialogue ne s’affiche pas, sélectionnez Fichier dans le menu, sélectionnez Nouveau, puis Projet.
    1. Sélectionnez C# pour le langage de programmation.
    2. Sélectionnez Console pour le type de l’application.
    3. Sélectionnez Application console (.NET Core) dans la liste des résultats.
    4. Puis sélectionnez Suivant.

Ajouter le package NuGet Event Hubs

  1. Sélectionnez Outils>NuGet Package ManagerConsole du gestionnaire > depackage dans le menu.

  2. Exécutez la commande suivante pour installer le package NuGet Azure.Messaging.EventHubs et le package NuGet Azure.Messaging.EventHubs.Producer :

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

Écrire du code qui envoie des messages au hub d’événements

  1. Ajoutez les instructions suivantes using en haut du fichier Program.cs :

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Ajoutez des constantes à la Program classe pour la chaîne de connexion Event Hubs et le nom Event Hubs.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Remplacez la Main méthode par la méthode suivante async Main et ajoutez un async ProduceMessage pour envoyer des messages dans Microsoft Purview. Pour plus d’informations, consultez les commentaires dans le code.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. Créez le projet. Assurez-vous qu'il n'y a pas d'erreurs.

  5. Exécutez le programme et attendez le message de confirmation.

    Remarque

    Pour obtenir le code source complet avec des commentaires plus détaillés, consultez ce fichier dans GitHub

Exemple de code qui crée une table SQL avec deux colonnes à l’aide d’un message JSON Créer une entité

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Recevoir des messages Microsoft Purview

Découvrez ensuite comment écrire une application console .NET Core qui reçoit des messages d’Event Hubs à l’aide d’un processeur d’événements. Le processeur d’événements gère les points de contrôle persistants et les réceptions parallèles à partir d’Event Hubs. Cela simplifie le processus de réception des événements. Vous devez utiliser le hub d’événements ATLAS_ENTITIES pour recevoir des messages de Microsoft Purview.

Pour recevoir des messages de Microsoft Purview, vous avez besoin d’un Event Hubs managé ou d’une configuration de notification Event Hubs.

Avertissement

Le Kit de développement logiciel (SDK) Event Hubs utilise la version la plus récente de l’API de stockage disponible. Cette version n’est pas nécessairement disponible sur votre plateforme Stack Hub. Si vous exécutez ce code sur Azure Stack Hub, vous rencontrerez des erreurs d’exécution, sauf si vous ciblez la version spécifique que vous utilisez. Si vous utilisez Stockage Blob Azure comme magasin de points de contrôle, passez en revue la version de l’API Stockage Azure prise en charge pour votre build Azure Stack Hub et, dans votre code, ciblez cette version.

La version la plus élevée disponible du service de stockage est la version 2019-02-02. Par défaut, la bibliothèque de client du KIT de développement logiciel (SDK) Event Hubs utilise la version la plus élevée disponible sur Azure (2019-07-07 au moment de la publication du SDK). Si vous utilisez Azure Stack Hub version 2005, en plus de suivre les étapes décrites dans cette section, vous devez également ajouter du code qui cible l’API du service de stockage version 2019-02-02. Pour savoir comment cibler une version spécifique de l’API de stockage, consultez cet exemple dans GitHub.

Créer un stockage Azure et un conteneur d’objets blob

Nous allons utiliser stockage Azure comme magasin de points de contrôle. Procédez comme suit pour créer un compte de stockage Azure.

  1. Créer un compte de stockage Azure

  2. Créer un conteneur d’objets blob

  3. Obtenir la chaîne de connexion du compte de stockage

    Notez la chaîne de connexion et le nom du conteneur. Vous les utiliserez dans le code de réception.

Créer un projet Visual Studio pour le récepteur

  1. Dans la fenêtre Explorateur de solutions, sélectionnez la solution EventHubQuickStart et maintenez la touche enfoncée (ou cliquez avec le bouton droit), pointez sur Ajouter, puis sélectionnez Nouveau projet.
  2. Sélectionnez Application console (.NET Core), puis Suivant.
  3. Entrez PurviewKafkaConsumer comme nom du projet, puis sélectionnez Créer.

Ajouter le package NuGet Event Hubs

  1. Sélectionnez Outils>NuGet Package ManagerConsole du gestionnaire > depackage dans le menu.

  2. Exécutez la commande suivante pour installer le package NuGet Azure.Messaging.EventHubs :

    Install-Package Azure.Messaging.EventHubs
    
  3. Exécutez la commande suivante pour installer le package NuGet Azure.Messaging.EventHubs.Processor :

    Install-Package Azure.Messaging.EventHubs.Processor
    

Mettre à jour la méthode Main

  1. Ajoutez les instructions suivantes using en haut du fichier Program.cs .

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Ajoutez des constantes à la Program classe pour la chaîne de connexion Event Hubs et le nom du hub d’événements. Remplacez les espaces réservés entre crochets par les valeurs réelles que vous avez obtenues lors de la création du hub d’événements et du compte de stockage (clés d’accès - chaîne de connexion principale). Assurez-vous que est la chaîne de connexion au niveau de l’espace {Event Hubs namespace connection string} de noms, et non la chaîne du hub d’événements.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Utilisez ATLAS_ENTITIES comme nom de hub d’événements lors de l’envoi de messages à Microsoft Purview.

  3. Remplacez la Main méthode par la méthode suivante async Main . Pour plus d’informations, consultez les commentaires dans le code.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. Ajoutez maintenant les méthodes de gestionnaire d’événements et d’erreurs suivantes à la classe .

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. Créez le projet. Assurez-vous qu'il n'y a pas d'erreurs.

    Remarque

    Pour obtenir le code source complet avec des commentaires plus détaillés, consultez ce fichier sur GitHub.

  6. Exécutez l’application réceptrice.

Exemple de message reçu de Microsoft Purview

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

Prochaines étapes

Consultez d’autres exemples dans GitHub.