Event Hubs と .NET を使用して Atlas Kafka トピック メッセージを送受信する
注意
Microsoft Purview データ カタログは、名前を Microsoft Purview 統合カタログに変更しています。 すべての機能は同じままです。 新しい Microsoft Purview データ ガバナンス エクスペリエンスがリージョンで一般公開されると、名前の変更が表示されます。 リージョン内の名前を確認します。
このクイック スタートでは、 Atlas Kafka トピック イベントを送受信する方法について説明します。 Azure Event Hubsと Azure.Messaging.EventHubs .NET ライブラリを使用します。
Event Hubs を初めて使用する場合は、このクイック スタートを完了する前に 、「Event Hubs の概要 」を参照してください。
このクイック スタートに従うには、特定の前提条件が必要です。
- Microsoft Azure サブスクリプション。 Event Hubs を含む Azure サービスを使用するには、Azure サブスクリプションが必要です。 Azure アカウントをお持ちでない場合は、 無料試用版 にサインアップするか、 アカウントの作成時に MSDN サブスクライバー特典を使用できます。
-
Microsoft Visual Studio 2022。 Event Hubs クライアント ライブラリは、C# 8.0 で導入された新機能を利用します。 以前の C# バージョンでもライブラリを使用できますが、新しい構文は使用できません。 完全な構文を使用するには、 .NET Core SDK 3.0 以降と 言語バージョン を
latest
に設定してコンパイルすることをお勧めします。 Visual Studio 2019 より前のバージョンの Visual Studio を使用している場合、C# 8.0 プロジェクトのビルドに必要なツールはありません。 無料の Community エディションを含む Visual Studio 2022 は、 こちらからダウンロードできます。 - アクティブな Microsoft Purview アカウント。
-
メッセージを送受信するように Microsoft Purview アカウントで構成された Event Hubs:
- アカウントは既に構成されている可能性があります。 Microsoft Purview アカウントは、[設定] の [Kafka 構成] のAzure portalでチェックできます。 まだ構成されていない場合は、 このガイドに従ってください。
ATLAS_HOOK、Event Hubs Kafka トピックを使用して Microsoft Purview にイベントを送信する .NET Core コンソール アプリケーション を作成しましょう。
Microsoft Purview にメッセージを発行するには、 マネージド Event Hubs か、 フック構成を持つ少なくとも 1 つの Event Hubs が必要です。
次に、Visual Studio で C# .NET コンソール アプリケーションを作成します。
- Visual Studio を起動します。
- [スタート] ウィンドウで、[新しいプロジェクトの作成>Console App (.NET Framework) を選択します。 .NET バージョン 4.5.2 以降が必要です。
- [ プロジェクト名] に「 PurviewKafkaProducer」と入力します。
- [ 作成] を 選択してプロジェクトを作成します。
- Visual Studio 2022 を起動します。
- [ 新しいプロジェクトの作成] を選択します。
- [ 新しいプロジェクトの作成 ] ダイアログ ボックスで、次の手順を実行します。このダイアログ ボックスが表示されない場合は、メニューの [ ファイル ] を選択し、[ 新規作成] を選択し、[ プロジェクト] を選択します。
- プログラミング言語として [C# ] を選択します。
- アプリケーションの種類として [ コンソール ] を選択します。
- 結果の一覧から [ コンソール アプリ (.NET Core)] を選択します。
- さらに [次へ] を選択します。
メニューから [ツール>NuGet パッケージ マネージャー>Package Manager コンソール ] を選択します。
次のコマンドを実行して、 Azure.Messaging.EventHubs NuGet パッケージと Azure.Messaging.EventHubs.Producer NuGet パッケージをインストールします。
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Program.cs ファイルの先頭に次の
using
ステートメントを追加します。using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Event Hubs 接続文字列 と Event Hubs 名の
Program
クラスに定数を追加します。private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Main
メソッドを次のasync Main
メソッドに置き換え、メッセージを Microsoft Purview にプッシュするasync ProduceMessage
を追加します。 詳細については、コードのコメントを参照してください。static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
プロジェクトをビルドします。 エラーがないことを確認します。
プログラムを実行し、確認メッセージを待ちます。
注意
詳細なコメントを含む完全なソース コードについては、GitHub のこのファイルを参照してください
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
次に、イベント プロセッサを使用してイベント ハブからメッセージを受信する .NET Core コンソール アプリケーションを記述する方法について説明します。 イベント プロセッサは、イベント ハブからの永続的なチェックポイントと並列受信を管理します。 これにより、イベントを受信するプロセスが簡略化されます。 Microsoft Purview からメッセージを受信するには、ATLAS_ENTITIES イベント ハブを使用する必要があります。
Microsoft Purview からメッセージを受信するには、 マネージド Event Hubs または Event Hubs通知構成が必要です。
警告
Event Hubs SDK では、使用可能な最新バージョンの Storage API が使用されます。 そのバージョンは、必ずしも Stack Hub プラットフォームで使用できるとは限りません。 Azure Stack Hub でこのコードを実行すると、使用している特定のバージョンをターゲットにしない限り、ランタイム エラーが発生します。 チェックポイント ストアとしてAzure Blob Storageを使用している場合は、Azure Stack Hub ビルドでサポートされている Azure Storage API バージョンを確認し、コードでそのバージョンをターゲットにします。
ストレージ サービスの使用可能な最も高いバージョンは、バージョン 2019-02-02 です。 既定では、Event Hubs SDK クライアント ライブラリでは、Azure で使用可能な最高バージョン (SDK のリリース時点で 2019-07-07) が使用されます。 Azure Stack Hub バージョン 2005 を使用している場合は、このセクションの手順に従うだけでなく、Storage サービス API バージョン 2019-02-02 を対象とするコードも追加する必要があります。 特定の Storage API バージョンをターゲットにする方法については、 GitHub のこのサンプルを参照してください。
チェックポイント ストアとして Azure Storage を使用します。 Azure Storage アカウントを作成するには、次の手順に従います。
-
接続文字列とコンテナー名を書き留めます。 受信コードで使用します。
- ソリューション エクスプローラー ウィンドウで、EventHubQuickStart ソリューションを選択して長押し (または右クリック) し、[追加] をポイントして、[新しいプロジェクト] を選択します。
- [ コンソール アプリ (.NET Core)] を選択し、[ 次へ] を選択します。
- プロジェクト名に「PurviewKafkaConsumer」と入力し、[作成] を選択します。
メニューから [ツール>NuGet パッケージ マネージャー>Package Manager コンソール ] を選択します。
次のコマンドを実行して、 Azure.Messaging.EventHubs NuGet パッケージをインストールします。
Install-Package Azure.Messaging.EventHubs
次のコマンドを実行して、 Azure.Messaging.EventHubs.Processor NuGet パッケージをインストールします。
Install-Package Azure.Messaging.EventHubs.Processor
Program.cs ファイルの先頭に次の
using
ステートメントを追加します。using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Event Hubs 接続文字列とイベント ハブ名の
Program
クラスに定数を追加します。 角かっこ内のプレースホルダーを、イベント ハブとストレージ アカウント (アクセス キー - プライマリ 接続文字列) の作成時に取得した実際の値に置き換えます。{Event Hubs namespace connection string}
が、イベント ハブ文字列ではなく名前空間レベルの接続文字列であることを確認します。private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Microsoft Purview にメッセージを 送信するときは、ATLAS_ENTITIESをイベント ハブ名として使用します。
Main
メソッドを次のasync Main
メソッドに置き換えます。 詳細については、コードのコメントを参照してください。static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
次に、次のイベント ハンドラー メソッドとエラー ハンドラー メソッドを クラスに追加します。
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
プロジェクトをビルドします。 エラーがないことを確認します。
注意
詳細なコメントを含む完全なソース コードについては、 GitHub のこのファイルを参照してください。
受信側アプリケーションを実行します。
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
GitHub のその他の例を確認してください。