Változáscsatorna az Apache Cassandra Azure Cosmos DB-ben

A KÖVETKEZŐKRE VONATKOZIK: Cassandra

Az Apache Cassandra-hoz készült Azure Cosmos DB változáscsatorna-támogatása a Cassandra lekérdezési nyelvének (CQL) lekérdezési predikátumain keresztül érhető el. Ezekkel a predikátumfeltételekkel lekérdezheti a változáscsatorna API-t. Az alkalmazások a CQL-ben szükséges elsődleges kulccsal (más néven partíciókulcstal) lekérhetik a táblák módosításait. Ezután további műveleteket is végrehajthat az eredmények alapján. A táblázat sorainak módosításait a módosítási idő és a partíciókulcsonkénti rendezési sorrend szerint rögzíti a rendszer.

Az alábbi példa bemutatja, hogyan lehet változáscsatornát lekérni a Cassandra Keyspace-tábla összes sorára a .NET használatával. A COSMOS_CHANGEFEED_START_TIME() predikátumot a rendszer közvetlenül a CQL-ben használja a változáscsatorna elemeinek lekérdezéséhez egy megadott kezdési időpontból (ebben az esetben az aktuális dátumidőből). A teljes mintát itt töltheti le c#- ra és Javára itt.

Az egyes iterációkban a lekérdezés az utolsó pont módosításainak olvasása után folytatódik lapozási állapot használatával. A tábla új módosításainak folyamatos streamje látható a Kulcstérben. Látni fogjuk a beszúrt vagy frissített sorok módosításait. A Cassandra API változáscsatornával végzett törlési műveleteinek figyelése jelenleg nem támogatott.

Megjegyzés

Ha egy gyűjtemény elvetése után újrahasznál egy jogkivonatot, majd ugyanazzal a névvel újra létrehozni, az hibát eredményez. Javasoljuk, hogy új gyűjtemény létrehozásakor és a gyűjtemény nevének újrafelhasználásakor állítsa a pageState értéket null értékre.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Ha egy sor módosításait elsődleges kulcs alapján szeretné lekérni, hozzáadhatja az elsődleges kulcsot a lekérdezéshez. Az alábbi példa bemutatja, hogyan követheti nyomon a "user_id = 1" sor módosításait

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Aktuális korlátozások

A változáscsatorna Cassandra API-val való használatakor a következő korlátozások érvényesek:

  • A beszúrások és frissítések jelenleg támogatottak. A törlési művelet még nem támogatott. Áthidaló megoldásként hozzáadhat egy helyreállítható jelölőt a törölt sorokhoz. Adjon hozzá például egy "törölt" nevű mezőt a sorba, és állítsa "igaz" értékre.
  • Az utolsó frissítés megmarad, mint a NoSQL-hez készült alapvető API-ban, és az entitás köztes frissítései nem érhetők el.

Hibakezelés

A változáscsatorna Cassandra API-ban való használatakor a következő hibakódok és üzenetek támogatottak:

  • HTTP-hibakód: 429 – Ha a változáscsatorna sebességkorlátos, üres lapot ad vissza.

Következő lépések