Java 用Azure Event Hubs クライアント ライブラリ - バージョン 5.16.0

Azure Event Hubsは、1 秒あたり何百万ものイベントを取り込み、複数のコンシューマーにストリーミングできる、拡張性の高いパブリッシュ/サブスクライブ サービスです。 これにより、接続されているデバイスとアプリケーションによって生成される大量のデータを処理および分析できます。 Event Hubs がデータを収集したら、リアルタイム分析プロバイダーまたはバッチ処理/ストレージ アダプターを使用して、データを取得、変換、格納できます。 Azure Event Hubsの詳細については、「Event Hubs とは」を参照してください。

Azure Event Hubs クライアント ライブラリにより、Azure Event Hubs イベントの発行と利用が可能になります。また、次の用途に使われます。

  • ビジネス インテリジェンスと診断の目的で、アプリケーションに関するテレメトリを生成します。
  • アプリケーションの状態に関する事実を公開します。これにより、関係のある当事者が、アクションを起こすトリガーとして監視し使うことができます。
  • ビジネスや他のエコシステム内で発生する興味深い操作と相互作用を観察し、疎結合されたシステムが密接に結びつくことなくやり取りできるようにします。
  • 1 つ以上の発行元からイベントを受信し、エコシステムのニーズに合わせてそれらを変換し、変換されたイベントをコンシューマーが観察できるように新しいストリームに公開します。

ソースコード | API リファレンス ドキュメント | 製品ドキュメント | サンプル | トラブルシューティング

目次

作業の開始

前提条件

パッケージを組み込む

BOM ファイルを含める

ライブラリの一般提供 (GA) バージョンに依存するには、azure-sdk-bom をプロジェクトに含めてください。 次のスニペットでは、{bom_version_to_target} プレースホルダーをバージョン番号に置き換えます。 BOM の詳細については、 AZURE SDK BOM README に関するページを参照してください。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

次に、次に示すように、バージョン タグを使用せずに、依存関係セクションに直接依存関係を含めます。

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
  </dependency>
</dependencies>

直接依存関係を含める

BOM に存在しない特定のバージョンのライブラリに依存する場合は、次のように直接依存関係をプロジェクトに追加します。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.16.0</version>
</dependency>

クライアントを認証する

Event Hubs クライアント ライブラリが Event Hub と対話するには、それに接続して承認する方法を理解する必要があります。

接続文字列を使用して Event Hub プロデューサーを作成する

これを行う最も簡単な方法は、Event Hubs 名前空間の作成時に自動的に作成される接続文字列を使用することです。 Azure の共有アクセス ポリシーに慣れていない場合は、詳細なガイドに従って Event Hubs 接続文字列を取得できます。

非同期と同期の両方の Event Hub プロデューサー クライアントとコンシューマー クライアントは、 を使用して EventHubClientBuilder作成できます。 を build*Client() 呼び出すと、同期プロデューサーまたはコンシューマーが作成され、 build*AsyncClient() 非同期に対応するが作成されます。

次のスニペットでは、同期イベント ハブ プロデューサーを作成します。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Microsoft ID プラットフォーム (旧称 Azure Active Directory) を使用して Event Hub クライアントを作成する

Azure SDK for Java では Azure Identity パッケージがサポートされているため、Microsoft ID プラットフォームから資格情報を簡単に取得できます。 まず、パッケージを追加します。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>

資格情報を要求するために実装されたすべての方法は、パッケージの下にあります com.azure.identity.credential 。 次のサンプルは、Azure Active Directory (AAD) アプリケーション クライアント シークレットを使用して、Azure Event Hubsで承認する方法を示しています。

DefaultAzureCredential を使用した承認

承認は、 DefaultAzureCredential を使用するのが最も簡単です。 実行中の環境で使用するのに最適な資格情報が見つかります。 Event Hubs で Azure Active Directory 承認を使用する方法の詳細については、 関連するドキュメントを参照してください

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

主要な概念

  • Event Hub プロデューサーは、埋め込みデバイス ソリューション、モバイル デバイス アプリケーション、コンソールまたはその他のデバイスで実行されているゲーム タイトル、クライアントまたはサーバー ベースのビジネス ソリューション、または Web サイトの一部として、テレメトリ データ、診断情報、使用状況ログ、またはその他のログ データのソースです。

  • Event Hub コンシューマーは、イベント ハブからそのような情報を取得して処理します。 処理には、集計、複雑な計算、フィルター処理が含まれる場合があります。 生データまたは変換された形式で情報を配布または保存する処理が含まれる場合もあります。 Event Hub コンシューマーは、多くの場合、Azure Stream Analytics、Apache Spark、Apache Storm などの組み込みの分析機能を備えた、堅牢で大規模なプラットフォーム インフラストラクチャ パーツです。

  • パーティションは、Event Hub に保持される順序付けされた一連のイベントです。 Azure Event Hubs では、パーティション分割されたコンシューマー パターンを介してメッセージ ストリーミングを提供し、各コンシューマーがメッセージ ストリームの特定のサブセット (パーティション) のみを読み取ります。 新しいイベントが到着すると、このシーケンスの末尾に追加されます。 パーティションの数は、Event Hub の作成時に指定され、変更することはできません。

  • コンシューマー グループはは、Event Hub 全体のビューです。 コンシューマー グループを使用すると、複数のコンシューマー アプリケーションが個別のイベント ストリーム ビューを持つことができるようになり、それぞれの場所から独自のペースでストリームを個別に読み取ることができます。 コンシューマー グループあたり最大 5 つのリーダーをパーティションに同時に設定できますが、特定のパーティションとコンシューマー グループの組み合わせには、アクティブな 1 つのコンシューマーのみをお勧めします。 アクティブな各リーダーは、そのパーティションからイベントを受信します。同じパーティションに複数のリーダーがある場合は、重複するイベントを受信します。

