Миграция из библиотеки исполнителя массовых операций на использование функции поддержки массовых операций в пакете SDK для Azure Cosmos DB .NET версии 3
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
В этой статье описаны действия, необходимые для переноса кода существующего приложения, использующего библиотеку исполнителя массовых операций для .NET, в функцию поддержки массовых операций в последней версии пакета SDK для .NET.
Включение поддержки массовых операций
Включите поддержку массовых операций в экземпляре CosmosClient
с помощью конфигурации AllowBulkExecution:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Создание задач для каждой операции
Поддержка массовых операций в пакете SDK для .NET осуществляется с помощью библиотеки параллельных задач и операций группирования, которые возникают одновременно.
В пакете SDK нет ни единого метода, который мог бы принимать список документов или операций в качестве входного параметра. Вместо этого вам необходимо создать задачу для каждой операции, которую нужно выполнить в массовом режиме, а затем просто дождаться их завершения.
Например, если в качестве исходных выходных данных используется список элементов, в котором каждый элемент имеет следующую схему:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Если требуется выполнить массовый импорт (аналогично использованию свойства BulkExecutor.BulkImportAsync), необходимо выполнить одновременные вызовы метода CreateItemAsync
. Например:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Если требуется выполнить массовое обновление (аналогично использованию свойства BulkExecutor.BulkUpdateAsync), необходимо выполнить одновременные вызовы метода ReplaceItemAsync
после обновления значения элемента. Например:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Если требуется выполнить массовое удаление (аналогично использованию свойства BulkExecutor.BulkDeleteAsync), необходимо выполнить одновременные вызовы метода DeleteItemAsync
, указав значение id
и ключ раздела для каждого элемента. Например:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Сбор данных о состоянии результата задачи
В предыдущих примерах кода мы создали параллельный список задач и вызвали метод CaptureOperationResponse
для каждой из этих задач. Этот метод является методом расширения, позволяющим реализовать аналогичную схему ответа, что и библиотека BulkExecutor, путем записи всех ошибок и отслеживания объема единиц запросов.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Значение OperationResponse
объявляется следующим образом:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Выполнение операций в параллельном режиме
Чтобы отслеживать область всего списка задач, используется следующий вспомогательный класс:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Метод ExecuteAsync
будет ожидать завершения всех операций. Затем вы сможете использовать его следующим образом:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Сбор статистики
Предыдущий код ожидает завершения всех операций и вычисляет необходимую статистику. Эти статистические данные аналогичны свойству BulkImportResponse библиотеки исполнителя массовых операций.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Метод BulkOperationResponse
содержит:
- Общее время, затраченное на обработку списка операций с помощью поддержки массовых операций.
- Количество успешных операций.
- Общее количество использованных единиц запросов.
- Если происходят сбои, отображается список кортежей, содержащих исключение и связанный элемент для ведения журнала и идентификации.
Конфигурация повторных попыток
В библиотеке исполнителя массовых операций было руководство, в котором указано, что для MaxRetryWaitTimeInSeconds
и MaxRetryAttemptsOnThrottledRequests
свойства RetryOptions необходимо установить значение 0
, чтобы делегировать управление библиотеке.
Функция поддержки массовых операций в пакете SDK для .NET не имеет скрытого поведения. Параметры повторов можно настроить непосредственно с помощью свойства CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests и CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Примечание.
Если подготовленные единицы запроса намного меньше ожидаемых, исходя из объема данных, можно установить большие значения. Массовая операция занимает больше времени, однако из-за большего числа повторных попыток она имеет больше шансов завершиться успешно.
Улучшения производительности
Как и в случае с другими операциями с пакетом SDK для .NET, использование API-интерфейсов потока приводит к повышению производительности и позволяет избежать ненужной сериализации.
Использование API-интерфейсов потока возможно, только если свойства используемых данных соответствуют потоку байтов (например, потокам файлов). В таких случаях использование методов CreateItemStreamAsync
, ReplaceItemStreamAsync
или DeleteItemStreamAsync
, а также ResponseMessage
(вместо ItemResponse
) увеличивает пропускную способность, которую можно достичь.
Следующие шаги
- Дополнительные сведения о выпусках пакета SDK для .NET см. в статье Пакет SDK для Azure Cosmos DB.
- Скачайте полный исходный код миграции на сайте GitHub.
- Дополнительные примеры массовых операций на GitHub
- Если вы планируете ресурсы для миграции в Azure Cosmos DB,
- Если вам известно только количество виртуальных ядер и серверов в существующем кластере баз данных, прочитайте об оценке единиц запроса на основе этих данных.
- Если вам известна стандартная частота запросов для текущей рабочей нагрузки базы данных, ознакомьтесь со статьей о расчете единиц запросов с помощью планировщика ресурсов Azure Cosmos DB