Feed de alterações no Azure Cosmos DB para Apache Cassandra

APLICA-SE A: Cassandra

O suporte para feeds de alterações no Azure Cosmos DB para Apache Cassandra está disponível através dos predicados de consulta na Linguagem de Consulta para Cassandra (CQL). Com estas condições predicados, pode consultar a API do feed de alterações. As aplicações podem fazer as alterações a uma tabela com a chave primária (também conhecida como chave de partição), conforme é necessário no CQL. Em seguida, pode efetuar mais ações com base nos resultados. As alterações às linhas na tabela são capturadas pela ordem do tempo de modificação e da sequência de ordenação por chave de partição.

O exemplo seguinte mostra como obter um feed de alterações em todas as linhas numa API para a tabela do Keyspace para Cassandra com .NET. O predicado COSMOS_CHANGEFEED_START_TIME() é utilizado diretamente no CQL para consultar itens no feed de alterações a partir de uma hora de início especificada (neste caso, datetime atual). Pode transferir o exemplo completo para C# aqui e para Java aqui.

Em cada iteração, a consulta é retomada no último ponto em que as alterações foram lidas, utilizando o estado de paginação. Podemos ver um fluxo contínuo de novas alterações à tabela no Keyspace. Iremos ver alterações às linhas que são inseridas ou atualizadas. Atualmente, a monitorização das operações de eliminação com o feed de alterações na API para Cassandra não é suportada.

Nota

Reutilizar um token depois de remover uma coleção e, em seguida, recriá-la com o mesmo nome resulta num erro. Recomendamos que defina pageState como nulo ao criar uma nova coleção e reutilizar o nome da coleção.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Para obter as alterações a uma única linha por chave primária, pode adicionar a chave primária na consulta. O exemplo seguinte mostra como controlar as alterações da linha em que "user_id = 1"

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Limitações atuais

As seguintes limitações são aplicáveis ao utilizar o feed de alterações com a API para Cassandra:

  • As inserções e atualizações são atualmente suportadas. A operação de eliminação ainda não é suportada. Como solução, pode adicionar um marcador suave em linhas que estão a ser eliminadas. Por exemplo, adicione um campo na linha denominado "eliminado" e defina-o como "verdadeiro".
  • A última atualização persiste, uma vez que a API principal para NoSQL e as atualizações intermédias à entidade não estão disponíveis.

Processamento de erros

Os seguintes códigos de erro e mensagens são suportados ao utilizar o feed de alterações na API para Cassandra:

  • Código de erro HTTP 429 - Quando o feed de alterações é limitado, devolve uma página vazia.

Passos seguintes