.NET 用Azure Event Hubs クライアント ライブラリ - バージョン 5.9.3

Azure Event Hubsは、1 秒あたり何百万ものイベントを取り込み、複数のコンシューマーにストリーミングできる、拡張性の高い発行サブスクライブ サービスです。 これにより、接続されたデバイスとアプリケーションによって生成される大量のデータを処理して分析できます。 Event Hubs がデータを収集したら、リアルタイム分析プロバイダーを使用するか、バッチ処理/ストレージ アダプターを使用してデータを取得、変換、格納できます。 Azure Event Hubsの詳細については、「Event Hubs とは」を参照してください。

Azure Event Hubs クライアント ライブラリにより、Azure Event Hubs イベントの発行と利用が可能になります。また、次の用途に使われます。

  • ビジネス インテリジェンスと診断の目的で、アプリケーションに関するテレメトリを生成します。

  • アプリケーションの状態に関する事実を公開します。これにより、関係のある当事者が、アクションを起こすトリガーとして監視し使うことができます。

  • ビジネスや他のエコシステム内で発生する興味深い操作と相互作用を観察し、疎結合されたシステムが密接に結びつくことなくやり取りできるようにします。

  • 1 つ以上の発行元からイベントを受信し、エコシステムのニーズに合わせてそれらを変換し、変換されたイベントをコンシューマーが観察できるように新しいストリームに公開します。

ソースコード | パッケージ (NuGet) | API リファレンス ドキュメント | 製品ドキュメント | 移行ガイド | トラブルシューティング ガイド

作業の開始

前提条件

  • Azure サブスクリプション:Azure Event Hubsを含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Azure アカウントをお持ちでない場合は、無料試用版にサインアップするか、アカウントの作成時Visual Studio サブスクリプション特典を使用できます。

  • Event Hubs 名前空間とイベント ハブ:Azure Event Hubsと対話するには、名前空間と Event Hub も使用できる必要があります。 Azure リソースの作成に慣れていない場合は、Azure portalを使用して Event Hub を作成するためのステップ バイ ステップ ガイドに従ってください。 ここでは、Azure CLI、Azure PowerShell、または Azure Resource Manager (ARM) テンプレートを使用してイベント ハブを作成するための詳細な手順も確認できます。

  • C# 8.0:Azure Event Hubs クライアント ライブラリでは、C# 8.0 で導入された新機能が使用されます。 C# 8.0 構文を利用するには、 の言語バージョンlatest.NET Core SDK 3.0 以降を使用してコンパイルすることをお勧めします。

    C# 8.0 構文を最大限に活用したい Visual Studio ユーザーは、Visual Studio 2019 以降を使用する必要があります。 無料の Community エディションを含む Visual Studio 2019 は、[こちら](https://visualstudio.microsoft.com/vs/)からダウンロードできます。 Visual Studio 2017 のユーザーは、 Microsoft.Net.Compilers NuGet パッケージ を使用して言語バージョンを設定することで C# 8 構文を利用できます。ただし、編集エクスペリエンスは理想的ではない場合があります。

    以前の C# 言語バージョンでもライブラリを使用できますが、新しい構文の恩恵を受けるのではなく、非同期列挙可能メンバーと非同期破棄可能メンバーを手動で管理する必要があります。 以前のバージョンの .NET Core や .NET Framework を含め、.NET Core SDK でサポートされているフレームワーク バージョンは引き続き対象にすることができます。 詳細については、「 ターゲット フレームワークを指定する方法」を参照してください。
    重要な注意:サンプルサンプルを変更せずにビルドまたは実行するには、C# 11.0 を使用する必要があります。 他の言語バージョン用に調整する場合は、引き続きサンプルを実行できます。 その例は、「 以前の言語バージョン」のサンプルで入手できます。

Azure で Event Hubs リソースの基本的なセットをすばやく作成し、それらの接続文字列を受け取るために、次をクリックしてサンプル テンプレートをデプロイできます。

Azure にデプロイする

パッケージをインストールする

NuGet を使用して .NET 用のAzure Event Hubs クライアント ライブラリをインストールします。

dotnet add package Azure.Messaging.EventHubs