詳細な概念と詳細な説明については、「 Event Hubs の機能」を参照してください。 また、AMQP の概念については、 OASIS Advanced Messaging Queuing Protocol (AMQP) バージョン 1.0 に関するページを参照してください。

イベントを Event Hub に発行する

イベントを発行するには、非同期 EventHubProducerAsyncClient または同期 EventHubProducerClientの を作成する必要があります。 各プロデューサーは、特定のパーティションにイベントを送信することも、Event Hubs サービスが発行先のパーティション イベントを決定することもできます。 イベントの発行を高可用性にする必要がある場合、またはイベント データをパーティション間で均等に分散する必要がある場合は、自動ルーティングを使用することをお勧めします。

Event Hub プロデューサーを作成してイベントを発行する

開発者は、 を使用して を呼び出buildProducer*Client()してプロデューサーをEventHubClientBuilder作成できます。 を CreateBatchOptions.setPartitionId(String) 指定すると、イベントが特定のパーティションに送信されます。 が指定されていない場合 partitionId 、イベントは自動的にパーティションにルーティングされます。 を CreateBatchOptions.setPartitionKey(String) 指定すると、イベントをハッシュし、同じパーティションに送信するように Event Hubs サービスに指示されます。

次のスニペットでは、同期プロデューサーを作成し、イベントを任意のパーティションに送信し、Event Hubs サービスが使用可能なパーティションにイベントをルーティングできるようにします。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();

EventDataBatch.tryAdd(EventData)はスレッド セーフではないことに注意してください。 複数のスレッドを使用してイベントを追加する場合は、必ずメソッド アクセスを同期してください。

パーティション識別子を使用してイベントを発行する

多くの Event Hub 操作は、特定のパーティションのスコープ内で実行されます。 どのクライアントでも、 または getEventHubProperties() を呼び出getPartitionIds()して、Event Hub インスタンスで に関するパーティション ID とメタデータを取得できます。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

パーティション キーを使用してイベントを発行する

イベントのセットが特定のパーティションに関連付けられていない場合は、Event Hubs サービスが異なるイベントまたはイベントのバッチをまとめて同じパーティションに保持するように要求することが望ましい場合があります。 これは、イベントを発行するときに を partition key 設定することで実現できます。 次のシナリオでは、すべてのイベントが都市に関連しているため、パーティション キーが "cities" に設定された状態で送信されます。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));

SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);

Event Hub パーティションからのイベントを使用する

イベントを使用するには、 または EventHubConsumerClientEventHubConsumerAsyncClient特定のコンシューマー グループに対して作成します。 さらに、コンシューマーは、イベント ストリーム内のイベントの受信を開始する場所を指定する必要があります。

EventHubConsumerAsyncClient でイベントを使用する

次のスニペットでは、 から partitionId イベントを受信し、パーティションにプッシュされる最新のイベントのみをリッスンする非同期コンシューマーを作成します。 開発者は、別のパーティション ID で を呼び出receiveFromPartition(String, EventPosition)すことによって、同じ EventHubConsumerAsyncClient を使用して複数のパーティションからイベントの受信を開始できます。

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        new DefaultAzureCredentialBuilder().build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation.  If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
    .subscribe(partitionEvent -> {
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();

        System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
        System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
    }, error -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.err.print("An error occurred:" + error);
    }, () -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.out.print("Stream has ended.");
    });

EventHubConsumerClient でイベントを使用する

開発者は、 を使用してバッチでイベントを返す同期コンシューマーを EventHubConsumerClient作成できます。 次のスニペットでは、パーティションのイベント ストリームの先頭からイベントの読み取りを開始するコンシューマーが作成されます。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}

EventProcessorClient を使用してイベントを使用する

Event Hub のすべてのパーティションのイベントを使用するには、特定のコンシューマー グループに 対して を EventProcessorClient 作成します。

EventProcessorClient 、イベントの処理を提供するコールバック関数に委任します。これにより、プロセッサが基になるコンシューマー操作を管理する責任を負いながら、値を提供するために必要なロジックに集中できます。

この例では、 を構築 EventProcessorClientすることに重点を置き、サンプルで使用できる を SampleCheckpointStore 使用し、Event Hub から受信したイベントを処理してコンソールに書き込むコールバック関数を使用します。 運用アプリケーションの場合は、Checkpoint Store のような永続ストア と Azure Storage BLOB を使用することをお勧めします。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new SampleCheckpointStore())
    .processEvent(eventContext -> {
        System.out.printf("Partition id = %s and sequence number of event = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    })
    .buildEventProcessorClient();

トラブルシューティング

「TROUBLESHOOTING.md」を参照してください。

次のステップ

説明した以外にも、Azure Event Hubs クライアント ライブラリでは、Azure Event Hubs サービスの完全な機能セットを利用するための他の多くのシナリオのサポートが提供されています。 これらのシナリオの一部を調べるには、サンプル README をチェックします。

共同作成

このプロジェクトのアクティブなコントリビューターになりたい場合は、投稿 ガイドライン を参照してください。

インプレッション数