Change feed in the Azure Cosmos DB for Apache Cassandra

APPLIES TO: Cassandra

Change feed support in Azure Cosmos DB for Apache Cassandra is available through query predicates in the Cassandra Query Language (CQL). Using these predicates, you can query the change feed API. Applications can get the changes made to a table by using the primary key (also known as the partition key), as CQL requires, and then take further action based on the results. Changes to rows in the table are captured in order of modification time, with the sort order applied per partition key.

The following example shows how to get a change feed on all the rows in an API for Cassandra keyspace table using Java. The predicate COSMOS_CHANGEFEED_START_TIME() is used directly within CQL to query items in the change feed from a specified start time (in this sample, a timestamp computed relative to the current date and time). Full samples are available for download for both C# and Java.

In each iteration, the query resumes from the last point at which changes were read, using the paging state. This produces a continuous stream of new changes to the table in the keyspace, covering rows that are inserted or updated. Watching for delete operations by using change feed in API for Cassandra is currently not supported.

Note

Reusing a token after dropping a collection and then recreating it with the same name results in an error. When you create a new collection that reuses an existing collection name, set the pageState to null.

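    Session cassandraSession = utils.getSession();

    try {
        // Start reading changes from 6 hours 30 minutes before the current time.
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);
        String query = "SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='"
            + dtf.format(now) + "'";

        // Paging state of the last read; null means "start at the query's start time".
        byte[] token = null;
        System.out.println(query);
        while (true) {
            SimpleStatement st = new SimpleStatement(query);
            st.setFetchSize(100);
            if (token != null) {
                // Resume from where the previous iteration stopped.
                st.setPagingStateUnsafe(token);
            }

            ResultSet result = cassandraSession.execute(st);
            // Guard against a missing paging state so the loop keeps the
            // previous token instead of throwing a NullPointerException.
            if (result.getExecutionInfo().getPagingState() != null) {
                token = result.getExecutionInfo().getPagingState().toBytes();
            }

            for (Row row : result) {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }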
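
The note above about resetting the paging state can be sketched as a small token store. This is a minimal illustration, not part of the sample or of any driver: the class name `PagingTokenStore` and its methods are hypothetical, and it assumes the application knows when a table has been dropped and recreated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of any driver): remembers the last paging
// state per table and forgets it when the table is recreated.
final class PagingTokenStore {
    private final Map<String, byte[]> tokens = new HashMap<>();

    // Returns the saved token, or null to start from the query's start time.
    byte[] get(String table) {
        return tokens.get(table);
    }

    // Stores a defensive copy of the token; null tokens are ignored.
    void save(String table, byte[] token) {
        if (token != null) {
            tokens.put(table, token.clone());
        }
    }

    // Call right after dropping and recreating a table with the same name,
    // so that a stale token is never sent with the next query.
    void reset(String table) {
        tokens.remove(table);
    }
}
```

In the loop above, `get` would supply the value passed to `setPagingStateUnsafe`, and `reset` would be called whenever the table is recreated under the same name.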

To get the changes to a single row by primary key, add the primary key to the query. The following example shows how to track changes for the row where "user_id = 1":

    String query = "SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='"
        + dtf.format(now) + "'";
    SimpleStatement st = new SimpleStatement(query);
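
Both query forms (whole table and single row) follow the same pattern, so they can be produced by one small helper. The sketch below is illustrative only: the class `ChangeFeedQuery` and its method names are hypothetical, it assumes an integer primary key as in the `user_id=1` example, and it concatenates identifiers without escaping, so it should only be given trusted keyspace, table, and column names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that builds the two change feed query shapes shown above.
final class ChangeFeedQuery {
    // Same timestamp pattern as the sample code.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Change feed query over every row of keyspace.table.
    static String forTable(String keyspace, String table, LocalDateTime start) {
        return "SELECT * FROM " + keyspace + "." + table
                + " WHERE COSMOS_CHANGEFEED_START_TIME()='" + FORMAT.format(start) + "'";
    }

    // Change feed query restricted to one row by an integer primary key.
    static String forRow(String keyspace, String table, String keyColumn,
                         int keyValue, LocalDateTime start) {
        return "SELECT * FROM " + keyspace + "." + table
                + " WHERE " + keyColumn + "=" + keyValue
                + " AND COSMOS_CHANGEFEED_START_TIME()='" + FORMAT.format(start) + "'";
    }
}
```

For example, `ChangeFeedQuery.forRow("uprofile", "user", "user_id", 1, start)` yields the same kind of string as the single-row query above, ready to pass to `SimpleStatement`.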

Current limitations

The following limitations are applicable when using change feed with API for Cassandra:

  • Inserts and updates are currently supported. Delete operations are not yet supported. As a workaround, you can add a soft-delete marker to rows that are being deleted. For example, add a field named "deleted" to the row and set it to "true".
  • As in API for NoSQL, only the latest update to a row is persisted; intermediate updates to the entity are not available.
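
The soft-delete workaround from the first limitation could look roughly like this on the consumer side. This is a sketch under assumptions: the class `SoftDeleteFilter` is hypothetical, the "deleted" field is assumed to be a boolean column on the `uprofile.user` table, and change feed rows are modeled as plain maps rather than driver `Row` objects so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the soft-delete workaround.
final class SoftDeleteFilter {
    // CQL that flags a row as deleted instead of removing it.
    // Assumes a boolean column named "deleted" and an int key user_id.
    static String markDeleted(int userId) {
        return "UPDATE uprofile.user SET deleted=true WHERE user_id=" + userId;
    }

    // Returns only the changed rows whose "deleted" marker is set to true.
    // Rows without the column, or with any other value, are skipped.
    static List<Map<String, Object>> deletedRows(List<Map<String, Object>> changes) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : changes) {
            if (Boolean.TRUE.equals(row.get("deleted"))) {
                out.add(row);
            }
        }
        return out;
    }
}
```

Because the marker is written with an update, it appears in the change feed like any other update, which lets a consumer treat flagged rows as deletions.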

Error handling

The following error codes and messages are supported when using change feed in API for Cassandra:

  • HTTP error code 429 - When the change feed is rate limited, it returns an empty page.
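
Since a rate-limited read comes back as an empty page, a polling loop may want to slow down when it sees consecutive empty pages. The following is one possible sketch, not documented service behavior: the class `EmptyPageBackoff`, its parameters, and the choice of exponential backoff are all assumptions, and an empty page may also simply mean there are no new changes.

```java
// Hypothetical helper: computes a wait time that doubles after each
// consecutive empty page and resets once a page contains rows.
final class EmptyPageBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private int emptyPages = 0;

    EmptyPageBackoff(long baseMillis, long maxMillis) {
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    // Records one page and returns how long to wait before the next read.
    long nextDelayMillis(int rowsInPage) {
        if (rowsInPage > 0) {
            emptyPages = 0;
            return 0;
        }
        emptyPages++;
        // Cap the shift so the doubling cannot grow without bound.
        long delay = baseMillis << Math.min(emptyPages - 1, 20);
        return Math.min(delay, maxMillis);
    }
}
```

A reader loop could count the rows it printed in an iteration, pass that count to `nextDelayMillis`, and sleep for the returned duration before executing the next statement.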

Next steps