대량 실행기 라이브러리에서 Azure Cosmos DB .NET V3 SDK의 대량 지원으로 마이그레이션

적용 대상: NoSQL

이 문서에서는 .NET 대량 실행기 라이브러리를 사용하는 기존 애플리케이션의 코드를 최신 버전의 .NET SDK에서 대량 지원 기능으로 마이그레이션하는 데 필요한 단계를 설명합니다.

대량 지원 사용

AllowBulkExecution 구성을 통해 CosmosClient 인스턴스에서 대량 지원을 사용합니다.

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

각 작업(operation)에 대한 작업(task) 만들기

.NET SDK의 대량 지원은 작업 병렬 라이브러리와 동시에 발생하는 그룹화 작업을 활용하여 작동합니다.

SDK에는 문서 또는 작업 목록을 입력 매개 변수로 사용하는 단일 메서드가 없지만 대량으로 실행하려는 각 작업에 대해 작업을 만든 다음 완료될 때까지 기다리기만 하면 됩니다.

예를 들어 초기 입력이 각 항목에 다음 스키마가 있는 항목 목록인 경우:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

대량 가져오기(BulkExecutor.BulkImportAsync 사용과 유사)를 수행하려면 CreateItemAsync에 대한 동시 호출이 있어야 합니다. 예시:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

대량 업데이트(BulkExecutor.BulkUpdateAsync 사용과 유사)를 수행하려면 항목 값을 업데이트한 후 ReplaceItemAsync 메서드에 대한 동시 호출이 있어야 합니다. 예시:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

그리고 대량 삭제(BulkExecutor.BulkDeleteAsync 사용과 유사)를 수행하려면 각 항목의 id 및 파티션 키를 사용하여 DeleteItemAsync에 대한 동시 호출을 수행해야 합니다. 예시:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

작업 결과 상태 캡처

이전 코드 예에서는 동시 작업 목록을 만들고 각 작업에서 CaptureOperationResponse 메서드를 호출했습니다. 이 메서드는 오류를 캡처하고 요청 단위 사용량을 추적하여 BulkExecutor와 유사한 응답 스키마를 유지할 수 있게 해주는 확장입니다.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

여기서 OperationResponse는 다음과 같이 선언됩니다.

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

작업을 동시에 실행

전체 작업 목록의 범위를 추적하기 위해 다음 도우미 클래스를 사용합니다.

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync 메서드는 모든 작업이 완료될 때까지 기다리며 다음과 같이 사용할 수 있습니다.

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

통계 캡처

이전 코드는 모든 작업이 완료될 때까지 기다렸다가 필요한 통계를 계산합니다. 이러한 통계는 대량 실행기 라이브러리의 BulkImportResponse 통계와 유사합니다.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse에는 다음이 포함됩니다.

  1. 대량 지원을 통해 작업 목록을 처리하는 데 걸린 총 시간입니다.
  2. 성공한 작업의 수입니다.
  3. 사용된 총 요청 단위입니다.
  4. 오류가 있는 경우 예외가 포함된 튜플 목록과 로깅 및 식별을 위한 관련 항목이 표시됩니다.

재시도 구성

대량 실행기 라이브러리에는 라이브러리에 제어를 위임하기 위해 RetryOptionsMaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequests0으로 설정하는 것을 언급한 지침이 있었습니다.

.NET SDK의 대량 지원에는 숨겨진 동작이 없습니다. CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequestsCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests를 통해 직접 재시도 옵션을 구성할 수 있습니다.

참고 항목

프로비저닝된 요청 단위가 데이터 양에 따라 예상보다 훨씬 낮은 경우 높은 값으로 설정하는 것을 고려할 수 있습니다. 대량 작업은 시간이 더 오래 걸리지만 재시도 횟수가 높기 때문에 완전히 성공할 확률이 더 높습니다.

성능 개선 사항

.NET SDK를 사용한 다른 작업과 마찬가지로 스트리밍 API를 사용하면 성능이 향상되고 불필요한 직렬화가 방지됩니다.

스트리밍 API 사용은 사용하는 데이터의 특성이 바이트 스트리밍(예: 파일 스트리밍)의 특성과 일치하는 경우에만 가능합니다. 이러한 경우 CreateItemStreamAsync, ReplaceItemStreamAsync 또는 DeleteItemStreamAsync 메서드를 사용하고 ResponseMessage(ItemResponse 대신)를 사용하면 달성할 수 있는 처리량이 증가합니다.

다음 단계