在 Azure Cosmos DB 資料上執行大量作業

適用於:NoSQL

本教學課程會提供指示,說明在 Azure Cosmos DB Java V4 SDK 中執行大量作業。 此版本的 SDK 隨附大量執行工具程式庫內建功能。 如果您使用較舊版本的 JAVA SDK,建議您移轉至最新的版本。 Azure Cosmos DB Java V4 SDK 是目前建議用於 Java 大量支援的解決方案。

目前,只有 Azure Cosmos DB for NoSQL 和 API for Gremlin 帳戶才支援大量執行工具程式庫。 若要了解如何搭配使用大量執行工具 .NET 程式庫和 API for Gremlin,請參閱在 Azure Cosmos DB for Gremlin 中執行大量作業

必要條件

  • 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

  • 您可以免費試用 Azure Cosmos DB,無須 Azure 訂用帳戶,免費又不需要承諾用量。 或者,您也可以對 https://localhost:8081 端點使用 Azure Cosmos DB 模擬器驗證要求中會提供主索引鍵。

  • Java Development Kit (JDK) 1.8+ \(英文\)

    • 在 Ubuntu 上,執行 apt-get install default-jdk 來安裝 JDK。

    • 務必設定 JAVA_HOME 環境變數,以指向 JDK 安裝所在的資料夾。

  • 下載安裝 Maven 二進位封存檔

    • 在 Ubuntu 上,您可以執行 apt-get install maven 來安裝 Maven。
  • 使用 JAVA 快速入門文章的建立資料庫帳戶一節所述的步驟,建立 Azure Cosmos DB for NoSQL 帳戶。

複製範例應用程式

現在讓我們從 GitHub 下載適用於 Azure Cosmos DB 的 Java V4 SDK 的通用範例存放庫,來切換為使用程式碼。 這些範例應用程式會執行 CRUD 作業,並在 Azure Cosmos DB 上執行其他常見作業。 若要複製存放庫,請開啟命令提示字元,瀏覽至要從其中複製應用程式的目錄,然後執行下列命令:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

複製的存放庫會包含 /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async 資料夾中的範例 SampleBulkQuickStartAsync.java。 應用程式會產生文件並執行作業,來大量建立、upsert、取代和刪除 Azure Cosmos DB 中的項目。 在後面幾節中,我們將檢閱範例應用程式中的程式碼。

在 Azure Cosmos DB 中大量執行

  1. 以引數形式讀取 Azure Cosmos DB 的連接字串,並指派給 /examples/common/AccountSettings.java 檔案中定義的變數。 必須設定這些環境變數
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

若要執行大量範例,請指定其主要類別:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. CosmosAsyncClient 物件會使用下列陳述式進行初始化:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. 此範例會建立異步資料庫和容器。 接著會建立多個文件,以執行大量作業。 此項目會將上述文件新增至 Flux<Family> 回應式串流物件:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. 此範例包含大量建立、upsert、取代和刪除的方法。 在每個方法中,我們會將 BulkWriter Flux<Family> 串流中的系列文件對應至 CosmosBulkOperations 中的多種方法呼叫。 這些作業會新增至另一個回應式串流物件 Flux<CosmosItemOperation>。 然後,串流會傳遞至 executeBulkOperations 我們在開頭時建立的非同步 container 方法,並大量執行作業。 請參閱下列大量建立方法作為範例:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. 在與應用程式範例相同的目錄中也有類別 BulkWriter.java。 此類別會示範如何處理速率限制 (429) 和逾時 (408) 大量執行期間可能發生的錯誤,並有效地重試這些作業。 其會在下列方法中實作,也會示範如何實作本地和全域輸送量控制項。

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. 此外,範例中具有大量建立方法,其說明如何新增回應流程,和如何設定執行選項:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    效能提示

    使用大量執行程式程式庫時,請考量下列各點以提升效能:

    • 為達到最佳效能,請從位於與 Azure Cosmos DB 帳戶寫入區域相同區域的 Azure VM 中執行應用程式。

    • 為達到更高的輸送量:

      • 將 JVM 的堆積大小設定為夠大的數字,以避免在處理大量文件時發生任何記憶體問題。 建議的堆積大小:max(3 GB, 3 * sizeof(在一個批次中傳遞至大量輸入 API 的所有文件))。
      • 這需要前置處理時間,在您對大量文件執行大量作業時,這可讓您取得更高的輸送量。 因此,如果您想要匯入 10,000,000 份文件,較好的方式是對 10 份大量文件 (每份中有 1,000,000 份文件) 執行 10 次大量匯入,而不是對 100 份大量文件 (每份中有 100,000 份文件) 執行 100 次大量匯入。
    • 在對應至特定 Azure Cosmos DB 容器的單一虛擬機器中,建議為整個應用程式具現化單一 CosmosAsyncClient 物件。

    • 單一大量作業 API 執行會取用大量用戶端機器的 CPU 和網路 IO。 這是因為由內部繁衍出多個工作,因此請避免在每次執行大量作業 API 呼叫時,您的應用程式處理程序內繁衍出多個並行工作。 如果單一虛擬機器上執行的單一大量作業 API 呼叫無法耗用整個容器的輸送量 (如果容器的輸送量 > 100 萬個 RU/秒),最好建立個別虛擬機器來並行執行大量作業 API 呼叫。

    下一步