Kanál změn ve službě Azure Cosmos DB for Apache Cassandra

PLATÍ PRO: Cassandra

Podpora kanálu změn ve službě Azure Cosmos DB for Apache Cassandra je dostupná prostřednictvím predikátů dotazů v jazyce CQL (Cassandra Query Language). Pomocí těchto predikátových podmínek můžete dotazovat rozhraní API kanálu změn. Aplikace můžou získat změny provedené v tabulce pomocí primárního klíče (označovaného také jako klíč oddílu), jak je vyžadováno v CQL. Na základě výsledků pak můžete provést další akce. Změny řádků v tabulce se zaznamenávají v pořadí podle času jejich úpravy a pořadí řazení podle klíče oddílu.

Následující příklad ukazuje, jak pomocí .NET získat kanál změn na všech řádcích v tabulce rozhraní API pro Cassandra Keyspace. Predikát COSMOS_CHANGEFEED_START_TIME() se používá přímo v CQL k dotazování položek v kanálu změn ze zadaného počátečního času (v tomto případě aktuálního data a času). Úplnou ukázku pro C# si můžete stáhnout tady a pro Javu tady.

V každé iteraci se dotaz obnoví v posledním bodě, kdy byly přečteny změny pomocí stavu stránkování. V prostoru klíčů vidíme nepřetržitý proud nových změn v tabulce. Uvidíme změny vložených nebo aktualizovaných řádků. Sledování operací odstranění pomocí kanálu změn v rozhraní API pro Cassandra se v současné době nepodporuje.

Poznámka

Opětovném použitím tokenu po vyřazení kolekce a jeho opětovném vytvoření se stejným názvem dojde k chybě. Při vytváření nové kolekce a opakovaném použádání názvu kolekce doporučujeme nastavit pageState na hodnotu null.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Pokud chcete získat změny v jednom řádku podle primárního klíče, můžete do dotazu přidat primární klíč. Následující příklad ukazuje, jak sledovat změny pro řádek, kde "user_id = 1"

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Aktuální omezení

Při použití kanálu změn s rozhraním API pro Cassandra platí následující omezení:

  • Vložení a aktualizace jsou v současné době podporovány. Operace odstranění se zatím nepodporuje. Jako alternativní řešení můžete na řádky, které se odstraňují, přidat měkkou značku. Do řádku například přidejte pole s názvem "odstraněno" a nastavte ho na true.
  • Poslední aktualizace se zachovají jako v základním rozhraní API pro NoSQL a zprostředkující aktualizace entity nejsou k dispozici.

Zpracování chyb

Při použití kanálu změn v rozhraní API pro Cassandra se podporují následující kódy chyb a zprávy:

  • Kód chyby HTTP 429 – Pokud je kanál změn omezený, vrátí prázdnou stránku.

Další kroky