Aracılığıyla paylaş


Azure Cosmos DB .NET V3 SDK'sında toplu yürütücü kitaplığından toplu desteğe geçiş

UYGULANANLAR: NoSQL

Bu makalede, .NET toplu yürütücü kitaplığını kullanan mevcut bir uygulamanın kodunu .NET SDK'sının en son sürümündeki toplu destek özelliğine geçirmek için gerekli adımlar açıklanmaktadır.

Toplu desteği etkinleştirme

AllowBulkExecution yapılandırması aracılığıyla örnekte toplu desteği CosmosClient etkinleştirin:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Her işlem için Görevler oluşturma

.NET SDK'sında toplu destek, Görev Paralel Kitaplığı'nı ve eşzamanlı olarak gerçekleşen gruplandırma işlemlerini kullanarak çalışır.

SDK'da belge veya işlem listenizi giriş parametresi olarak alacak tek bir yöntem yoktur, bunun yerine toplu olarak yürütmek istediğiniz her işlem için bir Görev oluşturmanız ve ardından bunların tamamlanmasını beklemeniz gerekir.

Örneğin, ilk girişiniz her öğenin aşağıdaki şemaya sahip olduğu öğelerin listesiyse:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Toplu içeri aktarma yapmak istiyorsanız (BulkExecutor.BulkImportAsync kullanımına benzer şekilde), öğesine CreateItemAsynceşzamanlı çağrılarınız olması gerekir. Örneğin:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Toplu güncelleştirme yapmak istiyorsanız (BulkExecutor.BulkUpdateAsync kullanımına benzer şekilde), öğe değerini güncelleştirdikten sonra yöntemine ReplaceItemAsync eşzamanlı çağrılar yapmanız gerekir. Örneğin:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Toplu silme işlemi yapmak istiyorsanız (BulkExecutor.BulkDeleteAsync kullanımına benzer şekilde), her öğenin ve bölüm anahtarıyla öğesine id eş zamanlı çağrılar DeleteItemAsyncyapmanız gerekir. Örneğin:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Görev sonuç durumunu yakalama

Önceki kod örneklerinde eş zamanlı bir görev listesi oluşturmuş ve bu görevlerin CaptureOperationResponse her birinde yöntemini çağırmıştık. Bu yöntem, hataları yakalayıp istek birimi kullanımını izleyerek BulkExecutor ile benzer bir yanıt şeması tutmamıza olanak tanıyan bir uzantıdır.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse burada olarak bildirilir:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

İşlemleri eşzamanlı olarak yürütme

Görevler listesinin tamamının kapsamını izlemek için şu yardımcı sınıfını kullanırız:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync yöntemi tüm işlemler tamamlanana kadar bekler ve bunu şu şekilde kullanabilirsiniz:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

İstatistikleri yakalama

Önceki kod, tüm işlemler tamamlanana kadar bekler ve gerekli istatistikleri hesaplar. Bu istatistikler, toplu yürütücü kitaplığının BulkImportResponse öğesine benzer.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

içerir BulkOperationResponse :

  1. Toplu destek aracılığıyla işlem listesini işlemek için geçen toplam süre.
  2. Başarılı işlemlerin sayısı.
  3. Tüketilen istek birimlerinin toplamı.
  4. Hatalar varsa, günlüğe kaydetme ve tanımlama amacıyla özel durumu ve ilişkili öğeyi içeren tanımlama listelerini görüntüler.

Yapılandırmayı yeniden deneyin

Toplu yürütücü kitaplığında, denetimi kitaplığa temsilci olarak atamak için RetryOptions 0 ve MaxRetryAttemptsOnThrottledRequests değerlerini ayarlamak MaxRetryWaitTimeInSeconds için bahsedilen yönergeler vardı.

.NET SDK'sında toplu destek için gizli bir davranış yoktur. Yeniden deneme seçeneklerini doğrudan CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests ve CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests aracılığıyla yapılandırabilirsiniz.

Not

Sağlanan istek birimlerinin veri miktarına göre beklenenden çok daha düşük olduğu durumlarda, bunları yüksek değerlere ayarlamayı düşünebilirsiniz. Toplu işlem daha uzun sürer, ancak daha yüksek yeniden denemeler nedeniyle tamamen başarılı olma şansı daha yüksektir.

Performans iyileştirmeleri

.NET SDK'sı ile yapılan diğer işlemlerde olduğu gibi, akış API'lerinin kullanılması daha iyi performansa neden olur ve gereksiz serileştirmeyi önler.

Akış API'lerini kullanmak yalnızca kullandığınız verilerin doğası bir bayt akışıyla (örneğin, dosya akışları) eşleşiyorsa mümkündür. Böyle durumlarda, , veya yöntemlerini kullanmak CreateItemStreamAsyncve ile ResponseMessage çalışmak (yerineItemResponse) elde edilebilecek aktarım hızını DeleteItemStreamAsync artırır. ReplaceItemStreamAsync

Sonraki adımlar

  • .NET SDK sürümleri hakkında daha fazla bilgi edinmek için Azure Cosmos DB SDK makalesine bakın.
  • GitHub'dan tam geçiş kaynak kodunu alın.
  • GitHub'da ek toplu örnekler
  • Azure Cosmos DB'ye geçiş için kapasite planlaması yapmaya mı çalışıyorsunuz?
    • Tek bildiğiniz mevcut veritabanı kümenizdeki sanal çekirdek ve sunucu sayısıysa, sanal çekirdek veya vCPU kullanarak istek birimlerini tahmin etme hakkında bilgi edinin
    • Geçerli veritabanı iş yükünüz için tipik istek oranlarını biliyorsanız Azure Cosmos DB kapasite planlayıcısı kullanarak istek birimlerini tahmin etme hakkında bilgi edinin