在 Azure Cosmos DB 資料上執行大量作業
適用於:NoSQL
本教學課程會提供指示,說明在 Azure Cosmos DB Java V4 SDK 中執行大量作業。 此版本的 SDK 隨附大量執行工具程式庫內建功能。 如果您使用較舊版本的 JAVA SDK,建議您移轉至最新的版本。 Azure Cosmos DB Java V4 SDK 是目前建議用於 Java 大量支援的解決方案。
目前,只有 Azure Cosmos DB for NoSQL 和 API for Gremlin 帳戶才支援大量執行工具程式庫。 若要了解如何搭配使用大量執行工具 .NET 程式庫和 API for Gremlin,請參閱在 Azure Cosmos DB for Gremlin 中執行大量作業。
必要條件
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
您可以免費試用 Azure Cosmos DB,無須 Azure 訂用帳戶,免費又不需要承諾用量。 或者,您也可以對
https://localhost:8081
端點使用 Azure Cosmos DB 模擬器。 驗證要求中會提供主索引鍵。Java Development Kit (JDK) 1.8+ \(英文\)
在 Ubuntu 上,執行
apt-get install default-jdk
來安裝 JDK。務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。
-
- 在 Ubuntu 上,您可以執行
apt-get install maven
來安裝 Maven。
- 在 Ubuntu 上,您可以執行
使用 JAVA 快速入門文章的建立資料庫帳戶一節所述的步驟,建立 Azure Cosmos DB for NoSQL 帳戶。
複製範例應用程式
現在讓我們從 GitHub 下載適用於 Azure Cosmos DB 的 Java V4 SDK 的通用範例存放庫,來切換為使用程式碼。 這些範例應用程式會執行 CRUD 作業,並在 Azure Cosmos DB 上執行其他常見作業。 若要複製存放庫,請開啟命令提示字元,瀏覽至要從其中複製應用程式的目錄,然後執行下列命令:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
複製的存放庫會包含 /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
資料夾中的範例 SampleBulkQuickStartAsync.java
。 應用程式會產生文件並執行作業,來大量建立、upsert、取代和刪除 Azure Cosmos DB 中的項目。 在後面幾節中,我們將檢閱範例應用程式中的程式碼。
在 Azure Cosmos DB 中大量執行
- 以引數形式讀取 Azure Cosmos DB 的連接字串,並指派給 /
examples/common/AccountSettings.java
檔案中定義的變數。 必須設定這些環境變數
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
若要執行大量範例,請指定其主要類別:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
CosmosAsyncClient
物件會使用下列陳述式進行初始化:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
此範例會建立異步資料庫和容器。 接著會建立多個文件,以執行大量作業。 此項目會將上述文件新增至
Flux<Family>
回應式串流物件:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
此範例包含大量建立、upsert、取代和刪除的方法。 在每個方法中,我們會將 BulkWriter
Flux<Family>
串流中的系列文件對應至CosmosBulkOperations
中的多種方法呼叫。 這些作業會新增至另一個回應式串流物件Flux<CosmosItemOperation>
。 然後,串流會傳遞至executeBulkOperations
我們在開頭時建立的非同步container
方法,並大量執行作業。 請參閱下列大量建立方法作為範例:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
在與應用程式範例相同的目錄中也有類別
BulkWriter.java
。 此類別會示範如何處理速率限制 (429) 和逾時 (408) 大量執行期間可能發生的錯誤,並有效地重試這些作業。 其會在下列方法中實作,也會示範如何實作本地和全域輸送量控制項。private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
此外,範例中具有大量建立方法,其說明如何新增回應流程,和如何設定執行選項:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
效能提示
使用大量執行程式程式庫時,請考量下列各點以提升效能:
為達到最佳效能,請從位於與 Azure Cosmos DB 帳戶寫入區域相同區域的 Azure VM 中執行應用程式。
為達到更高的輸送量:
- 將 JVM 的堆積大小設定為夠大的數字,以避免在處理大量文件時發生任何記憶體問題。 建議的堆積大小:max(3 GB, 3 * sizeof(在一個批次中傳遞至大量輸入 API 的所有文件))。
- 這需要前置處理時間,在您對大量文件執行大量作業時,這可讓您取得更高的輸送量。 因此,如果您想要匯入 10,000,000 份文件,較好的方式是對 10 份大量文件 (每份中有 1,000,000 份文件) 執行 10 次大量匯入,而不是對 100 份大量文件 (每份中有 100,000 份文件) 執行 100 次大量匯入。
在對應至特定 Azure Cosmos DB 容器的單一虛擬機器中,建議為整個應用程式具現化單一 CosmosAsyncClient 物件。
單一大量作業 API 執行會取用大量用戶端機器的 CPU 和網路 IO。 這是因為由內部繁衍出多個工作,因此請避免在每次執行大量作業 API 呼叫時,您的應用程式處理程序內繁衍出多個並行工作。 如果單一虛擬機器上執行的單一大量作業 API 呼叫無法耗用整個容器的輸送量 (如果容器的輸送量 > 100 萬個 RU/秒),最好建立個別虛擬機器來並行執行大量作業 API 呼叫。
下一步
- 如需大量執行工具功能的概觀,請參閱大量執行工具概觀。