訓練
認證
Microsoft Certified: Azure Cosmos DB Developer Specialty - Certifications
透過 Microsoft Azure Cosmos DB 在 SQL API 和 SDK 中撰寫有效率的查詢、建立索引編製原則、管理及佈建資源。
注意
Microsoft Purview 資料目錄 會將其名稱變更為 Microsoft Purview Unified Catalog。 所有功能都會維持不變。 當新的 Microsoft Purview 數據控管體驗在您的區域中正式推出時,您會看到名稱變更。 檢查您區域中的名稱。
本快速入門會教導您如何傳送和接收 Atlas Kafka 主題事件。 我們將使用 Azure 事件中樞 和 Azure.Messaging.EventHubs .NET 連結庫。
如果您不熟悉事件中樞,請先參閱 事件中樞概觀 ,再完成本快速入門。
若要遵循本快速入門,您需要具備特定必要條件:
latest
。 如果您使用 Visual Studio 2019 之前的 Visual Studio 版本,則沒有建置 C# 8.0 專案所需的工具。 您可以 在這裡下載 Visual Studio 2022,包括免費的 Community 版本。讓我們建立 .NET Core 控制台應用程式,透過事件中樞 Kafka 主題將事件傳送至 Microsoft Purview,ATLAS_HOOK。
若要將訊息發佈至 Microsoft Purview,您需要一個受控 事件中樞,或 至少一個具有攔截組態的事件中樞。
接下來,在 Visual Studio 中建立 C# .NET 控制台應用程式:
從功能表中選取 [工具>NuGet 套件管理員>套件管理員主控台 ]。
執行下列命令來安裝 Azure.Messaging.EventHubs NuGet 套件和 Azure.Messaging.EventHubs.Producer NuGet 套件:
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
將下列 using
語句新增至 Program.cs 檔案的頂端:
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
將常數新增至Program
事件中樞 連接字串 和事件中樞名稱的 類別。
private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
private const string eventHubName = "<EVENT HUB NAME>";
將方法 Main
取代為下列 async Main
方法, async ProduceMessage
並新增 以將訊息推送至 Microsoft Purview。 如需詳細資訊,請參閱程序代碼中的批註。
static async Task Main()
{
// Read from the default consumer group: $Default
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
/ Create an event producer client to add events in the event hub
EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
await ProduceMessage(producer);
}
static async Task ProduceMessage(EventHubProducerClient producer)
{
// Create a batch of events
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
// Add events to the batch. An event is a represented by a collection of bytes and metadata.
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
// Use the producer client to send the batch of events to the event hub
await producerClient.SendAsync(eventBatch);
Console.WriteLine("A batch of 3 events has been published.");
}
建置專案。 請確定沒有任何錯誤。
執行程式並等候確認訊息。
注意
如需具有詳細資訊批註的完整原始程式碼,請參閱 GitHub 中的此檔案
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
接下來,瞭解如何撰寫使用事件處理器從事件中樞接收訊息的 .NET Core 控制台應用程式。 事件處理器會管理來自事件中樞的持續性檢查點和平行接收。 這可簡化接收事件的程式。 您必須使用ATLAS_ENTITIES事件中樞來接收來自 Purview Microsoft訊息。
若要從 Microsoft Purview 接收訊息,您需要受控 事件中樞或 事件中樞通知組態。
警告
事件中樞 SDK 使用最新版的記憶體 API。 該版本不一定可在您的 Stack Hub 平臺上使用。 如果您在 Azure Stack Hub 上執行此程式代碼,除非您以所使用的特定版本為目標,否則會發生運行時錯誤。 如果您使用 Azure Blob 儲存體 作為檢查點存放區,請檢閱 Azure Stack Hub 組建支援的 Azure 記憶體 API 版本,並在您的程式代碼中以該版本為目標。
記憶體服務的最高可用版本是 2019-02-02 版。 根據預設,在 SDK) 發行時,事件中樞 SDK 用戶端連結庫會在 Azure (2019-07-07 上使用最高可用版本。 如果您使用 Azure Stack Hub 2005 版,除了遵循本節中的步驟之外,您還需要新增以記憶體服務 API 2019-02-02 版為目標的程式代碼。 若要瞭解如何以特定記憶體 API 版本為目標,請參閱 GitHub 中的此範例。
我們將使用 Azure 記憶體作為檢查點存放區。 使用下列步驟來建立 Azure 記憶體帳戶。
記下 連接字串 和容器名稱。 您會在接收程式代碼中使用它們。
從功能表中選取 [工具>NuGet 套件管理員>套件管理員主控台 ]。
執行下列命令以安裝 Azure.Messaging.EventHubs NuGet 套件:
Install-Package Azure.Messaging.EventHubs
執行下列命令以安裝 Azure.Messaging.EventHubs.Processor NuGet 套件:
Install-Package Azure.Messaging.EventHubs.Processor
在Program.cs檔案頂端新增下列using
語句。
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
將常數新增至Program
事件中樞 連接字串 和事件中樞名稱的 類別。 將括弧中的佔位元取代為您在建立事件中樞時取得的實際值,並將記憶體帳戶 (存取密鑰 - 主要 連接字串) 。 請確定 {Event Hubs namespace connection string}
是命名空間層級 連接字串,而不是事件中樞字串。
private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
private const string eventHubName = "<EVENT HUB NAME>";
private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
private const string blobContainerName = "<BLOB CONTAINER NAME>";
將 訊息傳送至 Microsoft Purview 時,請使用 ATLAS_ENTITIES 作為事件中樞名稱。
Main
將方法取代為下列async Main
方法。 如需詳細資訊,請參閱程序代碼中的批註。
static async Task Main()
{
// Read from the default consumer group: $Default
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
// Create a blob container client that the event processor will use
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
// Create an event processor client to process events in the event hub
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await processor.StartProcessingAsync();
// Wait for 10 seconds for the events to be processed
await Task.Delay(TimeSpan.FromSeconds(10));
// Stop the processing
await processor.StopProcessingAsync();
}
現在,將下列事件和錯誤處理程式方法新增至 類別。
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
// Write the body of the event to the console window
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
// Update checkpoint in the blob storage so that the app receives only new events the next time it's run
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
// Write details about the error to the console window
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
建置專案。 請確定沒有任何錯誤。
注意
如需具有詳細資訊批注的完整原始程式碼,請參閱 GitHub 上的此檔案。
執行接收者應用程式。
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
查看 GitHub 中的更多範例。
訓練
認證
Microsoft Certified: Azure Cosmos DB Developer Specialty - Certifications
透過 Microsoft Azure Cosmos DB 在 SQL API 和 SDK 中撰寫有效率的查詢、建立索引編製原則、管理及佈建資源。