إجراء عمليات مجمعة على بيانات Azure Cosmos DB
ينطبق على: NoSQL
يقدم هذا البرنامج التعليمي إرشادات حول إجراء عمليات مجمعة في Azure Cosmos DB Java V4 SDK. يأتي هذا الإصدار من SDK مزوداً بمكتبة المنفذ المجمعة المضمنة. إذا كنت تستخدم إصداراً أقدم من Java SDK، فمن المستحسن الانتقال إلى أحدث إصدار. Azure Cosmos DB Java V4 SDK هو الحل الحالي الموصى به لدعم Java بالجملة.
حاليا، يتم دعم مكتبة المنفذ المجمع فقط من قبل Azure Cosmos DB ل NoSQL وواجهة برمجة التطبيقات لحسابات Gremlin. للتعرف على استخدام مكتبة .NET للمنفذ المجمع مع واجهة برمجة التطبيقات ل Gremlin، راجع تنفيذ عمليات مجمعة في Azure Cosmos DB ل Gremlin.
المتطلبات الأساسية
في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
يمكنك تجربة Azure Cosmos DB مجانا دون اشتراك Azure، مجانا والتزامات. أو يمكنك استخدام Azure Cosmos DB Emulator مع
https://localhost:8081
نقطة النهاية. يتم توفير المفتاح الأساسي في طلبات المصادقة.Java Development Kit (JDK) 1.8+
على Ubuntu، شغّل
apt-get install default-jdk
لتثبيت JDK.تأكد من ضبط متغير بيئة التشغيل JAVA_HOME ليشير إلى المجلد حيث تم تركيب JDK.
تنزيل أرشيف Maven ثنائي وتثبيته
- على Ubuntu، يمكنك تشغيل
apt-get install maven
لتثبيت Maven.
- على Ubuntu، يمكنك تشغيل
إنشاء حساب Azure Cosmos DB ل NoSQL باستخدام الخطوات الموضحة في قسم إنشاء حساب قاعدة بيانات من مقالة التشغيل السريع ل Java.
استنساخ نموذج التطبيق
الآن دعنا ننتقل إلى العمل باستخدام التعليمات البرمجية عن طريق تنزيل مستودع عينات عام لـ Java V4 SDK لـ Azure Cosmos DB من GitHub. تؤدي نماذج التطبيقات هذه عمليات CRUD وعمليات أخرى شائعة على Azure Cosmos DB. لاستنساخ المستودع، افتح موجه الأوامر، وانتقل إلى الدليل حيث تريد نسخ التطبيق وتشغيل الأمر التالي:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
يحتوي المستودع المستنسخ على نموذج SampleBulkQuickStartAsync.java
في المجلد /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
. ينشئ التطبيق المستندات وينفذ العمليات لإنشاء العناصر بشكل مجمّع، ورفعها، واستبدالها، وحذفها في Azure Cosmos DB. في الأقسام التالية، سنراجع التعليمة البرمجية في نموذج التطبيق.
التنفيذ المجمع في Azure Cosmos DB
- تتم قراءة سلاسل اتصال Azure Cosmos DB كوسائط، ويتم تخصيصها للمتغيرات المحددة في ملف /
examples/common/AccountSettings.java
. يجب تعيين متغيرات البيئة هذه
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
لتشغيل العينة المجمعة، حدد فئتها الرئيسية:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
تتم تهيئة العنصر
CosmosAsyncClient
باستخدام العبارات التالية:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
يقوم النموذج بإنشاء قاعدة بيانات وحاوية غير متزامنة. ثم يقوم بإنشاء مستندات متعددة سيتم تنفيذ عمليات مجمعة عليها. يضيف هذا المستندات إلى عنصر دفق تفاعلي
Flux<Family>
:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
يحتوي النموذج على طرق للإنشاء المجمع، والرفع، والاستبدال، والحذف. في كل طريقة، نقوم بتعيين مستندات العائلات في دفق BulkWriter
Flux<Family>
لاستدعاءات طريقة متعددة فيCosmosBulkOperations
. تتم إضافة هذه العمليات إلى عنصر دفق تفاعلي آخرFlux<CosmosItemOperation>
. ثم يتم تمرير الدفق إلىexecuteBulkOperations
طريقة غير المتزامنcontainer
التي أنشأناها في البداية، ويتم تنفيذ العمليات بشكل مجمّع. انظر طريقة الإنشاء المجمعة أدناه كمثال:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
توجد أيضاً فئة
BulkWriter.java
في نفس الدليل مثل التطبيق النموذجي. توضح هذه الفئة كيفية التعامل مع تحديد المعدل (429) وأخطاء المهلة (408) التي قد تحدث أثناء التنفيذ المجمع، وإعادة محاولة هذه العمليات بشكل فعال. يتم تنفيذه بالأساليب أدناه، كما يوضح كيفية تنفيذ التحكم في معدل النقل المحلي والعالمي.private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
وبالإضافة إلى ذلك، توجد طرق إنشاء مجمعة في العينة توضح كيفية إضافة معالجة الاستجابة وتعيين خيارات التنفيذ:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
تلميحات الأداء
ضع في اعتبارك النقاط التالية؛ للحصول على أداء أفضل عند استخدام مكتبة المنفذ المجمع:
للحصول على أفضل أداء، قم بتشغيل التطبيق الخاص بك من جهاز Azure الظاهري في نفس المنطقة مثل منطقة كتابة حساب Azure Cosmos DB.
لتحقيق إنتاجية أعلى:
- قم بتعيين حجم كومة الذاكرة المؤقتة لـ JVM إلى عدد كبير بما فيه الكفاية؛ لتجنب أية مشكلة في الذاكرة في معالجة عدد كبير من المستندات. حجم الكومة المقترح: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
- يوجد وقت معالجة مسبقة، والذي ستتمتع بسببه بسعة معالجة أعلى عند تنفيذ عمليات مجمعة بعدد كبير من المستندات. لذا، إذا كنت تريد استيراد 10,000,000 مستند، فإن تشغيل استيراد كميات كبيرة تبلغ 10 مرات على 10 مجموعات من المستندات، فإن كل مستند من الحجم 1,000,000 أفضل من تشغيل استيراد كميات كبيرة تبلغ 100 مرة على 100 مجموعة من المستندات لكل منها بحجم 100,000 مستند.
يوصى بإنشاء مثيل كائن CosmosAsyncClient واحد للتطبيق بأكمله داخل جهاز ظاهري واحد يتوافق مع حاوية Azure Cosmos DB معينة.
ونظرًا لأن تنفيذ واجهة برمجة التطبيقات (API) لعملية واحدة غير مرغوب فيها يستهلك مجموعة كبيرة من وحدة المعالجة المركزية (CPU) الخاصة بجهاز العميل ووحدة الإدخال/ الإخراج المتصلة بالشبكة. يحدث ذلك من خلال إنتاج عدة مهام داخليًا، وتجنب إنتاج عدة مهام متزامنة داخل عملية التطبيق الخاصة بك لكل عملية تنفيذ استدعاءات API للعملية المجمعة. إذا كانت استدعاءات واجهة برمجة التطبيقات لعملية مجمعة واحدة تعمل على جهاز ظاهري واحد غير قادرة على استهلاك سعة الحاوية بالكامل (إذا كان معدل نقل الحاوية > مليون وحدة طلب/ثانية)، فمن الأفضل إنشاء أجهزة ظاهرية منفصلة لتنفيذ عمليات مجمعة في نفس وقت استدعاءات واجهة برمجة التطبيقات.
الخطوات التالية
- للحصول على نظرة عامة على وظيفة المنفذ المجمع، راجع نظرة عامة على المنفذ المجمع.