クライアントを認証する

Event Hubs クライアント ライブラリが Event Hub と対話するには、それに接続して承認する方法を理解する必要があります。 これを行う最も簡単な方法は、Event Hubs 名前空間の作成時に自動的に作成される接続文字列を使用することです。 Event Hubs での接続文字列の使用に慣れていない場合は、ステップ バイ ステップ ガイドに従って Event Hubs 接続文字列を取得できます。

接続文字列を作成したら、Event Hubs クライアントの種類のいずれかを使用して作成できます。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using var producer = new EventHubProducerClient(connectionString, eventHubName);

資格情報の種類を使用して Event Hubs クライアントを認証する例については、「 Azure Active Directory (AAD) プリンシパルの使用」 または 「ID と共有アクセス資格情報 のサンプル」を参照してください。

ASP.NET Core アプリケーションの Event Hubs クライアントを認証する例については、「ASP.NET Core依存関係の挿入への登録」を参照してください。

主要な概念

  • Event Hub クライアントは、Event Hubs クライアント ライブラリと対話する開発者向けのプライマリ インターフェイスです。 複数の種類の Event Hub クライアントが用意されており、イベントの発行や使用など、Event Hubs の特定の用途にそれぞれ使用されます。

  • Event Hub プロデューサーは、テレメトリ データ、診断情報、使用状況ログ、他のログ データのソースとして機能するクライアントの一種であり、埋め込みデバイス ソリューション、モバイル デバイス アプリケーション、コンソールまたは他のデバイスで実行されているゲーム タイトル、一部のクライアントまたはサーバー ベースのビジネス ソリューション、または Web サイトの一部として機能します。

  • Event Hub コンシューマーは、Event Hub から情報を読み取り、その処理を許可するクライアントの一種です。 処理には、集計、複雑な計算、フィルター処理が含まれる場合があります。 生データまたは変換された形式で情報を配布または保存する処理が含まれる場合もあります。 Event Hub コンシューマーは、多くの場合、Azure Stream Analytics、Apache Spark、Apache Storm などの組み込みの分析機能を備えた、堅牢で大規模なプラットフォーム インフラストラクチャ パーツです。

  • パーティションは、Event Hub に保持される順序付けされた一連のイベントです。 パーティションは、イベント コンシューマーに必要な並列処理に関連付けられたデータ編成の手段です。 Azure Event Hubs では、パーティション分割されたコンシューマー パターンを介してメッセージ ストリーミングを提供し、各コンシューマーがメッセージ ストリームの特定のサブセット (パーティション) のみを読み取ります。 新しいイベントが到着すると、このシーケンスの末尾に追加されます。 パーティションの数は、Event Hub の作成時に指定され、変更することはできません。

  • コンシューマー グループはは、Event Hub 全体のビューです。 コンシューマー グループを使用すると、複数のコンシューマー アプリケーションが個別のイベント ストリーム ビューを持つことができるようになり、それぞれの場所から独自のペースでストリームを個別に読み取ることができます。 コンシューマー グループあたり最大 5 つのリーダーをパーティションに同時に設定できますが、特定のパーティションとコンシューマー グループの組み合わせには、アクティブな 1 つのコンシューマーのみをお勧めします。 各アクティブ リーダーでは、そのパーティションからすべてのイベントを受け取ります。同じパーティションに複数のリーダーがある場合は、重複するイベントを受信します。

その他の概念と詳細な説明については、「 Event Hubs の機能」を参照してください。

クライアントの有効期間

Event Hubs の各クライアントの種類は、アプリケーションの有効期間中にシングルトンとしてキャッシュして使用しても安全です。これは、イベントが定期的に発行または読み取られる場合のベスト プラクティスです。 クライアントは、ネットワーク、CPU、メモリの使用を効率的に管理し、非アクティブな期間中に使用率を低く保つ役割を担います。 ネットワーク リソースやその他のアンマネージド オブジェクトが適切にクリーンアップされるようにするには、クライアントで または DisposeAsyncCloseAsync呼び出す必要があります。

スレッド セーフ

すべてのクライアント インスタンス メソッドがスレッド セーフであり、相互に独立していることを保証します (ガイドライン)。 これにより、クライアント インスタンスの再利用に関する推奨事項は、スレッド間でも常に安全になります。

