Ändringsflöde i Azure Cosmos DB för Apache Cassandra

GÄLLER FÖR: Cassandra

Stöd för ändringsflöde i Azure Cosmos DB för Apache Cassandra är tillgängligt via frågepredikaten i Cassandra Query Language (CQL). Med hjälp av dessa predikatvillkor kan du köra frågor mot API:et för ändringsflöde. Program kan hämta de ändringar som görs i en tabell med hjälp av primärnyckeln (även kallad partitionsnyckeln) som krävs i CQL. Du kan sedan vidta ytterligare åtgärder baserat på resultaten. Ändringar av raderna i tabellen registreras i ordning efter ändringstiden och sorteringsordningen per partitionsnyckel.

I följande exempel visas hur du hämtar ett ändringsflöde på alla rader i en API för Cassandra Keyspace-tabell med hjälp av .NET. Predikatet COSMOS_CHANGEFEED_START_TIME() används direkt i CQL för att fråga efter objekt i ändringsflödet från en angiven starttid (i det här fallet aktuell datetime). Du kan ladda ned det fullständiga exemplet för C# här och för Java här.

I varje iteration, återupptas frågan vid den senaste tidpunkten ändringar lästes, med hjälp av växlingstillstånd. Vi kan se en kontinuerlig ström med nya ändringar i tabellen i nyckelutrymmet. Vi kommer att se ändringar i rader som infogas eller uppdateras. Det finns för närvarande inte stöd för borttagningsåtgärder som använder ändringsflöde i API:et för Cassandra.

Anteckning

Om du återanvänder en token när du har släppt en samling och sedan återskapat den med samma namn resulterar det i ett fel. Vi rekommenderar att du anger pageState till null när du skapar en ny samling och återanvänder samlingsnamnet.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

För att kunna hämta ändringarna till en enskild rad efter primärnyckel kan du lägga till primärnyckeln i frågan. I följande exempel visas hur du spårar ändringar för raden där "user_id = 1"

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Aktuella begränsningar

Följande begränsningar gäller när du använder ändringsflöde med API för Cassandra:

  • Infogningar och uppdateringar stöds för närvarande. Borttagningsåtgärden stöds inte ännu. Som en lösning kan du lägga till en mjuk markör på rader som tas bort. Lägg till exempel till ett fält på raden med namnet "deleted" (borttaget) och ange det till "true".
  • Den senaste uppdateringen sparas eftersom kärn-API:et för NoSQL och mellanliggande uppdateringar av entiteten inte är tillgängliga.

Felhantering

Följande felkoder och meddelanden stöds när du använder ändringsflöde i API för Cassandra:

  • HTTP-felkod 429 – När ändringsflödet är frekvensbegränsad returneras en tom sida.

Nästa steg