Поделиться через


Канал изменений в Azure Cosmos DB для Apache Cassandra

Область применения: Кассандра

Поддержка канала изменений в Azure Cosmos DB для Apache Cassandra доступна через предикаты запросов на языке запросов Cassandra (CQL). Используя условия предикатов, можно отправлять запросы в API канала изменений. Приложения могут получать внесенные в таблицу изменения с помощью первичного ключа (также известного как ключ секции), как это требуется в CQL. На основе полученных результатов можно выполнять дальнейшие действия. Изменения записей в таблице фиксируются в порядке времени их изменения, и в каждом ключе секции соблюдается порядок сортировки.

В следующем примере показано, как получить веб-канал изменений во всех строках в таблице API для пространства ключей Cassandra с помощью .NET. Предикат COSMOS_CHANGEFEED_START_TIME() используется непосредственно в CQL для запрашивания элементов в канале изменений с указанного времени начала (в нашем случае это текущая дата и время). Полный пример для C# можно скачать здесь, а для Java — здесь.

В каждой итерации запрос возобновляется в момент чтения последних изменений с использованием состояния разбиения по страницам. В таблице в пространстве ключей можно увидеть непрерывный поток новых изменений. Отобразятся изменения в записях, которые вставляются или обновляются. Отслеживание операций удаления с помощью канала изменений в API для Cassandra в настоящее время не поддерживается.

Примечание.

Повторное использование маркера после удаления коллекции и последующего воссоздания с тем же именем приводит к ошибке. Рекомендуется задать для pageState значение null при создании новой коллекции и повторном использовании ее имени.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Чтобы получить изменения в одной записи по первичному ключу, можно добавить первичный ключ в запрос. В примере ниже показано, как отслеживать изменения в строке, где "user_id = 1".

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Текущие ограничения

Следующие ограничения применимы при использовании канала изменений с API для Cassandra:

  • В настоящее время поддерживаются вставки и обновления. Операция удаления пока не поддерживается. В качестве обходного решения в удаляемые записи можно добавить программную метку. Например, добавьте поле в запись с именем "deleted" и задайте для него значение "true".
  • Последнее обновление сохраняется, так как в базовом API noSQL и промежуточных обновлениях сущности недоступны.

Обработка ошибок

При использовании канала изменений в API для Cassandra поддерживаются следующие коды ошибок и сообщения:

  • Код ошибки HTTP 429 — когда к каналу изменений применяется правило ограничения скорости, он возвращает пустую страницу.

Следующие шаги