Share via


使用事件中樞和 .NET 來傳送和接收 Atlas Kafka 主題訊息

本快速入門會教導您如何傳送和接收 Atlas Kafka 主題事件。 我們將使用Azure 事件中樞Azure.Messaging.EventHubs .NET 程式庫。

必要條件

如果您不熟悉事件中樞,請先參閱 事件中樞概觀 ,再完成本快速入門。

若要遵循本快速入門,您需要具備特定必要條件:

  • Microsoft Azure 訂用帳戶。 若要使用包括事件中樞在內的 Azure 服務,您需要 Azure 訂用帳戶。 如果您沒有 Azure 帳戶,您可以註冊 免費試 用,或在 建立帳戶時使用 MSDN 訂閱者權益。
  • Microsoft Visual Studio 2022。 事件中樞用戶端程式庫會使用 C# 8.0 中引進的新功能。 您仍然可以搭配舊版 C# 使用程式庫,但無法使用新的語法。 若要使用完整的語法,建議您使用 .NET Core SDK 3.0 或更高版本進行編譯,並將 語言版本 設定為 latest 。 如果您使用 Visual Studio 2019 之前的 Visual Studio 版本,則沒有建置 C# 8.0 專案所需的工具。 您可以 在這裡下載 Visual Studio 2022,包括免費的 Community 版本。
  • 使用中的 Microsoft Purview 帳戶
  • 使用您的 Microsoft Purview 帳戶設定的事件中樞,可傳送和接收訊息

將訊息發佈至 Microsoft Purview

讓我們建立 .NET Core 主控台應用程式,透過事件中樞 Kafka 主題將事件傳送至 Microsoft Purview ,ATLAS_HOOK

若要將訊息發佈至 Microsoft Purview,您需要一個受控 事件中樞,或 至少一個具有攔截組態的事件中樞

建立 Visual Studio 專案

接下來,在 Visual Studio 中建立 C# .NET 主控台應用程式:

  1. 啟動 Visual Studio
  2. 在 [開始] 視窗中,選取 [建立新的專案>主控台應用程式 (.NET Framework) 。 需要 .NET 4.5.2 版或更新版本。
  3. [專案名稱] 中,輸入 PurviewKafkaProducer
  4. 取 [建立 ] 以建立專案。

建立主控台應用程式

  1. 啟動 Visual Studio 2022。
  2. 取 [建立新專案]
  3. 在 [ 建立新專案 ] 對話方塊上,執行下列步驟:如果您沒有看到此對話方塊,請選取功能表上的 [ 檔案 ],選取 [ 新增],然後選取 [ 專案]
    1. 選取 C# 作為程式設計語言。
    2. 取 [主控台 ] 作為應用程式的類型。
    3. 從結果清單中選取 [主控台應用程式] (.NET [核心) ]。
    4. 然後,選取 [下一步]

新增事件中樞 NuGet 套件

  1. 從功能表中選取[工具>NuGet 套件管理員>套件管理員主控台]。

  2. 執行下列命令來安裝 Azure.Messaging.EventHubs NuGet 套件和 Azure.Messaging.EventHubs.Producer NuGet 套件:

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

撰寫將訊息傳送至事件中樞的程式碼

  1. 將下列 using 語句新增至 Program.cs 檔案的頂端:

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. 將常數新增至 Program 事件中樞連接字串和事件中樞名稱的 類別。

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. 將 方法 Main 取代為下列 async Main 方法,並新增 async ProduceMessage 以將訊息推送至 Microsoft Purview。 如需詳細資訊,請參閱程式碼中的批註。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. 建置專案。 請確定沒有任何錯誤。

  5. 執行程式並等候確認訊息。

    注意事項

    如需具有詳細資訊批註的完整原始程式碼,請參閱 GitHub 中的此檔案

使用建立實體 JSON 訊息建立具有兩個數據行之 sql 資料表的範例程式碼

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


接收 Microsoft Purview 訊息

接下來,瞭解如何撰寫使用事件處理器從事件中樞接收訊息的 .NET Core 主控台應用程式。 事件處理器會管理來自事件中樞的持續性檢查點和平行接收。 這可簡化接收事件的程式。 您必須使用ATLAS_ENTITIES事件中樞來接收來自 Microsoft Purview 的訊息。

若要從 Microsoft Purview 接收訊息,您需要受控 事件中樞事件中樞通知設定

警告

事件中樞 SDK 使用最新版的儲存體 API。 該版本不一定可在您的 Stack Hub 平臺上使用。 如果您在 Azure Stack Hub 上執行此程式碼,除非您以所使用的特定版本為目標,否則會發生執行階段錯誤。 如果您使用Azure Blob 儲存體作為檢查點存放區,請檢閱Azure Stack Hub 組建支援的 Azure 儲存體 API 版本,並在您的程式碼中以該版本為目標。

儲存體服務的最高可用版本是 2019-02-02 版。 根據預設,在 SDK) 發行時,事件中樞 SDK 用戶端程式庫會在 Azure (2019-07-07 上使用最高可用版本。 如果您使用 Azure Stack Hub 2005 版,除了遵循本節中的步驟之外,您還需要新增以儲存體服務 API 2019-02-02 版為目標的程式碼。 若要瞭解如何以特定儲存體 API 版本為目標,請參閱 GitHub 中的此範例

建立 Azure 儲存體和 Blob 容器

我們將使用 Azure 儲存體作為檢查點存放區。 使用下列步驟來建立 Azure 儲存體帳戶。

  1. 建立 Azure 儲存體帳戶

  2. 建立 Blob 容器

  3. 取得儲存體帳戶的連接字串

    記下連接字串和容器名稱。 您會在接收程式碼中使用它們。

為接收者建立 Visual Studio 專案

  1. 在 [方案總管] 視窗中,選取並按住 (,或以滑鼠右鍵按一下EventHubQuickStart方案) ,指向 [新增],然後選取 [新增專案]
  2. 取 [主控台應用程式] (.NET [核心) ],然後選取 [ 下一步]
  3. 輸入 PurviewKafkaConsumer 作為 [專案名稱],然後選取 [ 建立]

新增事件中樞 NuGet 套件

  1. 從功能表中選取[工具>NuGet 套件管理員>套件管理員主控台]。

  2. 執行下列命令以安裝 Azure.Messaging.EventHubs NuGet 套件:

    Install-Package Azure.Messaging.EventHubs
    
  3. 執行下列命令以安裝 Azure.Messaging.EventHubs.Processor NuGet 套件:

    Install-Package Azure.Messaging.EventHubs.Processor
    

更新 Main 方法

  1. Program.cs檔案頂端新增下列 using 語句。

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. 將常數新增至 Program 事件中樞連接字串和事件中樞名稱的 類別。 將括弧中的預留位置取代為您在建立事件中樞時取得的實際值,以及儲存體帳戶 (存取金鑰 - 主要連接字串) 。 請確定 {Event Hubs namespace connection string} 是命名空間層級連接字串,而不是事件中樞字串。

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    將訊息傳送至 Microsoft Purview 時,請使用 ATLAS_ENTITIES 作為事件中樞名稱。

  3. Main將 方法取代為下列 async Main 方法。 如需詳細資訊,請參閱程式碼中的批註。

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. 現在,將下列事件和錯誤處理常式方法新增至 類別。

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. 建置專案。 請確定沒有任何錯誤。

    注意事項

    如需具有詳細資訊批註的完整原始程式碼,請參閱 GitHub 上的此檔案

  6. 執行接收者應用程式。

從 Microsoft Purview 接收的訊息範例

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

後續步驟

查看 GitHub 中的更多範例。