Поделиться через


Миграция с библиотеки Bulk Executor на поддержку массовой обработки в Azure Cosmos DB .NET SDK версии 3

В этой статье описаны необходимые шаги для переноса кода существующего приложения, использующего библиотеку массового исполнителя .NET , в функцию массовой поддержки в последней версии пакета SDK для .NET.

Включение массовой поддержки

Включите массовую поддержку экземпляра CosmosClient с помощью конфигурации AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Создание задач для каждой операции

Массовая поддержка в пакете SDK для .NET работает путем использования параллельной библиотеки задач и группирования операций.

В пакете SDK нет единого метода, который будет принимать список документов или операций в качестве входного параметра, но вместо этого необходимо создать задачу для каждой операции, которую вы хотите выполнить массово, а затем просто подождите, пока они будут завершены.

Например, если исходные входные данные — это список элементов, в которых каждый элемент имеет следующую схему:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Если вы хотите выполнить массовый импорт (аналогично использованию BulkExecutor.BulkImportAsync), необходимо выполнить параллельные вызовы CreateItemAsync. Рассмотрим пример.

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Если вы хотите выполнить массовое обновление (аналогично использованию BulkExecutor.BulkUpdateAsync), необходимо иметь одновременные вызовы ReplaceItemAsync метода после обновления значения элемента. Рассмотрим пример.

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Если вы хотите выполнить массовое удаление (аналогично использованию BulkExecutor.BulkDeleteAsync), необходимо иметь одновременные вызовы DeleteItemAsyncс id ключом секции каждого элемента. Рассмотрим пример.

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Запись состояния результата задачи

В предыдущих примерах кода мы создали параллельный список задач и вызвали CaptureOperationResponse метод для каждой из этих задач. Этот метод является расширением, которое позволяет поддерживать аналогичную схему ответа как BulkExecutor, записывая все ошибки и отслеживая использование единиц запроса.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Где OperationResponse объявляется как:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Параллельное выполнение операций

Чтобы отслеживать область всего списка задач, мы используем этот вспомогательный класс:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Метод ExecuteAsync будет ожидать завершения всех операций, и его можно использовать следующим образом:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Сбор статистики

Предыдущий код ожидает завершения всех операций и вычисляет необходимую статистику. Эта статистика аналогична статистике библиотеки массовой обработки BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse содержит:

  1. Общее время, затраченное на обработку списка операций с помощью пакетной обработки.
  2. Количество успешных операций.
  3. Общее количество единиц запросов, потребляемых.
  4. Если возникают сбои, отображается список кортежей, содержащих исключение и связанный элемент для ведения журнала и идентификации.

Настройка повторных попыток

Библиотека массового исполнителя имела рекомендации, которые упоминали об установке MaxRetryWaitTimeInSeconds и MaxRetryAttemptsOnThrottledRequests параметров RetryOptions на 0, чтобы делегировать управление библиотеке.

Для массовой обработки в .NET SDK нет скрытого поведения. Параметры повторных попыток можно настроить непосредственно через CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests и CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Замечание

В случаях, когда выделенные единицы запросов значительно ниже ожидаемых на основе объема данных, может быть целесообразно установить их на более высокие значения. Массовая операция займет больше времени, но вероятность ее полного успеха выше благодаря увеличенному числу повторных попыток.

Улучшения производительности

Как и в других операциях с пакетом SDK для .NET, использование API потоков приводит к повышению производительности и избегает ненужных сериализации.

Использование API потока возможно только в том случае, если характер используемых данных соответствует типу потока байтов (например, файловых потоков). В таких случаях использование методов CreateItemStreamAsync, ReplaceItemStreamAsync или DeleteItemStreamAsync и работа с ResponseMessage (вместо ItemResponse) увеличивает достигаемую пропускную способность.

Дальнейшие шаги