إجراء عمليات مجمعة على بيانات Azure Cosmos DB

ينطبق على: NoSQL

يقدم هذا البرنامج التعليمي إرشادات حول إجراء عمليات مجمعة في Azure Cosmos DB Java V4 SDK. يأتي هذا الإصدار من SDK مزوداً بمكتبة المنفذ المجمعة المضمنة. إذا كنت تستخدم إصداراً أقدم من Java SDK، فمن المستحسن الانتقال إلى أحدث إصدار. Azure Cosmos DB Java V4 SDK هو الحل الحالي الموصى به لدعم Java بالجملة.

حاليا، يتم دعم مكتبة المنفذ المجمع فقط من قبل Azure Cosmos DB ل NoSQL وواجهة برمجة التطبيقات لحسابات Gremlin. للتعرف على استخدام مكتبة .NET للمنفذ المجمع مع واجهة برمجة التطبيقات ل Gremlin، راجع تنفيذ عمليات مجمعة في Azure Cosmos DB ل Gremlin.

المتطلبات الأساسية

استنساخ نموذج التطبيق

الآن دعنا ننتقل إلى العمل باستخدام التعليمات البرمجية عن طريق تنزيل مستودع عينات عام لـ Java V4 SDK لـ Azure Cosmos DB من GitHub. تؤدي نماذج التطبيقات هذه عمليات CRUD وعمليات أخرى شائعة على Azure Cosmos DB. لاستنساخ المستودع، افتح موجه الأوامر، وانتقل إلى الدليل حيث تريد نسخ التطبيق وتشغيل الأمر التالي:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

يحتوي المستودع المستنسخ على نموذج SampleBulkQuickStartAsync.java في المجلد /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. ينشئ التطبيق المستندات وينفذ العمليات لإنشاء العناصر بشكل مجمّع، ورفعها، واستبدالها، وحذفها في Azure Cosmos DB. في الأقسام التالية، سنراجع التعليمة البرمجية في نموذج التطبيق.

التنفيذ المجمع في Azure Cosmos DB

  1. تتم قراءة سلاسل اتصال Azure Cosmos DB كوسائط، ويتم تخصيصها للمتغيرات المحددة في ملف /examples/common/AccountSettings.java. يجب تعيين متغيرات البيئة هذه
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

لتشغيل العينة المجمعة، حدد فئتها الرئيسية:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. تتم تهيئة العنصر CosmosAsyncClient باستخدام العبارات التالية:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. يقوم النموذج بإنشاء قاعدة بيانات وحاوية غير متزامنة. ثم يقوم بإنشاء مستندات متعددة سيتم تنفيذ عمليات مجمعة عليها. يضيف هذا المستندات إلى عنصر دفق تفاعلي Flux<Family>:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. يحتوي النموذج على طرق للإنشاء المجمع، والرفع، والاستبدال، والحذف. في كل طريقة، نقوم بتعيين مستندات العائلات في دفق BulkWriter Flux<Family> لاستدعاءات طريقة متعددة في CosmosBulkOperations. تتم إضافة هذه العمليات إلى عنصر دفق تفاعلي آخر Flux<CosmosItemOperation>. ثم يتم تمرير الدفق إلى executeBulkOperations طريقة غير المتزامن container التي أنشأناها في البداية، ويتم تنفيذ العمليات بشكل مجمّع. انظر طريقة الإنشاء المجمعة أدناه كمثال:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. توجد أيضاً فئة BulkWriter.java في نفس الدليل مثل التطبيق النموذجي. توضح هذه الفئة كيفية التعامل مع تحديد المعدل (429) وأخطاء المهلة (408) التي قد تحدث أثناء التنفيذ المجمع، وإعادة محاولة هذه العمليات بشكل فعال. يتم تنفيذه بالأساليب أدناه، كما يوضح كيفية تنفيذ التحكم في معدل النقل المحلي والعالمي.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. وبالإضافة إلى ذلك، توجد طرق إنشاء مجمعة في العينة توضح كيفية إضافة معالجة الاستجابة وتعيين خيارات التنفيذ:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

‏‫تلميحات الأداء

ضع في اعتبارك النقاط التالية؛ للحصول على أداء أفضل عند استخدام مكتبة المنفذ المجمع:

  • للحصول على أفضل أداء، قم بتشغيل التطبيق الخاص بك من جهاز Azure الظاهري في نفس المنطقة مثل منطقة كتابة حساب Azure Cosmos DB.

  • لتحقيق إنتاجية أعلى:

    • قم بتعيين حجم كومة الذاكرة المؤقتة لـ JVM إلى عدد كبير بما فيه الكفاية؛ لتجنب أية مشكلة في الذاكرة في معالجة عدد كبير من المستندات. حجم الكومة المقترح: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
    • يوجد وقت معالجة مسبقة، والذي ستتمتع بسببه بسعة معالجة أعلى عند تنفيذ عمليات مجمعة بعدد كبير من المستندات. لذا، إذا كنت تريد استيراد 10,000,000 مستند، فإن تشغيل استيراد كميات كبيرة تبلغ 10 مرات على 10 مجموعات من المستندات، فإن كل مستند من الحجم 1,000,000 أفضل من تشغيل استيراد كميات كبيرة تبلغ 100 مرة على 100 مجموعة من المستندات لكل منها بحجم 100,000 مستند.
  • يوصى بإنشاء مثيل كائن CosmosAsyncClient واحد للتطبيق بأكمله داخل جهاز ظاهري واحد يتوافق مع حاوية Azure Cosmos DB معينة.

  • ونظرًا لأن تنفيذ واجهة برمجة التطبيقات (API) لعملية واحدة غير مرغوب فيها يستهلك مجموعة كبيرة من وحدة المعالجة المركزية (CPU) الخاصة بجهاز العميل ووحدة الإدخال/ الإخراج المتصلة بالشبكة. يحدث ذلك من خلال إنتاج عدة مهام داخليًا، وتجنب إنتاج عدة مهام متزامنة داخل عملية التطبيق الخاصة بك لكل عملية تنفيذ استدعاءات API للعملية المجمعة. إذا كانت استدعاءات واجهة برمجة التطبيقات لعملية مجمعة واحدة تعمل على جهاز ظاهري واحد غير قادرة على استهلاك سعة الحاوية بالكامل (إذا كان معدل نقل الحاوية > مليون وحدة طلب/ثانية)، فمن الأفضل إنشاء أجهزة ظاهرية منفصلة لتنفيذ عمليات مجمعة في نفس وقت استدعاءات واجهة برمجة التطبيقات.

الخطوات التالية