Azure Cosmos DB for Apache Cassandra でフィードを変更する

適用対象: Cassandra

Azure Cosmos DB for Apache Cassandra の変更フィード サポートは、Cassandra Query Language (CQL) のクエリ述語を通じて利用できます。 これらの述語条件を使用して、フィード変更 API に対してクエリを実行できます。 アプリケーションでは、CQL で必要とされる主キー (パーティション キーとも呼ばれます) を使用して、テーブルへの変更を取得できます。 その後、結果に基づいてさらにアクションを実行できます。 テーブルの行に加えた変更は、変更時刻順かつ、パーティション キー別の並べ替え順序でキャプチャされます。

次の例では、.NET を使用して Cassandra 用 API のキー スペース テーブル内のすべての行で変更フィードを取得する方法を示します。 述語 COSMOS_CHANGEFEED_START_TIME () は CQL 内で直接使用され、指定された開始時刻 (この場合は現在の日付/時刻) か変更フィードの項目をクエリします。 C# はこちら、および Java はこちらから、完全なサンプルをダウンロードすることができます。

各反復処理では、ページング状態を使用して、最後に変更が読み取られた時点でクエリが再開されます。 キー スペースのテーブルに新しい変更のストリームが連続して表示されます。 挿入または更新された行の変更が表示されます。 Cassandra 用 API の変更フィードを使用した削除操作の監視は、現在はサポートされていません。

注意

コレクションを削除した後にトークンを再利用し、同じ名前で再作成すると、エラーが発生します。 新しいコレクションの作成時にコレクション名を再利用する場合は、pageState を null に設定することをお勧めします。

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

主キーで 1 つの行に対する変更を取得するには、クエリに主キーを追加します。 次の例は、"user_id = 1" という行の変更履歴を残す方法を示しています。

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

現在の制限

Cassandra 用 API で変更フィードを使用する場合、次の制限事項が適用されます。

  • 挿入と更新は現在サポートされています。 削除操作はまだサポートされていません。 回避策として、削除する行にソフト マーカーを追加できます。 たとえば、"削除済み" という行にフィールドを追加し、"true" に設定します。
  • 最新の更新プログラムは NoSQL 用 コア API のように永続化され、エンティティに対する中間更新プログラムは使用できません。

エラー処理

Cassandra 用 API で変更フィードを使用するときは、次のエラー コードとメッセージがサポートされます。

  • HTTP エラー コード 429 - 変更フィードのレートが制限されている場合、空のページが返されます。

次のステップ