や などのEventDataEventDataBatchデータ モデル型はスレッド セーフではありません。 スレッド間で共有したり、クライアント メソッドと同時に使用したりしないでください。

その他の概念

クライアント オプション | エラーの | 処理診断 | あざける

Event Hub を検査する

多くの Event Hub 操作は、特定のパーティションのスコープ内で実行されます。 パーティションは Event Hub によって所有されているため、その名前は作成時に割り当てられます。 使用できるパーティションを理解するには、Event Hub クライアントのいずれかを使用して、Event Hub に対してクエリを実行します。 これらの例では、図示するために EventHubProducerClient を例として示していますが、概念と形式はクライアント全体で共通です。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

イベントを Event Hub に発行する

イベントを発行するには、EventHubProducerClient を作成する必要があります。 プロデューサーは、イベントをバッチで発行し、特定のパーティションを要求できます。また、イベントの発行先のパーティションを Event Hubs サービスで決定することもできます。 イベントの発行を高可用性にする必要がある場合、またはイベント データをパーティション間で均等に分散する必要がある場合は、自動ルーティングを使用することをお勧めします。 この例では、自動ルーティングを利用します。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();

    if ((!eventBatch.TryAdd(new EventData("First"))) ||
        (!eventBatch.TryAdd(new EventData("Second"))))
    {
       throw new ApplicationException("Not all events could be added to the batch!");
    }

    await producer.SendAsync(eventBatch);
}

Event Hub からのイベントを読み取る

Event Hub からイベントを読み取るには、特定のコンシューマー グループの EventHubConsumerClient を作成する必要があります。 Event Hub が作成されると、Event Hubs の探索を開始するために使用できる既定のコンシューマーグループが提供されます。 この例では、反復子を使用して Event Hub に発行されたすべてのイベントの読み取りに焦点を当てます。

メモ: この使用に対するアプローチは、Event Hubs クライアント ライブラリの探索とプロトタイプ作成のエクスペリエンスを向上させることを目的としていることに注意してください。 運用環境のシナリオでは使用しないことをお勧めします。 運用環境で使用する場合は、堅牢で高パフォーマンスのエクスペリエンスを提供するため、イベント プロセッサ クライアントを使用することをお勧めします。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Event Hub パーティションからイベントを読み取る

Event Hub パーティションのイベントを読み取るには、特定のコンシューマー グループの を EventHubConsumerClient 作成する必要があります。 Event Hub が作成されると、Event Hubs の探索を開始するために使用できる既定のコンシューマーグループが提供されます。 特定のパーティションから読み取るために、コンシューマーはイベント ストリーム内のイベントの受信を開始する場所も指定する必要があります。この例では、Event Hub の最初のパーティションに対して発行されたすべてのイベントを読み取る方法に重点を置きます。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

イベント プロセッサ クライアントを使用してイベントを処理する

運用シナリオの大部分では、 イベント プロセッサ クライアント を使用してイベントの読み取りと処理を行うことをお勧めします。 プロセッサは、状態を保持する手段を提供しながら、パフォーマンスとフォールト トレラントな方法でイベント ハブのすべてのパーティションにわたってイベントを処理するための堅牢なエクスペリエンスを提供することを目的としています。 イベント プロセッサ クライアントは、特定のイベント ハブのコンシューマー グループのコンテキスト内で協調的に動作することもできます。この場合、インスタンスがグループで使用可能になったり使用できなくなったりすると、分散と作業の分散が自動的に管理されます。

EventProcessorClient は、状態の永続化のために Azure Storage BLOB に依存しているため、プロセッサに BlobContainerClient を指定する必要があります。これは、使用する必要があるストレージ アカウントとコンテナーに構成されています。

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

詳細については、イベント プロセッサ クライアント README と付属の サンプルを参照してください。

Event Hub クライアントでの Active Directory プリンシパルの使用

Azure Identity ライブラリは、Azure Active Directory (AAD) 認証サポートを提供します。これは、Event Hubs を含む Azure クライアント ライブラリに使用できます。

