Alterar o modelo de pull do feed de alterações no Azure Cosmos DB
APLICA-SE A: NoSQL
Com o modelo de pull do feed de alterações, você pode consumir o feed de alterações do Azure Cosmos DB em seu próprio ritmo. Do mesmo modo do que já é possível fazer com o processador do feed de alterações, você pode usar o modelo de pull do feed de alterações para paralelizar o processamento de alterações entre vários consumidores de feed de alterações.
Comparar com o processador do feed de alterações
Muitos cenários podem processar o feed de alterações usando o processador do feed de alterações ou o modelo de pull do feed de alterações. Os tokens de continuação do modelo de pull e o contêiner de concessão do processador de feed de alterações atuam como indicadores para o último item processado ou lote de itens no feed de alterações.
No entanto, não é possível converter tokens de continuação em uma concessão ou vice-versa.
Observação
Na maioria dos casos, quando você precisa ler do feed de alterações, a opção mais simples é usar o processador do feed de alterações.
Você deve considerar o uso do modelo de pull nestes cenários:
- Para ler as alterações de uma chave de partição específica.
- Para controlar o ritmo no qual seu cliente recebe alterações para processamento.
- Para fazer uma leitura única dos dados existentes no feed de alterações (por exemplo, fazer uma migração de dados).
Aqui estão algumas diferenças importantes entre o processador do feed de alterações e o modelo de pull do feed de alterações:
Recurso | Alterar o processador de feed | Modelo de pull do feed de alterações |
---|---|---|
Acompanhar o ponto atual no processamento do feed de alterações | Concessão (armazenada em um contêiner do Azure Cosmos DB) | Token de continuação (armazenado na memória ou persistido manualmente) |
Capacidade de reproduzir alterações passadas | Sim, com o modelo de push | Sim, com o modelo de pull |
Sondagem para alterações futuras | Verifica automaticamente se há alterações com base no valor WithPollInterval especificado pelo usuário |
Manual |
Comportamento em que não há novas alterações | Aguarde automaticamente o valor para WithPollInterval e, em seguida, verifique novamente |
Deve verificar o status e verificar novamente manualmente |
Processar alterações de todo o contêiner | Sim, e paralelizado automaticamente em vários threads e computadores consumindo do mesmo contêiner | Sim, e paralelizado manualmente usando FeedRange |
Processar alterações de apenas uma chave de partição | Sem suporte | Sim |
Observação
Ao contrário da leitura usando o processador do feed de alterações, ao usar o modelo de pull você deve lidar explicitamente com casos em que não há novas alterações.
Trabalhando com o modelo de pull
Para processar o feed de alterações usando o modelo de pull, crie uma instância de FeedIterator
. Ao criar inicialmente um FeedIterator
, você deve especificar um valor de ChangeFeedStartFrom
obrigatório, que consiste na posição inicial para ler as alterações e o FeedRange
desejado. O FeedRange
é um intervalo de valores de chave de partição e especifica os itens que podem ser lidos do feed de alterações usando esse FeedIterator
específico. Você também deve especificar um valor de ChangeFeedMode
necessário para o modo no qual deseja processar alterações: versão mais recente ou todas as versões e exclusões. Use ChangeFeedMode.LatestVersion
ou ChangeFeedMode.AllVersionsAndDeletes
para indicar em qual modo você deseja ler o feed de alterações. Ao usar o modo todas as versões e exclusões, você deve selecionar um início do feed de alterações do valor de Now()
ou de um token de continuação específico.
Opcionalmente, você pode especificar ChangeFeedRequestOptions
para definir um PageSizeHint
. Quando definida, essa propriedade define o número máximo de itens recebidos por página. Se as operações na coleção monitorada forem executadas por meio de procedimentos armazenados, o escopo da transação será preservado durante a leitura de itens do feed de alterações. Como resultado, o número de itens recebidos pode ser maior que o valor especificado para que os itens alterados pela mesma transação sejam retornados como parte de um lote atômico.
Aqui está um exemplo para obter um FeedIterator
no modo de versão mais recente que retorna objetos de entidade, neste caso, um objeto User
:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Dica
Antes da versão 3.34.0
, o modo de versão mais recente pode ser usado definindo ChangeFeedMode.Incremental
. Tanto Incremental
quantoLatestVersion
referem-se ao modo de versão mais recente do feed de alterações e os aplicativos que usam qualquer um dos modos verão o mesmo comportamento.
O modo de todas as versões e exclusões está em versão prévia e pode ser usado com a versão >= 3.32.0-preview
do SDK do .NET. Aqui está um exemplo para obter FeedIterator
em todas as versões e excluir o modo que retorna objetos User
:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Observação
No modo de versão mais recente, você receberá objetos que representam o item que foi alterado com alguns metadados extras. O modo de todas as versões e exclusões retorna um modelo de dados diferente. Para obter mais informações, consulte Analisar o objeto de resposta.
Você pode obter o exemplo completo para modo de versão mais recente ou modo de todas as versões e exclusões.
Consumir o feed de alterações por meio de fluxos
FeedIterator
para ambos os modos de feed de alterações existem duas opções. Além dos exemplos que retornam objetos de entidade, você também pode obter a resposta com suporte a Stream
. Os fluxos permitem que você leia os dados sem necessidade de desserializá-los primeiro, economizando recursos do cliente.
Aqui está um exemplo para obter FeedIterator
no modo de versão mais recente que retorna Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Consumir as alterações para um contêiner inteiro
Se você não fornecer um parâmetro FeedRange
para FeedIterator
, poderá processar o feed de alterações de um contêiner inteiro em seu próprio ritmo. Aqui está um exemplo que começa a ler todas as alterações começando no momento atual, usando o modo de versão mais recente:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Como o feed de alterações é efetivamente uma lista infinita de itens que abrangem todas as futuras gravações e atualizações, o valor de HasMoreResults
é sempre true
. Ao tentar ler o feed de alterações e não houver nenhuma nova alteração disponível, você recebe uma resposta com o status NotModified
. No exemplo acima, ela é tratada aguardando cinco segundos antes de verificar novamente se há alterações.
Consumir as alterações para uma chave de partição
Em alguns casos, talvez você queira processar apenas as alterações para uma chave de partição específica. Você pode obter FeedIterator
para uma chave de partição específica e processar as alterações da mesma maneira que pode fazê-lo para um contêiner inteiro.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Usar FeedRange para paralelização
No processador do feed de alterações, o trabalho é distribuído automaticamente entre vários consumidores. No modelo de pull do feed de alterações, você pode usar o FeedRange
para paralelizar o processamento do feed de alterações. Um FeedRange
representa um intervalo de valores de chave de partição.
Aqui está um exemplo que mostra como obter uma lista de intervalos para o contêiner:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Ao obter a lista de valores FeedRange
para seu contêiner, você obtém um FeedRange
por partição física.
Usando um FeedRange
, você pode criar um FeedIterator
para paralelizar o processamento do feed de alterações em vários computadores ou threads. Ao contrário do exemplo anterior, que mostrou como obter um FeedIterator
para o contêiner inteiro ou uma única chave de partição, é possível usar FeedRanges para obter vários FeedIterators que podem processar o feed de alterações em paralelo.
No caso em que você deseja usar o FeedRanges, você precisa ter um processo orquestrador que obtém FeedRanges e os distribui para esses computadores. Essa distribuição pode ser:
- Usando
FeedRange.ToJsonString
e distribuindo esse valor de cadeia de caracteres. Os consumidores podem usar esse valor comFeedRange.FromJsonString
. - Se a distribuição estiver em processo, passando a referência do objeto
FeedRange
.
Aqui está uma amostra que mostra como ler desde o início do feed de alterações do contêiner usando dois computadores hipotéticos separados que estão realizando a leitura em paralelo:
Computador 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Computador 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Salvar tokens de continuação
Você pode salvar a posição de seu FeedIterator
obtendo o token de continuação. Um token de continuação é um valor de cadeia de caracteres que mantém o controle das últimas alterações processadas do FeedIterator e permite que FeedIterator
retome nesse ponto mais tarde. O token de continuação, se especificado, tem precedência sobre os valores da Hora de início e Começar do início. O código a seguir lê o feed de alterações desde a criação do contêiner. Depois que não houver mais alterações disponíveis, ele manterá um token de continuação para que o consumo do feed de alterações possa ser retomado posteriormente.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Quando você estiver usando o modo de versão mais recente, o token de continuação FeedIterator
nunca expirará enquanto o contêiner do Azure Cosmos DB ainda existir. Quando você estiver usando o modo todas as versões e exclusões, o token de continuação FeedIterator
é válido desde que as alterações tenham ocorrido dentro da janela de retenção para backups contínuos.