Apache Cassandra için Azure Cosmos DB'de değişiklik akışı

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Apache Cassandra için Azure Cosmos DB'de değişiklik akışı desteği, Cassandra Sorgu Dili'ndeki (CQL) sorgu koşullarından kullanılabilir. Bu koşul koşullarını kullanarak değişiklik akışı API'sini sorgulayabilirsiniz. Uygulamalar CQL'de gerekli olduğu gibi birincil anahtarı (bölüm anahtarı olarak da bilinir) kullanarak tabloda yapılan değişiklikleri alabilir. Ardından sonuçlara göre daha fazla eylem gerçekleştirebilirsiniz. Tablodaki satırlarda yapılan değişiklikler, değişiklik süreleri ve bölüm anahtarı başına sıralama düzeni sırasıyla yakalanır.

Aşağıdaki örnek, .NET kullanarak Cassandra Keyspace tablosu için API'deki tüm satırlarda değişiklik akışı alma işlemini gösterir. COSMOS_CHANGEFEED_START_TIME() koşulu, belirtilen başlangıç saatinden (bu örnekte geçerli tarih saat) değişiklik akışındaki öğeleri sorgulamak için doğrudan CQL içinde kullanılır. C# için ve Java için tam örneği buradan indirebilirsiniz.

Her yinelemede, disk belleği durumu kullanılarak sorgu son noktadaki değişiklikler okundu. Keyspace'te tabloda yapılan yeni değişikliklerin sürekli akışını görebiliriz. Eklenen veya güncelleştirilen satırlarda değişiklikler göreceğiz. Cassandra için API'de değişiklik akışı kullanılarak silme işlemlerinin izlenmesi şu anda desteklenmiyor.

Not

Bir koleksiyonu bıraktıktan sonra bir belirteci yeniden kullanarak aynı adla yeniden oluşturmak hataya neden olur. Yeni bir koleksiyon oluştururken ve koleksiyon adını yeniden kullanırken pageState değerini null olarak ayarlamanızı öneririz.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Değişiklikleri birincil anahtara göre tek bir satıra almak için, birincil anahtarı sorguya ekleyebilirsiniz. Aşağıdaki örnekte, "user_id = 1" olan satırdaki değişikliklerin nasıl izlendiği gösterilmektedir

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Geçerli sınırlamalar

Cassandra için API ile değişiklik akışı kullanılırken aşağıdaki sınırlamalar geçerlidir:

  • Eklemeler ve güncelleştirmeler şu anda desteklenmektedir. Silme işlemi henüz desteklenmiyor. Geçici bir çözüm olarak, silinen satırlara geçici bir işaretçi ekleyebilirsiniz. Örneğin, satıra "deleted" adlı bir alan ekleyin ve bunu "true" olarak ayarlayın.
  • NoSQL için çekirdek API'de olduğu gibi son güncelleştirme kalıcı hale getirilir ve varlığa yönelik ara güncelleştirmeler kullanılamaz.

Hata işleme

Cassandra için API'de değişiklik akışı kullanılırken aşağıdaki hata kodları ve iletiler desteklenir:

  • HTTP hata kodu 429 - Değişiklik akışı hız sınırına sahip olduğunda boş bir sayfa döndürür.

Sonraki adımlar