Поделиться через


Миграция из библиотеки исполнителя массовых операций на использование функции поддержки массовых операций в пакете SDK для Azure Cosmos DB .NET версии 3

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этой статье описаны действия, необходимые для переноса кода существующего приложения, использующего библиотеку исполнителя массовых операций для .NET, в функцию поддержки массовых операций в последней версии пакета SDK для .NET.

Включение поддержки массовых операций

Включите поддержку массовых операций в экземпляре CosmosClient с помощью конфигурации AllowBulkExecution:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Создание задач для каждой операции

Поддержка массовых операций в пакете SDK для .NET осуществляется с помощью библиотеки параллельных задач и операций группирования, которые возникают одновременно.

В пакете SDK нет ни единого метода, который мог бы принимать список документов или операций в качестве входного параметра. Вместо этого вам необходимо создать задачу для каждой операции, которую нужно выполнить в массовом режиме, а затем просто дождаться их завершения.

Например, если в качестве исходных выходных данных используется список элементов, в котором каждый элемент имеет следующую схему:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Если требуется выполнить массовый импорт (аналогично использованию свойства BulkExecutor.BulkImportAsync), необходимо выполнить одновременные вызовы метода CreateItemAsync. Например:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Если требуется выполнить массовое обновление (аналогично использованию свойства BulkExecutor.BulkUpdateAsync), необходимо выполнить одновременные вызовы метода ReplaceItemAsync после обновления значения элемента. Например:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Если требуется выполнить массовое удаление (аналогично использованию свойства BulkExecutor.BulkDeleteAsync), необходимо выполнить одновременные вызовы метода DeleteItemAsync, указав значение id и ключ раздела для каждого элемента. Например:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Сбор данных о состоянии результата задачи

В предыдущих примерах кода мы создали параллельный список задач и вызвали метод CaptureOperationResponse для каждой из этих задач. Этот метод является методом расширения, позволяющим реализовать аналогичную схему ответа, что и библиотека BulkExecutor, путем записи всех ошибок и отслеживания объема единиц запросов.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Значение OperationResponse объявляется следующим образом:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Выполнение операций в параллельном режиме

Чтобы отслеживать область всего списка задач, используется следующий вспомогательный класс:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Метод ExecuteAsync будет ожидать завершения всех операций. Затем вы сможете использовать его следующим образом:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Сбор статистики

Предыдущий код ожидает завершения всех операций и вычисляет необходимую статистику. Эти статистические данные аналогичны свойству BulkImportResponse библиотеки исполнителя массовых операций.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Метод BulkOperationResponse содержит:

  1. Общее время, затраченное на обработку списка операций с помощью поддержки массовых операций.
  2. Количество успешных операций.
  3. Общее количество использованных единиц запросов.
  4. Если происходят сбои, отображается список кортежей, содержащих исключение и связанный элемент для ведения журнала и идентификации.

Конфигурация повторных попыток

В библиотеке исполнителя массовых операций было руководство, в котором указано, что для MaxRetryWaitTimeInSeconds и MaxRetryAttemptsOnThrottledRequests свойства RetryOptions необходимо установить значение 0, чтобы делегировать управление библиотеке.

Функция поддержки массовых операций в пакете SDK для .NET не имеет скрытого поведения. Параметры повторов можно настроить непосредственно с помощью свойства CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests и CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Примечание.

Если подготовленные единицы запроса намного меньше ожидаемых, исходя из объема данных, можно установить большие значения. Массовая операция занимает больше времени, однако из-за большего числа повторных попыток она имеет больше шансов завершиться успешно.

Улучшения производительности

Как и в случае с другими операциями с пакетом SDK для .NET, использование API-интерфейсов потока приводит к повышению производительности и позволяет избежать ненужной сериализации.

Использование API-интерфейсов потока возможно, только если свойства используемых данных соответствуют потоку байтов (например, потокам файлов). В таких случаях использование методов CreateItemStreamAsync, ReplaceItemStreamAsync или DeleteItemStreamAsync, а также ResponseMessage (вместо ItemResponse) увеличивает пропускную способность, которую можно достичь.

Следующие шаги