Active Directory プリンシパルを使用するために、Event Hubs クライアントの作成時に Azure.Identity ライブラリから使用できる資格情報の 1 つが指定されます。 さらに、Event Hubs の接続文字列の代わりに、完全修飾 Event Hubs 名前空間と目的のイベント ハブの名前が指定されます。 これらの例では、図示するために EventHubProducerClient を例として示していますが、概念と形式はクライアント全体で共通です。

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var credential = new DefaultAzureCredential();

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();

    if ((!eventBatch.TryAdd(new EventData("First"))) ||
        (!eventBatch.TryAdd(new EventData("Second"))))
    {
       throw new ApplicationException("Not all events could be added to the batch!");
    }

    await producer.SendAsync(eventBatch);
}

Azure Active Directory を使用する場合は、ロールなどの Event Hubs へのアクセスを許可するロールがプリンシパルに Azure Event Hubs Data Owner 割り当てられている必要があります。 Event Hubs で Azure Active Directory 承認を使用する方法の詳細については、 関連するドキュメントを参照してください

依存関係の挿入に登録 ASP.NET Core

ASP.NET Core アプリケーションに依存関係として Event Hubs クライアントの 1 つを挿入するには、ASP.NET Core パッケージ用の Azure クライアント ライブラリ統合をインストールします。

dotnet add package Microsoft.Extensions.Azure

インストール後、目的の Event Hubs クライアントの種類を メソッドに Startup.ConfigureServices 登録します。

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        builder.AddEventHubProducerClient(Configuration.GetConnectionString("EventHubs"));
    });
  
    services.AddControllers();
}

上記のコードを使用するには、これをアプリケーションの構成に追加します。

{
  "ConnectionStrings": {
    "EventHubs": "<connection_string>"
  }
}

クライアントに共有 Azure.Identity 資格情報を使用することを好むアプリケーションの場合、登録は若干異なります。

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        // This will register the EventHubProducerClient using the default credential.
        builder.AddEventHubProducerClientWithNamespace(fullyQualifiedNamespace);

        // By default, DefaultAzureCredential is used, which is likely desired for most
        // scenarios. If you need to restrict to a specific credential instance, you could
        // register that instance as the default credential instead.
        builder.UseCredential(new ManagedIdentityCredential());
    });
  
    services.AddControllers();
}

詳細については、「 Azure SDK for .NET を使用した依存関係の挿入」を参照してください。

トラブルシューティング

トラブルシューティングの詳細については、 Event Hubs トラブルシューティング ガイドを参照してください。

ログ記録と診断

Event Hubs クライアント ライブラリは、.NET EventSource を使用して情報を出力するために、さまざまな詳細レベルで情報をログに記録するために完全にインストルメント化されています。 ログは各操作に対して実行され、操作の開始点、完了、および発生した例外をマークするパターンに従います。 分析情報を提供する可能性がある追加情報も、関連する操作のコンテキストでログに記録されます。

Event Hubs クライアント ログは、"Azure-Messaging-EventHubs" という名前のソースをオプトインするか、特徴が "AzureEventSource" であるすべてのソースをオプトインすることで、任意 EventListener のユーザーが使用できます。 Azure クライアント ライブラリからのログのキャプチャを容易にするために、 Azure.Core Event Hubs で使用されるライブラリには が AzureEventSourceListener用意されています。 詳細については、「 AzureEventSourceListener を使用した Event Hubs ログのキャプチャ」を参照してください。

Event Hubs クライアント ライブラリは、Application Insights または OpenTelemetry を使用した分散トレース用にもインストルメント化されています。 詳細については、 Azure.Core Diagnostics サンプルを参照してください。

次のステップ

Azure Event Hubs クライアント ライブラリでは、説明した入門シナリオに加えて、Azure Event Hubs サービスの完全な機能セットを利用するのに役立つ追加のシナリオのサポートが提供されています。 これらのシナリオの一部を調べるのに役立つ目的で、Event Hubs クライアント ライブラリには、一般的なシナリオの図として機能するサンプルのプロジェクトが用意されています。 詳細については、 README のサンプルを参照してください。

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。

詳細については、 投稿ガイド を参照してください。

インプレッション数