Канал изменений в Azure Cosmos DB для Apache Cassandra
Область применения: Кассандра
Поддержка канала изменений в Azure Cosmos DB для Apache Cassandra доступна через предикаты запросов на языке запросов Cassandra (CQL). Используя условия предикатов, можно отправлять запросы в API канала изменений. Приложения могут получать внесенные в таблицу изменения с помощью первичного ключа (также известного как ключ секции), как это требуется в CQL. На основе полученных результатов можно выполнять дальнейшие действия. Изменения записей в таблице фиксируются в порядке времени их изменения, и в каждом ключе секции соблюдается порядок сортировки.
В следующем примере показано, как получить веб-канал изменений во всех строках в таблице API для пространства ключей Cassandra с помощью .NET. Предикат COSMOS_CHANGEFEED_START_TIME() используется непосредственно в CQL для запрашивания элементов в канале изменений с указанного времени начала (в нашем случае это текущая дата и время). Полный пример для C# можно скачать здесь, а для Java — здесь.
В каждой итерации запрос возобновляется в момент чтения последних изменений с использованием состояния разбиения по страницам. В таблице в пространстве ключей можно увидеть непрерывный поток новых изменений. Отобразятся изменения в записях, которые вставляются или обновляются. Отслеживание операций удаления с помощью канала изменений в API для Cassandra в настоящее время не поддерживается.
Примечание.
Повторное использование маркера после удаления коллекции и последующего воссоздания с тем же именем приводит к ошибке. Рекомендуется задать для pageState значение null при создании новой коллекции и повторном использовании ее имени.
Session cassandraSession = utils.getSession();
try {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);
String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
byte[] token=null;
System.out.println(query);
while(true)
{
SimpleStatement st=new SimpleStatement(query);
st.setFetchSize(100);
if(token!=null)
st.setPagingStateUnsafe(token);
ResultSet result=cassandraSession.execute(st) ;
token=result.getExecutionInfo().getPagingState().toBytes();
for(Row row:result)
{
System.out.println(row.getString("user_name"));
}
}
} finally {
utils.close();
LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
}
Чтобы получить изменения в одной записи по первичному ключу, можно добавить первичный ключ в запрос. В примере ниже показано, как отслеживать изменения в строке, где "user_id = 1".
String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='"
+ dtf.format(now)+ "'";
SimpleStatement st=new SimpleStatement(query);
Текущие ограничения
Следующие ограничения применимы при использовании канала изменений с API для Cassandra:
- В настоящее время поддерживаются вставки и обновления. Операция удаления пока не поддерживается. В качестве обходного решения в удаляемые записи можно добавить программную метку. Например, добавьте поле в запись с именем "deleted" и задайте для него значение "true".
- Последнее обновление сохраняется, так как в базовом API noSQL и промежуточных обновлениях сущности недоступны.
Обработка ошибок
При использовании канала изменений в API для Cassandra поддерживаются следующие коды ошибок и сообщения:
- Код ошибки HTTP 429 — когда к каналу изменений применяется правило ограничения скорости, он возвращает пустую страницу.