Migrar da biblioteca do executor em massa para o suporte em massa no SDK .NET V3 do Azure Cosmos DB

APLICA-SE A: NoSQL

Este artigo descreve os passos necessários para migrar o código de uma aplicação existente que utiliza a biblioteca do executor em massa .NET para a funcionalidade de suporte em massa na versão mais recente do SDK .NET.

Ativar o suporte em massa

Ative o suporte em massa na CosmosClient instância através da configuração AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Criar Tarefas para cada operação

O suporte em massa no SDK .NET funciona ao tirar partido da Biblioteca Paralela de Tarefas e das operações de agrupamento que ocorrem em simultâneo.

Não existe um único método no SDK que leve a sua lista de documentos ou operações como um parâmetro de entrada, mas, em vez disso, tem de criar uma Tarefa para cada operação que pretende executar em massa e, em seguida, esperar que sejam concluídas.

Por exemplo, se a entrada inicial for uma lista de itens em que cada item tem o seguinte esquema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Se quiser fazer a importação em massa (semelhante à utilização de BulkExecutor.BulkImportAsync), tem de ter chamadas simultâneas para CreateItemAsync. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Se quiser efetuar uma atualização em massa (semelhante à utilização de BulkExecutor.BulkUpdateAsync), tem de ter chamadas simultâneas para ReplaceItemAsync o método depois de atualizar o valor do item. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Se quiser eliminar em massa (semelhante à utilização de BulkExecutor.BulkDeleteAsync), tem de ter chamadas simultâneas para DeleteItemAsync, com a id chave de partição e de cada item. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Capturar o estado do resultado da tarefa

Nos exemplos de código anteriores, criámos uma lista simultânea de tarefas e chamámos o CaptureOperationResponse método em cada uma dessas tarefas. Este método é uma extensão que nos permite manter um esquema de resposta semelhante ao BulkExecutor, ao capturar quaisquer erros e controlar a utilização das unidades de pedido.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Quando o OperationResponse é declarado como:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Executar operações em simultâneo

Para controlar o âmbito de toda a lista de Tarefas, utilizamos esta classe auxiliar:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

O ExecuteAsync método aguardará até que todas as operações estejam concluídas e poderá utilizá-la da seguinte forma:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Capturar estatísticas

O código anterior aguarda até que todas as operações sejam concluídas e calcula as estatísticas necessárias. Estas estatísticas são semelhantes às do BulkImportResponse da biblioteca do executor em massa.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Contém BulkOperationResponse :

  1. O tempo total necessário para processar a lista de operações através do suporte em massa.
  2. O número de operações bem-sucedidas.
  3. O total de unidades de pedido consumidas.
  4. Se existirem falhas, apresenta uma lista de cadeias de identificação que contêm a exceção e o item associado para fins de registo e identificação.

Configuração de repetição

A biblioteca do executor em massa tinha orientações que mencionavam para definir o MaxRetryWaitTimeInSeconds e MaxRetryAttemptsOnThrottledRequests de RetryOptions para 0 delegar o controlo à biblioteca.

Para o suporte em massa no SDK .NET, não existe nenhum comportamento oculto. Pode configurar as opções de repetição diretamente através de CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Nota

Nos casos em que as unidades de pedido aprovisionadas são muito inferiores às esperadas com base na quantidade de dados, recomendamos que considere defini-las para valores elevados. A operação em massa demorará mais tempo, mas tem uma maior probabilidade de ser totalmente bem-sucedida devido às repetições mais elevadas.

Melhorias de desempenho

Tal como acontece com outras operações com o SDK .NET, a utilização das APIs de fluxo resulta num melhor desempenho e evita qualquer serialização desnecessária.

A utilização de APIs de fluxo só é possível se a natureza dos dados que utiliza corresponder à de um fluxo de bytes (por exemplo, fluxos de ficheiros). Nestes casos, utilizar os CreateItemStreamAsyncmétodos , ReplaceItemStreamAsyncou DeleteItemStreamAsync e trabalhar com ResponseMessage (em vez de ItemResponse) aumenta o débito que pode ser alcançado.

Passos seguintes