Változáscsatorna az Apache Cassandra Azure Cosmos DB-ben
A KÖVETKEZŐKRE VONATKOZIK: Cassandra
Az Apache Cassandra-hoz készült Azure Cosmos DB változáscsatorna-támogatása a Cassandra lekérdezési nyelvének (CQL) lekérdezési predikátumain keresztül érhető el. Ezekkel a predikátumfeltételekkel lekérdezheti a változáscsatorna API-t. Az alkalmazások a CQL-ben szükséges elsődleges kulccsal (más néven partíciókulcstal) lekérhetik a táblák módosításait. Ezután további műveleteket is végrehajthat az eredmények alapján. A táblázat sorainak módosításait a módosítási idő és a partíciókulcsonkénti rendezési sorrend szerint rögzíti a rendszer.
Az alábbi példa bemutatja, hogyan lehet változáscsatornát lekérni a Cassandra Keyspace-tábla összes sorára a .NET használatával. A COSMOS_CHANGEFEED_START_TIME() predikátumot a rendszer közvetlenül a CQL-ben használja a változáscsatorna elemeinek lekérdezéséhez egy megadott kezdési időpontból (ebben az esetben az aktuális dátumidőből). A teljes mintát itt töltheti le c#- ra és Javára itt.
Az egyes iterációkban a lekérdezés az utolsó pont módosításainak olvasása után folytatódik lapozási állapot használatával. A tábla új módosításainak folyamatos streamje látható a Kulcstérben. Látni fogjuk a beszúrt vagy frissített sorok módosításait. A Cassandra API változáscsatornával végzett törlési műveleteinek figyelése jelenleg nem támogatott.
Megjegyzés
Ha egy gyűjtemény elvetése után újrahasznál egy jogkivonatot, majd ugyanazzal a névvel újra létrehozni, az hibát eredményez. Javasoljuk, hogy új gyűjtemény létrehozásakor és a gyűjtemény nevének újrafelhasználásakor állítsa a pageState értéket null értékre.
Session cassandraSession = utils.getSession();
try {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);
String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
byte[] token=null;
System.out.println(query);
while(true)
{
SimpleStatement st=new SimpleStatement(query);
st.setFetchSize(100);
if(token!=null)
st.setPagingStateUnsafe(token);
ResultSet result=cassandraSession.execute(st) ;
token=result.getExecutionInfo().getPagingState().toBytes();
for(Row row:result)
{
System.out.println(row.getString("user_name"));
}
}
} finally {
utils.close();
LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
}
Ha egy sor módosításait elsődleges kulcs alapján szeretné lekérni, hozzáadhatja az elsődleges kulcsot a lekérdezéshez. Az alábbi példa bemutatja, hogyan követheti nyomon a "user_id = 1" sor módosításait
String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
SimpleStatement st=new SimpleStatement(query);
Aktuális korlátozások
A változáscsatorna Cassandra API-val való használatakor a következő korlátozások érvényesek:
- A beszúrások és frissítések jelenleg támogatottak. A törlési művelet még nem támogatott. Áthidaló megoldásként hozzáadhat egy helyreállítható jelölőt a törölt sorokhoz. Adjon hozzá például egy "törölt" nevű mezőt a sorba, és állítsa "igaz" értékre.
- Az utolsó frissítés megmarad, mint a NoSQL-hez készült alapvető API-ban, és az entitás köztes frissítései nem érhetők el.
Hibakezelés
A változáscsatorna Cassandra API-ban való használatakor a következő hibakódok és üzenetek támogatottak:
- HTTP-hibakód: 429 – Ha a változáscsatorna sebességkorlátos, üres lapot ad vissza.