Ler em inglês

Partilhar via


Migrar da biblioteca de executores em massa para o suporte em massa no SDK do Azure Cosmos DB .NET V3

APLICA-SE A: NoSQL

Este artigo descreve as etapas necessárias para migrar o código de um aplicativo existente que usa a biblioteca de executores em massa do .NET para o recurso de suporte em massa na versão mais recente do SDK do .NET.

Habilite o suporte em massa

Habilite o suporte em massa na CosmosClient instância por meio da configuração AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Criar tarefas para cada operação

O suporte em massa no SDK do .NET funciona aproveitando a Biblioteca Paralela de Tarefas e agrupando operações que ocorrem simultaneamente.

Não há um único método no SDK que tomará sua lista de documentos ou operações como um parâmetro de entrada, mas sim, você precisa criar uma tarefa para cada operação que deseja executar em massa e, em seguida, simplesmente esperar que eles sejam concluídos.

Por exemplo, se a entrada inicial for uma lista de itens em que cada item tem o seguinte esquema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Se você quiser fazer a importação em massa (semelhante ao uso de BulkExecutor.BulkImportAsync), precisará ter chamadas simultâneas para CreateItemAsync. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Se você quiser fazer a atualização em massa (semelhante ao uso de BulkExecutor.BulkUpdateAsync), você precisa ter chamadas simultâneas para o ReplaceItemAsync método depois de atualizar o valor do item. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

E se você quiser fazer a exclusão em massa (semelhante ao uso de BulkExecutor.BulkDeleteAsync), você precisa ter chamadas simultâneas para DeleteItemAsync, com a id chave e partição de cada item. Por exemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Capturar o estado do resultado da tarefa

Nos exemplos de código anteriores, criamos uma lista simultânea de tarefas e chamamos o CaptureOperationResponse método em cada uma dessas tarefas. Este método é uma extensão que nos permite manter um esquema de resposta semelhante ao BulkExecutor, capturando quaisquer erros e rastreando o uso das unidades de solicitação.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Se o for declarado OperationResponse como:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Executar operações simultaneamente

Para controlar o escopo de toda a lista de Tarefas, usamos esta classe auxiliar:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

O ExecuteAsync método aguardará até que todas as operações sejam concluídas e você poderá usá-lo assim:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Capturar estatísticas

O código anterior aguarda até que todas as operações sejam concluídas e calcula as estatísticas necessárias. Essas estatísticas são semelhantes às da biblioteca de executores em massa BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

O BulkOperationResponse contém:

  1. O tempo total necessário para processar a lista de operações através do suporte em massa.
  2. O número de operações bem-sucedidas.
  3. O total de unidades de solicitação consumidas.
  4. Se houver falhas, ele exibirá uma lista de tuplas que contêm a exceção e o item associado para fins de registro e identificação.

Repetir configuração

A biblioteca executora em massa tinha orientações mencionadas para definir o MaxRetryWaitTimeInSeconds e MaxRetryAttemptsOnThrottledRequests de RetryOptions para 0 delegar o controle à biblioteca.

Para suporte em massa no SDK do .NET, não há nenhum comportamento oculto. Você pode configurar as opções de repetição diretamente através de CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Nota

Nos casos em que as unidades de solicitação provisionadas são muito menores do que o esperado com base na quantidade de dados, convém considerar defini-las como valores altos. A operação em massa levará mais tempo, mas tem uma chance maior de ter sucesso completo devido às tentativas mais altas.

Melhoramentos de desempenho

Como em outras operações com o SDK do .NET, o uso das APIs de fluxo resulta em melhor desempenho e evita qualquer serialização desnecessária.

O uso de APIs de fluxo só é possível se a natureza dos dados usados corresponder à de um fluxo de bytes (por exemplo, fluxos de arquivos). Nesses casos, usar os CreateItemStreamAsyncmétodos , ReplaceItemStreamAsyncou DeleteItemStreamAsync e trabalhar com ResponseMessage (em vez de) aumenta a taxa de ItemResponsetransferência que pode ser alcançada.

Próximos passos