Dela via


Ändringsflöde i Azure Cosmos DB för Apache Cassandra

GÄLLER FÖR: Kassandra

Stöd för ändringsflöde i Azure Cosmos DB för Apache Cassandra är tillgängligt via frågepredikaten i Cassandra Query Language (CQL). Med hjälp av dessa predikatvillkor kan du fråga ändringsflödes-API:et. Program kan hämta de ändringar som görs i en tabell med hjälp av den primära nyckeln (även kallad partitionsnyckeln) som krävs i CQL. Du kan sedan vidta ytterligare åtgärder baserat på resultaten. Ändringar i raderna i tabellen samlas in i ordning efter ändringstiden och sorteringsordningen per partitionsnyckel.

I följande exempel visas hur du hämtar ett ändringsflöde på alla rader i en API för Cassandra Keyspace-tabell med hjälp av .NET. Predikatet COSMOS_CHANGEFEED_START_TIME() används direkt i CQL för att fråga efter objekt i ändringsflödet från en angiven starttid (i det här fallet aktuell datetime). Du kan ladda ned hela exemplet, för C# här och för Java här.

I varje iteration, återupptas frågan vid den senaste tidpunkten ändringar lästes, med hjälp av växlingstillstånd. Vi kan se en kontinuerlig ström av nya ändringar i tabellen i nyckelområdet. Vi kommer att se ändringar i rader som infogas eller uppdateras. Det går för närvarande inte att titta efter borttagningsåtgärder med hjälp av ändringsflöde i API:et för Cassandra.

Kommentar

Om du återanvänder en token efter att en samling har släppts och sedan återskapats med samma namn resulterar det i ett fel. Vi rekommenderar att du anger pageState till null när du skapar en ny samling och återanvänder samlingsnamnet.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

För att få ändringarna till en enskild rad efter primärnyckel kan du lägga till den primära nyckeln i frågan. I följande exempel visas hur du spårar ändringar för raden där "user_id = 1"

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Aktuella begränsningar

Följande begränsningar gäller när du använder ändringsflöde med API för Cassandra:

  • Infogningar och uppdateringar stöds för närvarande. Borttagningsåtgärden stöds inte ännu. Som en lösning kan du lägga till en mjuk markör på rader som tas bort. Lägg till exempel till ett fält på raden med namnet "borttaget" och ange det till "true".
  • Den senaste uppdateringen sparas som i kärn-API:et för NoSQL och mellanliggande uppdateringar av entiteten är inte tillgängliga.

Felhantering

Följande felkoder och meddelanden stöds när du använder ändringsflöde i API för Cassandra:

  • HTTP-felkod 429 – När ändringsflödet är hastighetsbegränsad returneras en tom sida.

Nästa steg