قم بالترحيل من مكتبة المنفذ المجمعة إلى الدعم المجمع في Azure Cosmos DB .NET V3 SDK

ينطبق على: NoSQL

توضح هذه المقالة الخطوات المطلوبة لترحيل التعليمات البرمجية لتطبيق موجود يستخدم مكتبة المنفذ المجمع .NET إلى ميزة الدعم المجمع في أحدث إصدار من .NET SDK.

تفعيل الدعم المجمع

تمكين الدعم المجمع على المثيل CosmosClient من خلال تكوين AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

إنشاء مهام لكل عملية

يعمل الدعم المجمع في .NET SDK من خلال الاستفادة من مكتبة المهام المتوازية وعمليات التجميع التي تحدث بشكل متزامن.

لا توجد طريقة واحدة في SDK ستأخذ قائمة المستندات أو العمليات الخاصة بك كمعامل إدخال، ولكن بدلاً من ذلك، تحتاج إلى إنشاء مهمة لكل عملية تريد تنفيذها بشكل مجمّع، ثم انتظر حتى تكتمل.

على سبيل المثال، إذا كان الإدخال الأولي الخاص بك عبارة عن قائمة بالعناصر حيث يحتوي كل عنصر على المخطط التالي:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

إذا كنت تريد إجراء استيراد مجمع (على غرار استخدام BulkExecutor.BulkImportAsync)، فستحتاج إلى استدعاءات متزامنة إلى CreateItemAsync. على سبيل المثال:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

إذا كنت تريد إجراء تحديث مجمع (مشابه لاستخدام BulkExecutor.BulkUpdateAsync)، فستحتاج إلى استدعاءات متزامنة إلى ReplaceItemAsync الأسلوب بعد تحديث قيمة العنصر. على سبيل المثال:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

وإذا كنت تريد إجراء حذف مجمع (على غرار استخدام BulkExecutor.BulkDeleteAsync)، تحتاج إلى استدعاءات متزامنة إلى DeleteItemAsync، مع id ومفتاح القسم لكل عنصر. على سبيل المثال:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

التقاط حالة نتيجة المهمة

في أمثلة التعليمات البرمجية السابقة، أنشأنا قائمة متزامنة من المهام، واستدعنا الأسلوب على CaptureOperationResponse كل من هذه المهام. هذا الأسلوب هو ملحق يتيح لنا الاحتفاظ بمخطط استجابة مشابه مثل BulkExecutor، عن طريق التقاط أي أخطاء وتتبع استخدام وحدات الطلب.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

حيث يتم الإعلان عن OperationResponse ك:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

تنفيذ العمليات في نفس الوقت

لتتبع نطاق قائمة المهام بالكامل، نستخدم فئة المساعد هذه:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync سينتظر الأسلوب حتى تكتمل جميع العمليات ويمكنك استخدامه كما يلي:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

التقاط الإحصائيات

ينتظر الكود السابق حتى تكتمل جميع العمليات ويحسب الإحصائيات المطلوبة. هذه الإحصائيات مشابهة للإحصائيات الخاصة بمكتبة المنفذ المجمعة BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

يحتوي BulkOperationResponse على:

  1. إجمالي الوقت المستغرق لمعالجة قائمة العمليات من خلال الدعم الجماعي.
  2. عدد العمليات الناجحة.
  3. إجمالي وحدات الطلب المستهلكة.
  4. إذا كانت هناك حالات فشل، فإنها تعرض قائمة المجموعات التي تحتوي على الاستثناء والعنصر المرتبط لغرض التسجيل والتعريف.

أعد محاولة التكوين

تتضمن مكتبة المنفذ المجمع إرشادات مذكورة لتعيين MaxRetryWaitTimeInSeconds و MaxRetryAttemptsOnThrottledRequests من RetryOptions إلى 0 لتفويض التحكم إلى المكتبة.

للحصول على دعم جماعي في .NET SDK، لا يوجد سلوك مخفي. يمكنك تكوين خيارات إعادة المحاولة مباشرة من خلال CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests وCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

إشعار

في الحالات التي تكون فيها وحدات الطلبات المقدمة أقل بكثير من المتوقع بناءً على كمية البيانات، قد تحتاج إلى التفكير في تعيينها على قيم عالية. ستستغرق العملية المجمّعة وقتاً أطول ولكن لديها فرصة أكبر للنجاح تماماً بسبب عمليات إعادة المحاولة الأعلى.

تحسينات الأداء

كما هو الحال مع العمليات الأخرى باستخدام .NET SDK، يؤدي استخدام واجهات برمجة التطبيقات للتدفق إلى أداء أفضل وتجنب أي تسلسل غير ضروري.

لا يمكن استخدام واجهات برمجة التطبيقات للتدفق إلا إذا كانت طبيعة البيانات التي تستخدمها تتطابق مع طبيعة تدفق البايت (على سبيل المثال، تدفقات الملفات). في مثل هذه الحالات، يؤدي استخدام CreateItemStreamAsyncReplaceItemStreamAsyncالأساليب أو أو DeleteItemStreamAsync والعمل مع ResponseMessage (بدلا من ItemResponse) إلى زيادة معدل النقل الذي يمكن تحقيقه.

الخطوات التالية