
Perform bulk operations on Azure Cosmos DB data

APPLIES TO: NoSQL

This tutorial provides instructions on performing bulk operations with the Azure Cosmos DB Java V4 SDK. This version of the SDK comes with the bulk executor library built in. If you're using an older version of the Java SDK, we recommend migrating to the latest version. The Azure Cosmos DB Java V4 SDK is the current recommended solution for Java bulk support.

Currently, the bulk executor library is supported only by Azure Cosmos DB for NoSQL and API for Gremlin accounts. To learn about using the bulk executor .NET library with API for Gremlin, see Perform bulk operations in Azure Cosmos DB for Gremlin.

Prerequisites

Clone the sample application

Now let's switch to working with code by downloading a generic samples repository for the Java V4 SDK for Azure Cosmos DB from GitHub. These sample applications perform CRUD operations and other common operations on Azure Cosmos DB. To clone the repository, open a command prompt, navigate to the directory where you want to copy the application, and run the following command:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

The cloned repository contains the sample SampleBulkQuickStartAsync.java in the /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async folder. The application generates documents and executes operations to bulk create, upsert, replace, and delete items in Azure Cosmos DB. The next sections walk through the code in the sample app.

Bulk execution in Azure Cosmos DB

  1. The connection string for Azure Cosmos DB is read as an argument and assigned to variables defined in the /examples/common/AccountSettings.java file. These environment variables must be set:
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

To run the bulk sample, specify its Main class:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  2. The CosmosAsyncClient object is initialized by using the following statements:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .buildAsyncClient();
    
    
  3. The sample creates an async database and container. It then creates multiple documents on which bulk operations will be executed, and adds these documents to a Flux<Family> reactive stream object:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  4. The sample contains methods for bulk create, upsert, replace, and delete. In each method, we map the family documents from the Flux<Family> stream to multiple method calls in CosmosBulkOperations. These operations are added to another reactive stream object, Flux<CosmosItemOperation>. The stream is then passed to the executeBulkOperations method of the async container we created at the beginning, and the operations are executed in bulk. See the bulk create method below as an example:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
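
    The upsert, replace, and delete methods in the sample follow the same shape. As a minimal sketch of a bulk delete (assuming the sample's container field and its Family model, whose getId and getLastName accessors appear elsewhere in the sample), it might look like:

    private void bulkDeleteItems(Flux<Family> families) {
        // Sketch mirroring bulkCreateItems above: map each Family to a delete
        // operation keyed by id and partition key, then execute in bulk.
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getDeleteItemOperation(
                family.getId(), new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }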
    
  5. There's also a BulkWriter.java class in the same directory as the sample application. This class demonstrates how to handle the rate limiting (429) and timeout (408) errors that might occur during bulk execution, and how to retry those operations effectively. It is implemented in the methods below, which also show how to apply local and global throughput control.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  6. Additionally, the sample contains bulk create methods that illustrate how to add response processing and how to set execution options:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with a {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Performance tips

Consider the following points for better performance when using the bulk executor library:

  • For best performance, run your application from an Azure VM in the same region as your Azure Cosmos DB account's write region.

  • For achieving higher throughput:

    • Set the JVM's heap size to a number large enough to avoid any memory issues when handling a large number of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to the bulk import API in one batch)).
    • There is a preprocessing time, so you get higher throughput when performing bulk operations on a large number of documents. If you want to import 10,000,000 documents, running a bulk import 10 times on 10 batches of documents (each of size 1,000,000) is preferable to running a bulk import 100 times on 100 batches of documents (each of size 100,000).
  • It is recommended to instantiate a single CosmosAsyncClient object, corresponding to a specific Azure Cosmos DB container, for the entire application within a single virtual machine.

  • A single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO, because it spawns multiple tasks internally. Avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine can't consume your entire container's throughput (if your container's throughput exceeds 1 million RU/s), it's preferable to create separate virtual machines to concurrently execute bulk operation API calls.
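
The batching guidance above is straightforward arithmetic. The illustrative helper below (hypothetical names, not part of the Azure SDK) just makes the comparison concrete:

```java
// Illustrative helper (not part of the Azure SDK): computes how many
// bulk-import calls are needed for a given document count and batch size.
// The performance tip above favors fewer, larger batches.
public class BatchMath {
    static int numberOfBulkCalls(int totalDocs, int batchSize) {
        // Ceiling division: a partial final batch still costs one call.
        return (totalDocs + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        System.out.println(numberOfBulkCalls(10_000_000, 1_000_000)); // 10 calls
        System.out.println(numberOfBulkCalls(10_000_000, 100_000));   // 100 calls
    }
}
```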

Next steps