次の方法で共有


Azure Cosmos DB for NoSQL と変更フィード プロセッサを使用する Java アプリケーションを作成する方法

Azure Cosmos DB は、Microsoft が提供するフル マネージドの NoSQL データベース サービスです。 これにより、グローバルに分散され、拡張性の高いアプリケーションを簡単に構築できます。 このハウツー ガイドでは、Azure Cosmos DB for NoSQL データベースを使用し、リアルタイムデータ処理用の変更フィード プロセッサを実装する Java アプリケーションを作成するプロセスについて説明します。 Java アプリケーションは、Azure Cosmos DB Java SDK v4 を使用して Azure Cosmos DB for NoSQL と通信します。

Important

このチュートリアルは、Azure Cosmos DB Java SDK v4 のみを対象としています。 詳細については、Azure Cosmos DB Java SDK v4 リリース ノートMaven リポジトリAzure Cosmos DB の変更フィード プロセッサ、Azure Cosmos DB Java SDK v4 のトラブルシューティング ガイド を参照してください。 v4 より前のバージョンを現在使用している場合、v4 へのアップグレードについては、Azure Cosmos DB Java SDK v4 への移行ガイドを参照してください。

[前提条件]

  • Azure Cosmos DB アカウント: Azure portal から作成することも、 Azure Cosmos DB Emulator を使用することもできます。

  • Java 開発環境: 少なくとも 8 つのバージョンの Java Development Kit (JDK) がコンピューターにインストールされていることを確認します。

  • Azure Cosmos DB Java SDK V4: Azure Cosmos DB と対話するために必要な機能が提供されます。

バックグラウンド

Azure Cosmos DB の変更フィードには、多くの用途を持つドキュメントの挿入に応答してアクションをトリガーするイベント ドリブン インターフェイスが用意されています。

変更フィード イベントの管理作業は、主に SDK に組み込まれている変更フィード プロセッサ ライブラリによって処理されます。 このライブラリは、必要に応じて、変更フィード イベントを複数のワーカーに分散するのに十分な強力な機能です。 必要なのは、変更フィード ライブラリにコールバックを提供することです。

この Java アプリケーションの簡単な例は、Azure Cosmos DB と変更フィード プロセッサを使用したリアルタイムデータ処理を示しています。 アプリケーションは、データ ストリームをシミュレートするために、サンプル ドキュメントを "フィード コンテナー" に挿入します。 フィード コンテナーにバインドされた変更フィード プロセッサは、受信した変更を処理し、ドキュメントの内容をログに記録します。 プロセッサは、並列処理のリースを自動的に管理します。

ソースコード

SDK サンプル リポジトリを複製し、次の例を SampleChangeFeedProcessor.javaで見つけることができます。

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Walkthrough

  1. Azure Cosmos DB と Azure Cosmos DB Java SDK V4 を使用して Java アプリケーションで ChangeFeedProcessorOptions を構成します。 ChangeFeedProcessorOptionsには、データ処理中の変更フィード プロセッサの動作を制御するための重要な設定が用意されています。

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. ホスト名、フィード コンテナー、リース コンテナー、データ処理ロジックなど、関連する構成を使用して ChangeFeedProcessor を 初期化します。 start() メソッドはデータ処理を開始し、フィード コンテナーからの受信データ変更の同時およびリアルタイム処理を可能にします。

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. handleChanges() メソッドを使用して、受信データの変更を処理するデリゲートを指定します。 このメソッドは、Change Feed から受信した JsonNode ドキュメントを処理します。 開発者は、Change Feed によって提供される JsonNode ドキュメントを処理するための 2 つのオプションがあります。 1 つのオプションは、JsonNode の形式でドキュメントを操作することです。 これは特に、すべてのドキュメントに対して単一の均一なデータ モデルがない場合に最適です。 2 番目のオプション - JsonNode を JsonNode と同じ構造の POJO に変換します。 その後、POJO を操作できます。

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Java アプリケーションをビルドして実行します。 アプリケーションは変更フィード プロセッサを起動し、フィード コンテナーにサンプル ドキュメントを挿入して、受信した変更を処理します。

Conclusion

このガイドでは、Azure Cosmos DB for NoSQL データベースを使用し、リアルタイムのデータ処理に変更フィード プロセッサを使用する Azure Cosmos DB Java SDK V4 を使用して Java アプリケーションを作成する方法について説明しました。 このアプリケーションを拡張して、より複雑なユース ケースを処理し、Azure Cosmos DB を使用して堅牢でスケーラブルなグローバル分散アプリケーションを構築できます。

その他のリソース

次のステップ

変更フィード推定機能の詳細については、次の記事を参照してください。